Hadoop Online Prototype
Hadoop Online Prototype is a modified version of the Hadoop MapReduce framework that supports
online aggregation, which allows users to see “early returns” from a job while it is still being computed. HOP was developed at UC Berkeley, but no source release has been published since 2010 and the Google Code site has seen very little activity in the last two years.
Hadoop Online Prototype (HOP) also supports continuous queries, which enable MapReduce programs to be written for applications such as event monitoring and stream processing. HOP retains the fault tolerance properties of Hadoop, and can run unmodified user-defined MapReduce programs.
HOP Video
This video is a presentation given by the development team to the Hadoop Bay Area Users Group in March 2010. The lighting in the video is poor and the slides are almost impossible to read. Thankfully, they are posted here, so you can follow along with them open in a separate window (I highly recommend doing this if you plan to watch the video).
HOP Architecture
Pipelining results provides several important advantages:
- A downstream dataflow element can begin consuming data before a producer element has finished execution, which can increase opportunities for parallelism, improve utilization, and reduce response time.
- Since reducers begin processing data as soon as it is produced by mappers, they can generate and refine an approximation of their final answer during the course of execution. This technique, known as online aggregation [6], can reduce the turnaround time for data analysis by several orders of magnitude. (A naive version of this was demonstrated in the video above; no updated release has been published to improve support.)
- Pipelining widens the domain of problems to which MapReduce can be applied. In the HOP demonstration paper, the developers describe a system monitoring tool written as a MapReduce job that leverages HOP’s support of continuous queries: MapReduce jobs that run continuously, accepting new data as it arrives and analyzing it immediately.
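The online aggregation idea in the list above can be sketched in a few lines of Python. This is an illustrative toy, not HOP's actual code: the function name and the "scale partial counts by job progress" estimator are my own simplification of the snapshot mechanism described in the paper.

```python
from collections import Counter

def online_aggregate(map_outputs, total_maps, snapshot_every=2):
    """Yield scaled estimates of word counts as map outputs arrive.

    map_outputs: iterable of (map_id, word_list) pairs, one per finished map.
    Each snapshot naively scales the partial counts by the fraction of maps
    seen so far, mimicking a progress-based early estimate (hypothetical)."""
    counts = Counter()
    for seen, (_map_id, words) in enumerate(map_outputs, start=1):
        counts.update(words)
        if seen % snapshot_every == 0 or seen == total_maps:
            progress = seen / total_maps
            yield {w: c / progress for w, c in counts.items()}
```

Each snapshot is available long before the job finishes; the final snapshot (progress = 1.0) equals the exact answer.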
There are two main drawbacks to implementing a pipeline. First, MapReduce achieves fault tolerance by materializing intermediate state (map output); HOP addresses this challenge by allowing nodes to materialize partial chunks of intermediate state. Second, pipelining significantly reduces the benefit of combiners: because partial state is forwarded to the reducers, more network bandwidth is used and the reducer ends up performing more work than it would in normal MapReduce.
The initial pipeline design had an eager structure that forwarded every record separately, but this shifted all sorting to the reducer, completely precluded use of the combiner, and caused map tasks to routinely block on network I/O.
The revised pipeline model splits each task into two threads (task and spill). The task thread applies the map/reduce function and buffers its output; the spill thread sorts and combines the buffer and "spills" it to a file. The Hadoop TaskTracker code is modified to send spill files on to the consumer.
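The task/spill split can be illustrated with a toy producer-consumer sketch. Assume nothing here is HOP's real code: the function names are hypothetical, and "spilling" appends to an in-memory list instead of writing a file.

```python
import queue
import threading

def run_task(records, map_fn, combine_fn, spill_threshold=3):
    """Toy sketch of HOP's task/spill thread split (illustrative names).

    The caller plays the task thread: it applies map_fn and buffers output.
    Full buffers are handed to the spill thread, which sorts, combines, and
    'spills' them (here: appends to a list rather than writing a file)."""
    buffers = queue.Queue()
    spills = []

    def spill_worker():
        while True:
            buf = buffers.get()
            if buf is None:                         # sentinel: task finished
                return
            spills.append(combine_fn(sorted(buf)))  # sort & combine, then spill

    spiller = threading.Thread(target=spill_worker)
    spiller.start()

    buf = []
    for rec in records:
        buf.extend(map_fn(rec))
        if len(buf) >= spill_threshold:             # buffer full: hand off
            buffers.put(buf)
            buf = []
    if buf:
        buffers.put(buf)
    buffers.put(None)
    spiller.join()
    return spills
```

The point of the split is that sorting, combining, and I/O proceed in parallel with the map function instead of blocking it.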
The basic pipelining algorithm halts when spill files back up or when the user has designated an effective combiner. The pipeline resumes by first merging all accumulated spill files into one (this recovers some of the combiner capabilities of regular MapReduce).
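The resume step above amounts to a multi-way merge of already-sorted spill files followed by re-combining. A minimal sketch, with a hypothetical function name:

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merge_spills(spills, combine_fn=sum):
    """Merge several sorted spill files (lists of (key, value) pairs here)
    into one and re-apply the combiner across equal keys -- a toy model of
    the merge HOP performs before resuming the pipeline."""
    merged = heapq.merge(*spills)           # streams the sorted inputs
    return [(key, combine_fn(v for _, v in group))
            for key, group in groupby(merged, key=itemgetter(0))]
```

Because the combiner now sees all values for a key across the backed-up spills, far less redundant data is forwarded than if each spill had been sent individually.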
As mentioned previously, breaking apart map tasks creates significant fault tolerance challenges by eliminating clear demarcation points. The initial pipeline fault tolerance model takes a simple but costly approach: reducers treat all map data as tentative until the map task is 100% complete, and if a map task fails, the reducer throws away all the data it received from that map. To improve the performance of this model, spill files were given deterministic boundaries and assigned unique sequence numbers. This ensures that spill files are applied in an idempotent manner and prevents redundant spill files from being sent. The main limitation is that this precludes HOP from processing non-deterministic jobs.
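These rules are easy to model on the reducer side. The class below is an illustrative toy, not HOP's implementation: spills carry a (map_id, sequence_number) pair so duplicates are ignored, data stays tentative until its map finishes, and a failed map's data is discarded wholesale.

```python
class ReducerBuffer:
    """Toy model of HOP's pipeline fault tolerance rules (hypothetical)."""

    def __init__(self):
        self.tentative = {}   # map_id -> {seq: records}, not yet trusted
        self.committed = []   # records from maps that finished successfully

    def receive_spill(self, map_id, seq, records):
        spills = self.tentative.setdefault(map_id, {})
        if seq not in spills:          # duplicate sequence number: ignore
            spills[seq] = records      # (this is what makes delivery idempotent)

    def map_finished(self, map_id):
        # Map completed: commit its spills in sequence order.
        spills = self.tentative.pop(map_id, {})
        for seq in sorted(spills):
            self.committed.extend(spills[seq])

    def map_failed(self, map_id):
        # Map failed: discard everything tentatively received from it.
        self.tentative.pop(map_id, None)
```

The non-determinism limitation falls out of this design: a restarted map must regenerate byte-identical spills for the sequence numbers to mean anything.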
An additional architecture modification of interest is a monitoring system that uses the new pipeline/streaming extensions to let Hadoop effectively monitor itself. The intent of this monitor is to allow dynamic scheduler changes (triggers that might halt pipelining or reassign tasks away from a hot spot). Continuous jobs require reducers to periodically run on the available map output; the timeframe for these checks is set by the application. In the currently available implementation, the number of map and reduce tasks is fixed and must be configured by the user. As the developers note, manual configuration is error-prone, and many stream processing applications have "bursty" traffic patterns in which peak load far exceeds average load. Their NSDI paper promised more dynamic features in the future, but no new release is out as of yet.
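The "reducer periodically runs on available map output" behavior of a continuous job can be sketched as a windowed reduce over an arriving stream. This is a conceptual toy with made-up names, using logical timestamps rather than wall-clock time:

```python
def continuous_reduce(events, reduce_fn, window=60):
    """Toy model of a HOP continuous query (illustrative names).

    events: (timestamp, record) pairs in arrival order.
    The reduce function fires once per elapsed window of logical time --
    the application-chosen timeframe mentioned above -- over whatever
    records have arrived in that window."""
    pending, window_start = [], None
    for ts, record in events:
        if window_start is None:
            window_start = ts
        if ts - window_start >= window:       # timeframe elapsed: run reduce
            yield window_start, reduce_fn(pending)
            pending, window_start = [], ts
        pending.append(record)
    if pending:                               # flush the final partial window
        yield window_start, reduce_fn(pending)
```

A monitoring job is just this loop run forever, with `reduce_fn` raising alerts instead of emitting aggregates.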
The last major change HOP makes to Hadoop's architecture is support for pipelining between jobs, which required a retrofit of the JobTracker. In the normal framework, dependent jobs cannot execute until the jobs providing their input are complete. To address this, the team made the job submission interface accept a list of dependent jobs that could then be co-scheduled. During contention, preference for task execution is always given to the upstream jobs. The main drawback of this approach is that a job fault can waste extensive resources on the cluster. This matters little if a HOP workload runs in isolation, but a chain of dependent jobs holding resources could be a significant contention issue on a highly utilized cluster.
Performance

The early results from online aggregation can be surprisingly accurate for the top-k words problem. The above results are from running a Top-100 query against 5.5GB of Wikipedia article text on a cluster of 60 high-CPU medium nodes on EC2. The vertical lines represent the time when the top-k words in the early snapshot match those in the final result.
It's important to note that the current alpha release has very limited support for approximating final answers from the snapshots. A mechanism exists to perform the approximation, but users are left to design their own sampling algorithms.
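The "vertical line" analysis above is exactly the kind of thing a user currently has to build for themselves. A hypothetical helper, given a series of count snapshots, finds the first one whose top-k set already matches the final answer:

```python
from collections import Counter

def topk_stable_snapshot(snapshots, k):
    """Return the index of the first snapshot whose top-k keys match the
    final snapshot's top-k (a user-written analysis, not part of HOP).
    Naive: ignores the possibility of the set diverging again later."""
    final_topk = {w for w, _ in Counter(snapshots[-1]).most_common(k)}
    for i, snap in enumerate(snapshots):
        if {w for w, _ in Counter(snap).most_common(k)} == final_topk:
            return i
    return None
```

Anything smarter, such as confidence bounds on when the top-k set is truly stable, is left entirely to the user's own sampling algorithm in the current alpha.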


This sorting experiment ran on a 60-node EC2 cluster with the same 5.5GB Wikipedia input file, using 40 map tasks and 59 reduce tasks. Allowing pipelined output to the reducers cuts the execution time from 927 to 610 seconds. The stair-stepping of the reducer progress under pipelining can be attributed to the sorting work that HOP shifts to the reducers.

This second set of performance graphs is for a 100GB wordcount job using 3120 map tasks and 60 reduce tasks (32MB block size). The total job runtimes were 42 minutes for blocking and 34 minutes for pipelining. In this case pipelining makes some improvement, but not the dramatic results of the large-block tests. This makes intuitive sense, since pipelining essentially shrinks block sizes. The remaining gains come from HOP's ability to start reducer work with partial map input, and from the reduced disk I/O overhead of a parallel spill thread.

The vertical bar in this graph indicates the time at which the HOP monitoring tool fired an alert about the thrashing node causing all of the page swaps. This detection time is much faster than the standard TaskTracker/JobTracker heartbeat cycle for detecting stragglers. This result shows that using such alerts for early straggler detection could have significant performance benefits.
Running HOP
Thankfully, unlike Puma and Spark Streaming, HOP's source code is currently available. First, download the latest source using Subversion:
svn checkout http://hop.googlecode.com/svn/trunk/ hop-read-only
Then you can easily run the code locally. If you'd like to test the code on Amazon EC2, first follow the EC2 Getting Started Guide to set up your system. (You can run EC2 from Windows, but all the HOP scripts are meant for Linux.)
Then navigate to ../hop-read-only/src/contrib/ec2 and follow the README.txt guide and these setup instructions from Apache. NOTE: HOP is based on Hadoop 0.19.0, so don't download the latest distribution as the guide says; HOP already ships its own modified jars built on Hadoop 0.19.0.
If you are running Ubuntu you can use this nice and detailed setup guide for EC2 that is on the main Ubuntu Help site.
From there, make sure you edit ../hop-read-only/src/contrib/ec2/bin/hadoop-ec2-env.sh to include your own AWS account information. (Make sure your system is secure, since your AWS keys are stored in plain text in these scripts.)
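The edit boils down to filling in a handful of variables near the top of the script. The names below are from memory of the contrib/ec2 template, so verify them against your own checkout, and the values shown are placeholders:

```shell
# hadoop-ec2-env.sh -- fill in with your own AWS credentials
# (placeholder values; keep this file out of any shared or public location)
AWS_ACCOUNT_ID=123456789012          # your 12-digit account ID, no dashes
AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_SECRET_KEY
KEY_NAME=gsg-keypair                 # the EC2 keypair used to launch instances
```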
Don't forget to terminate your instances when your work is done!
A really simple test is to run some of the Hadoop sample jars on HOP and then do the same on a comparable EMR cluster and compare performance.
In this example I calculated Pi on two equally sized small-instance clusters.
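For reference, this is the kind of invocation I mean: the standard `pi` example from the Hadoop examples jar, with the number of maps and samples per map as arguments. The exact jar filename in the HOP build may differ, so check your checkout:

```shell
# From the cluster master: estimate Pi with 10 maps, 1,000,000 samples each.
# (Jar name assumes the stock Hadoop 0.19.0 examples jar that HOP builds on.)
bin/hadoop jar hadoop-0.19.0-examples.jar pi 10 1000000
```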


Even at this much smaller scale, pipelining has a significant effect. On average HOP took about 57 seconds for this computation, while EMR (Hadoop 1.0.3) took 68 seconds on the same sized cluster.
What Next?
As I mentioned at the beginning of this article, HOP seems to be nearly abandonware. The Google Code site is up and running, but there has been almost no activity in the last two years, and the main contributors to the project seem to have moved on to other tasks. Neil Conway is still at UC Berkeley, but his recent publications show a shift away from Hadoop Online in his research, and a quick Google search for Tyson Condie reveals that he now works at Microsoft. Yahoo developers had expressed interest in the HOP code and apparently even used it internally for various projects, but nothing new has materialized so far. Most of the blogs listed below expect the pipelining and continuous-processing features to make their way into the main Hadoop distribution in some form, but at this point that seems to be either a low priority or happening in the background without any fanfare.
References
- HOP Video Presentation YouTube
- HOP NSDI'10 Paper and Slides: html
- Condie Presentation Slides: pdf
- Map Reduce Online: ppt
- Ubuntu EC2 Setup Guide: html
- Fast, Parallel Stream Clustering using Hadoop Online: pdf
- HOP Google Code Site: html
- It's Time to Kill the Elephant: html
- Pipelining and Real-time Analytics with MapReduce Online: html
- MapReduce Online! (and some gimmes): html
- Modeling the Performance of Hadoop Online Prototype: pdf
