GraphX: Graph Processing in a Distributed Dataflow Framework, published at OSDI 2014.
Graph processing systems, such as Pregel and PowerGraph, typically outperform general-purpose distributed dataflow frameworks like Hadoop MapReduce by orders of magnitude. However, there is still a need to process large graphs within general-purpose distributed dataflow frameworks.
GraphX achieves an order of magnitude performance gain over the base dataflow framework and matches the performance of specialized graph processing systems while enabling a wider range of computation.
Pregel Graph-Parallel Abstraction
Pregel expresses a graph algorithm as a vertex program. This programming interface enables vertices to execute concurrently. PageRank, for example, is expressed as follows:
def PageRank(v: Id, msgs: List[Double]): Unit = {
  // Compute the message sum (a Double, since the messages are Doubles)
  var msgSum = 0.0
  for (m <- msgs) { msgSum += m }
  // Update the PageRank
  PR(v) = 0.15 + 0.85 * msgSum
  // Broadcast messages with new PR
  for (j <- OutNbrs(v)) {
    val msg = PR(v) / NumLinks(v)
    send_msg(to=j, msg)
  }
  // Check for termination
  if (converged(PR(v))) voteToHalt(v)
}
The vertex program for the vertex v begins by summing the messages encoding the weighted PageRank of neighboring vertices. The PageRank is updated using the resulting sum and is then broadcast to its neighbors (weighted by the number of links). Finally, the vertex program assesses whether it has converged (locally) and votes to halt.
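To make the superstep structure concrete, the following is a minimal single-machine sketch of how such a vertex program could be driven, written with plain Scala collections. The object name `PregelPageRankSketch`, the `run` and `send` helpers, and the `tol` threshold are assumptions made for illustration; they are not part of Pregel or GraphX. As a simplification, a vertex that receives no messages never runs and keeps its initial rank.

```scala
object PregelPageRankSketch {
  // Hypothetical driver (not Pregel's real API): runs the PageRank vertex
  // program in synchronous supersteps over (src, dst) edge pairs.
  def run(edges: Seq[(Int, Int)], maxSupersteps: Int, tol: Double): Map[Int, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outNbrs: Map[Int, Seq[Int]] =
      edges.groupBy(_._1).map { case (s, es) => s -> es.map(_._2) }

    // Every vertex starts with rank 1.0 and broadcasts it once.
    var pr: Map[Int, Double] = vertices.map(v => v -> 1.0).toMap
    var inbox: Map[Int, Seq[Double]] = send(vertices, pr, outNbrs)
    var step = 0

    while (inbox.nonEmpty && step < maxSupersteps) {
      // Run the vertex program on every vertex that received messages.
      val updated: Map[Int, Double] =
        inbox.map { case (v, msgs) => v -> (0.15 + 0.85 * msgs.sum) }
      // Vertices whose rank moved by at most tol vote to halt and stay silent.
      val senders: Seq[Int] =
        updated.toSeq.collect { case (v, r) if math.abs(r - pr(v)) > tol => v }
      pr = pr ++ updated
      inbox = send(senders, pr, outNbrs)
      step += 1
    }
    pr
  }

  // Deliver PR(v) / NumLinks(v) from each sender to each of its out-neighbors.
  private def send(
      senders: Seq[Int],
      pr: Map[Int, Double],
      outNbrs: Map[Int, Seq[Int]]): Map[Int, Seq[Double]] =
    senders
      .flatMap { v =>
        val nbrs = outNbrs.getOrElse(v, Seq.empty)
        nbrs.map(j => j -> (pr(v) / nbrs.size))
      }
      .groupBy(_._1)
      .map { case (j, ms) => j -> ms.map(_._2) }
}
```

Calling `PregelPageRankSketch.run(Seq((1, 2), (2, 1)), 20, 1e-6)` on a two-vertex cycle halts after the first superstep, because neither rank changes.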
Graph System Optimizations
The restrictions imposed by the graph-parallel abstraction along with the sparse graph structure enable a range of important system optimizations.
The GAS Decomposition
The GAS decomposition splits vertex programs into three data-parallel stages: Gather, Apply, and Scatter.
The GAS decomposition leads to a pull-based model of message computation. It enables vertex-cut partitioning, improved work balance, serial edge iteration, and reduced data movement, but it prohibits direct communication between vertices that are not adjacent in the graph.
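As an illustration, PageRank can be split into the three stages roughly as follows. This is a sketch over in-memory collections, not PowerGraph's or GraphX's API: the names `GasPageRankSketch`, `gather`, `sum`, `applyRank`, `scatter`, and `iterate` are hypothetical, and `pr` is assumed to hold a rank for every vertex that appears in `edges`.

```scala
object GasPageRankSketch {
  final case class Edge(src: Int, dst: Int)

  // Gather: pull one contribution along an in-edge from its source vertex.
  def gather(e: Edge, pr: Map[Int, Double], outDeg: Map[Int, Int]): Double =
    pr(e.src) / outDeg(e.src)

  // Combiner for gathered values; must be commutative and associative.
  def sum(a: Double, b: Double): Double = a + b

  // Apply: compute the new vertex value from the gathered total.
  def applyRank(total: Double): Double = 0.15 + 0.85 * total

  // Scatter: decide whether the change is large enough to activate neighbors.
  def scatter(oldRank: Double, newRank: Double, tol: Double): Boolean =
    math.abs(newRank - oldRank) > tol

  // One iteration: returns the new ranks and the vertices that scatter.
  def iterate(
      edges: Seq[Edge],
      pr: Map[Int, Double],
      tol: Double): (Map[Int, Double], Set[Int]) = {
    val outDeg = edges.groupBy(_.src).map { case (s, es) => s -> es.size }
    // Each destination pulls from its in-edges and combines the results.
    val gathered = edges.groupBy(_.dst).map { case (d, es) =>
      d -> es.map(e => gather(e, pr, outDeg)).foldLeft(0.0)((a, b) => sum(a, b))
    }
    val next = pr.map { case (v, _) => v -> applyRank(gathered.getOrElse(v, 0.0)) }
    val active = next.toSeq.collect { case (v, r) if scatter(pr(v), r, tol) => v }.toSet
    (next, active)
  }
}
```

Because the gathered values are combined with a commutative, associative function, the per-edge work for one vertex can be done in any order and on any machine that holds some of its edges, which is what makes vertex-cut partitioning possible.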
Graph Partitioning
Graph-partitioning algorithms aim to minimize communication and balance computation. Vertex-cut partitioning performs well on many large natural graphs.
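A vertex cut assigns edges, rather than vertices, to partitions, so a vertex is replicated on every partition that holds one of its edges. The sketch below uses hash-based edge placement, which is only one simple strategy chosen for illustration, and measures the resulting replication factor (the average number of partitions a vertex spans). The names `VertexCutSketch`, `assign`, and `replicationFactor` are hypothetical.

```scala
object VertexCutSketch {
  type Edge = (Int, Int)

  // Place each edge on a partition by hashing the (src, dst) pair.
  def assign(edges: Seq[Edge], numParts: Int): Map[Int, Seq[Edge]] =
    edges.groupBy { case (s, d) =>
      java.lang.Math.floorMod((s, d).hashCode, numParts)
    }

  // Average number of partitions on which each vertex has a replica.
  def replicationFactor(parts: Map[Int, Seq[Edge]]): Double = {
    val replicas: Seq[(Int, Int)] = parts.toSeq.flatMap { case (p, es) =>
      es.flatMap { case (s, d) => Seq(s, d) }.distinct.map(v => (v, p))
    }
    val numVertices = replicas.map(_._1).distinct.size
    if (numVertices == 0) 0.0 else replicas.size.toDouble / numVertices
  }
}
```

A lower replication factor means fewer vertex copies to keep in sync, and therefore less communication per iteration.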
Mirror Vertices
Mirror vertices are used to combine messages sent to the same remote machine: a single message goes to the mirror, which then forwards it to the neighbors on that machine.
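The saving can be seen by counting network messages for one broadcasting vertex with and without mirrors. This is a rough sketch: `MirrorSketch`, `machineOf` (a placement function from vertex id to machine id), and `home` (the machine that owns the sending vertex) are assumptions introduced for the example.

```scala
object MirrorSketch {
  // Without mirrors: one network message per neighbor on a remote machine.
  def messagesWithoutMirrors(nbrs: Seq[Int], home: Int, machineOf: Int => Int): Int =
    nbrs.count(n => machineOf(n) != home)

  // With mirrors: one network message per distinct remote machine; the
  // mirror on that machine forwards the value to its local neighbors.
  def messagesWithMirrors(nbrs: Seq[Int], home: Int, machineOf: Int => Int): Int =
    nbrs.map(machineOf).filter(_ != home).distinct.size
}
```

For a vertex on machine 0 with neighbors 1 to 6 placed by `n % 2`, the three remote neighbors (1, 3, 5) all live on machine 1, so mirroring reduces three network messages to one.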
Active Vertices
This optimization tracks active vertices and eliminates data movement and unnecessary computation for vertices that have converged.
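One way to picture this is an explicit active set that is recomputed after each iteration and used to skip edges whose source has converged. The sketch below is illustrative only; `ActiveSetSketch`, `nextActive`, `edgesToScan`, and the `tol` threshold are hypothetical names, not part of any of the systems discussed.

```scala
object ActiveSetSketch {
  // A vertex stays active if its value moved by more than tol;
  // vertices with no previous value are always treated as active.
  def nextActive(
      oldVals: Map[Int, Double],
      newVals: Map[Int, Double],
      tol: Double): Set[Int] =
    newVals.toSeq.collect {
      case (v, x) if math.abs(x - oldVals.getOrElse(v, Double.PositiveInfinity)) > tol => v
    }.toSet

  // Only edges whose source is still active need to be visited.
  def edgesToScan(edges: Seq[(Int, Int)], active: Set[Int]): Seq[(Int, Int)] =
    edges.filter { case (s, _) => active.contains(s) }
}
```

As more vertices converge, the active set shrinks and each iteration touches a smaller part of the graph.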
Graph Computation as Dataflow Ops.
- In the join stage, vertex and edge properties are joined to form the triplets view, consisting of each edge and its corresponding source and destination vertex properties.
- In the group-by stage, the triplets are grouped by source or destination vertex to construct the neighborhood of each vertex and compute aggregates.
These two stages capture the GAS decomposition. On top of triplets, we can also implement the Pregel abstraction by iteratively composing the join and group-by stages with data-parallel map stages.
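The join and group-by stages can be sketched with ordinary Scala collections. This is not the GraphX implementation; `TripletsSketch`, `Triplet`, `triplets`, and `aggregateByDst` are hypothetical names chosen for the example, and edges whose endpoints have no vertex entry are simply dropped.

```scala
object TripletsSketch {
  // An edge together with the properties of both of its endpoints.
  final case class Triplet[VD, ED](
      srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)

  // Join stage: attach source and destination vertex properties to each edge.
  def triplets[VD, ED](
      vertices: Map[Long, VD],
      edges: Seq[(Long, Long, ED)]): Seq[Triplet[VD, ED]] =
    for {
      (s, d, a) <- edges
      sa <- vertices.get(s).toList
      da <- vertices.get(d).toList
    } yield Triplet(s, sa, d, da, a)

  // Group-by stage: group triplets by destination and combine one value per triplet.
  def aggregateByDst[VD, ED, A](ts: Seq[Triplet[VD, ED]])(
      msg: Triplet[VD, ED] => A)(combine: (A, A) => A): Map[Long, A] =
    ts.groupBy(_.dstId).map { case (d, group) =>
      d -> group.map(msg).reduce(combine)
    }
}
```

For example, summing `srcAttr` over the in-edges of each vertex is `aggregateByDst(ts)(_.srcAttr)(_ + _)`; repeating join, group-by, and a map that updates the vertex values gives a Pregel-style iteration.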
Distributed Graph Representation
The vertex collection is hash-partitioned by the vertex ids. To support frequent joins across vertex collections, vertices are stored in a local hash index within each partition.
The edge collection is horizontally partitioned by a user-defined partition function. GraphX enables vertex-cut partitioning, which minimizes communication in natural graphs such as social networks and web graphs.
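A minimal single-process sketch of this layout is shown below: vertices are hash-partitioned by id into per-partition hash maps, and edges are placed by a caller-supplied function. The names `GraphLayoutSketch`, `partitionVertices`, `partitionEdges`, and `partitionFn` are assumptions for illustration and do not correspond to GraphX's actual classes.

```scala
object GraphLayoutSketch {
  // Hash-partition vertices by id; each partition keeps a local hash index
  // (an immutable Map here) so that joins can look vertices up by id.
  def partitionVertices[VD](
      vertices: Seq[(Long, VD)],
      numParts: Int): Map[Int, Map[Long, VD]] =
    vertices
      .groupBy { case (id, _) => java.lang.Math.floorMod(id, numParts.toLong).toInt }
      .map { case (p, vs) => p -> vs.toMap }

  // Partition edges with a user-defined function of (src, dst, numParts).
  def partitionEdges[ED](edges: Seq[(Long, Long, ED)], numParts: Int)(
      partitionFn: (Long, Long, Int) => Int): Map[Int, Seq[(Long, Long, ED)]] =
    edges.groupBy { case (s, d, _) => partitionFn(s, d, numParts) }
}
```

Since edges are placed independently of the vertex partitioning, a vertex's properties may need to be shipped to every edge partition that references it, which is why the choice of edge partition function affects communication.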