Twitter Heron: Stream Processing at Scale, SIGMOD 2015.

Heron is the newly announced Twitter stream processing engine. It has already been used in production for about half a year. One year ago, Twitter released their stream processing engine in the Storm@twitter paper, and now this marks the end of Storm's history at Twitter.

Motivating Heron and Criticizing Storm

Multiple levels of Scheduling in Storm

Storm has a very complex, multi-level scheduling model, which leads to uncertainty about when tasks are being scheduled.

Several worker processes are scheduled by the operating system on a host. Inside each worker's JVM process, each executor is mapped to two threads, which are in turn scheduled by the JVM using a preemptive, priority-based scheduling algorithm. Since each thread has to run several tasks, the executor implements yet another scheduling algorithm to invoke the appropriate task based on the incoming data. Such multiple levels of scheduling and their complex interaction often lead to uncertainty about when a task is actually being scheduled.

Mixture of tasks in workers complicates recovery in Storm

In Storm, each worker can run disparate tasks, so it is not possible to isolate the resource usage of individual tasks. Moreover, since logs from multiple tasks are written into a single file, it is hard to identify the errors or exceptions associated with a particular task.

Homogeneous assumption about workers wastes resources

Storm assumes that every worker is homogeneous. This results in inefficient utilization of allocated resources and often leads to over-provisioning. The problem gets worse as more and more diverse components are packed into a single worker.

Dumps take so long that heartbeats are missed

Because of over-provisioning and large memory allocations, a worker that is writing a heap dump or going through a long garbage collection pause misses its heartbeat signals, which causes the supervisor to kill the process and prevents the dump from completing.

One task per worker limits the degree of parallelism

One possible way to reduce this resource waste is to run only one task per worker. However, this leads to inefficiencies in resource usage and limits the degree of parallelism that can be achieved.

Long communication chain leads to significant overhead

A global receive thread in each worker process is responsible for getting data from workers “upstream”, and a global send thread is in charge of transmitting data to the workers “downstream”. In addition to these global threads, each executor consists of a user logic thread that runs the topology code, and a local send thread that communicates the output data from the user logic thread to the global sender thread. Hence in Storm, each tuple has to pass through four threads from the point of entry to the point of exit inside the worker process. This design leads to significant overhead and queue contention issues.

Functionally overloaded Nimbus becomes an operational bottleneck

The Storm Nimbus [20] performs several functions, including scheduling, monitoring, and distributing JARs. It also serves as the metrics-reporting component for the system and manages counters for several topologies.

Nimbus becomes an operational bottleneck because:

  1. The Nimbus scheduler does not support resource reservation and isolation. Storm workers belonging to different topologies but running on the same machine can interfere with each other.
  2. Storm uses Zookeeper extensively to manage heartbeats from the workers and the supervisors; at very large numbers of workers and topologies, Zookeeper becomes the bottleneck.
  3. When the Nimbus fails, the users are neither able to submit any new topologies nor kill existing ones. Furthermore, when Nimbus fails, any existing topology that undergoes failures cannot be automatically detected and recovered.

Disadvantages of the fail-fast mechanism

Storm has no backpressure mechanism: if the receiver component is unable to handle incoming data/tuples, the sender simply drops them.

Dropping tuples in this fail-fast fashion may be acceptable for approximate computations that can afford to lose the long tail, but it has the following disadvantages:

  1. If acknowledgements are disabled, this mechanism will result in unbounded tuple drops, making it hard to get visibility about these drops.
  2. Work done by upstream components is lost.
  3. System behavior becomes less predictable.

Unpredictable performance during topology execution

  1. A tuple failure anywhere in the tuple tree leads to failure of the entire tuple tree.
  2. Topologies that allocate a large amount of RAM per worker encounter garbage collection (GC) cycles longer than a minute, resulting in high latencies and high tuple failure rates.
  3. In some cases, there is a lot of contention at the transfer queues, especially when a worker runs several executors.

The Heron Solution


Heron runs topologies. A topology is a directed acyclic graph of spouts and bolts. As in Storm, spouts generate the input tuples that are fed into the topology, and bolts do the actual computation.

Like the DAG of tasks in a Spark job, a Heron topology is equivalent to a logical query plan in a database system. Such a logical plan is translated into a physical plan before actual execution.
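
To make the spout/bolt DAG concrete, here is a minimal sketch of a two-stage topology using the Storm-style builder API that Heron keeps compatible. WordSpout and WordCountBolt are hypothetical user-defined classes for illustration, not code from the paper.

```java
// Minimal sketch: a spout feeding a bolt, wired with the Storm-compatible
// topology builder. WordSpout and WordCountBolt are assumed user classes.
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // Spout: generates the input tuples fed into the topology.
    builder.setSpout("word", new WordSpout(), 4);
    // Bolt: does the actual computation, receiving tuples grouped by the "word" field.
    builder.setBolt("count", new WordCountBolt(), 4)
           .fieldsGrouping("word", new Fields("word"));

    Config conf = new Config();
    conf.setNumWorkers(4);  // parallelism hint for the worker/container count
    StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
  }
}
```

The builder output is the logical plan; the scheduler then decides how many containers to use and which Heron Instances go where, which is the physical plan.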

Topology Master

The Topology Master (TM) is similar to the Application Master in YARN. Since the TM is not involved in the data processing path, it is not a bottleneck.

Stream Manager

The key function of the Stream Manager (SM) is to manage the routing of tuples efficiently. Each Heron Instance (HI) connects to its local SM to send and receive tuples. All the SMs in a topology connect to each other, forming an O(k^2) connection network, where k is the number of containers/SMs in the physical plan of the topology; for example, k = 100 containers means 100 × 99 / 2 = 4,950 pairwise SM-to-SM connections.

The Stream Manager uses a backpressure mechanism instead of the fail-fast mechanism: if the later/downstream stages are running slow, the earlier/upstream stages are slowed down by backpressure. The paper discusses several variants, namely TCP backpressure, spout backpressure, and stage-by-stage backpressure; a sketch of the spout variant follows.
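
Here is a minimal sketch of the high/low watermark idea behind spout backpressure, assuming a Stream Manager clamps its local spouts when a send buffer to a slow consumer grows too large. The thresholds, class, and method names are my assumptions, not Heron's actual code.

```java
// Hypothetical sketch of spout backpressure with high/low watermarks.
// Thresholds and method names are illustrative assumptions only.
public class SpoutBackpressure {
  private static final long HIGH_WATERMARK_BYTES = 100L * 1024 * 1024; // assumed
  private static final long LOW_WATERMARK_BYTES  =  50L * 1024 * 1024; // assumed
  private boolean underBackpressure = false;

  // Called whenever the buffered bytes for a slow downstream consumer change.
  void onBufferSizeChanged(long bufferedBytes) {
    if (!underBackpressure && bufferedBytes > HIGH_WATERMARK_BYTES) {
      underBackpressure = true;
      stopReadingFromLocalSpouts();           // clamp the sources feeding this SM
      broadcastStartBackpressureToOtherSMs(); // so upstream SMs slow down too
    } else if (underBackpressure && bufferedBytes < LOW_WATERMARK_BYTES) {
      underBackpressure = false;
      resumeReadingFromLocalSpouts();
      broadcastStopBackpressureToOtherSMs();
    }
  }

  private void stopReadingFromLocalSpouts() { /* ... */ }
  private void resumeReadingFromLocalSpouts() { /* ... */ }
  private void broadcastStartBackpressureToOtherSMs() { /* ... */ }
  private void broadcastStopBackpressureToOtherSMs() { /* ... */ }
}
```

Using two separate thresholds avoids oscillating between starting and stopping backpressure when the buffer hovers around a single cutoff.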

Heron Instance

Each HI has two threads, namely a Gateway thread and a Task Execution thread. The Gateway thread is responsible for controlling all the communication and data movement into and out of the HI, while the Task Execution thread runs the user code. The two threads exchange tuples through in-process queues, as sketched below.
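
A rough illustration of that two-thread layout, with the Gateway and Task Execution threads exchanging tuples through bounded in/out queues. The queue capacities and helper method names are assumptions for the sketch, not Heron's real implementation.

```java
// Illustrative-only sketch of a Heron Instance's two-thread layout.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HeronInstanceSketch {
  // Unidirectional queues between the Gateway and Task Execution threads.
  private final BlockingQueue<Object> dataIn  = new ArrayBlockingQueue<>(1024);
  private final BlockingQueue<Object> dataOut = new ArrayBlockingQueue<>(1024);

  // Gateway thread: talks to the local Stream Manager and moves tuples
  // in and out of the instance.
  Thread gateway = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
      Object incoming = receiveFromLocalStreamManager(); // assumed helper
      if (incoming != null) dataIn.offer(incoming);
      Object outgoing = dataOut.poll();
      if (outgoing != null) sendToLocalStreamManager(outgoing); // assumed helper
    }
  });

  // Task Execution thread: runs the user's spout/bolt code on each tuple.
  Thread taskExecution = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Object tuple = dataIn.take();
        Object result = runUserCode(tuple); // assumed helper invoking the bolt
        dataOut.put(result);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  });

  private Object receiveFromLocalStreamManager() { return null; /* stub */ }
  private void sendToLocalStreamManager(Object t) { /* stub */ }
  private Object runUserCode(Object t) { return t; /* stub */ }
}
```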

How does Heron address these problems?

  1. The provisioning of resources is abstracted from the duties of the cluster manager.
  2. Each HI is executing only a single task.
  3. The design makes it transparent as to which component of the topology is failing or slowing down.
  4. Heron allows component-level resource allocation, thereby avoiding unnecessary over-provisioning.
  5. Having a Topology Master per topology enables topologies to be managed independently.
  6. The backpressure mechanism gives a consistent rate of delivering results and a way to reason about the system. It is also a key mechanism that supports migration of topologies from one set of containers to another.
  7. No single point of failure.

Summary

So let’s rethink the problems of Storm. One of the authors (on GitHub) said that Storm has better resource usage for small-scale stream processing, while Heron is designed for large scale.

The major design difference between Storm and Heron is the level of resource isolation and the number of globally shared resources.

In Storm, topologies share the Nimbus, so the system is unable to track the resource usage of each topology. This design works acceptably for homogeneous tasks (the basic assumption of Storm), but Nimbus can become the bottleneck for large-scale processing.

In Heron, there is no single bottleneck. Because of the component-level resource allocation, we can track and debug each component and do more optimization for performance, scalability, and stability; the backpressure mechanism is one example of such an optimization.