## Thursday, January 20, 2011

### Graph Processing With Wukong and Hadoop

As a last (for now) tutorial-oriented post on Wukong, let's process a network graph.

## Get Data

This airport data set (airport edges) from Infochimps is one such network graph, with over 35 million edges. Each record gives the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

## Explore Data

We've got to actually look at the data before we can make any decisions about how to process it and what questions we'd like answered:

```
$: head data/flights_with_colnames.tsv | wu-lign
origin_airport destin_airport passengers flights month
MHK            AMW            21         1       200810
EUG            RDM            41         22      199011
EUG            RDM            88         19      199012
EUG            RDM            11         4       199010
MFR            RDM            0          1       199002
MFR            RDM            11         1       199003
MFR            RDM            2          4       199001
MFR            RDM            7          1       199009
MFR            RDM            7          2       199011
```

So it's exactly what you'd expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics...

## Ask A Question

A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph), the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.

2. Flights Degree: For a given airport, the number of flights in + the number of flights out.

Let's write the question wukong style:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN",  passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree  = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights]    += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights]    += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize
    # Passenger degrees (out, in, and total)
    passengers_out   = @out_degree[:passengers]
    passengers_in    = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out   = @out_degree[:flights]
    flights_in    = @in_degree[:flights]
    flights_total = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run
```

Don't panic. There's a lot going on in this script, so here's the breakdown (real gentle like):

### Mapper

Here we're using Wukong's RecordStreamer class, which already reads lines from $stdin and splits on tabs for us. That's how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.
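The double-yield can be sketched in plain Ruby, outside of Wukong entirely (the `map_edge` helper name is made up for illustration):

```ruby
# Standalone sketch of the map step's double-yield: each edge record is
# emitted twice, once keyed by the origin airport and tagged "OUT", and
# once keyed by the destination airport and tagged "IN".
def map_edge(origin, destin, passengers, flights, month)
  [[origin, month, "OUT", passengers, flights],
   [destin, month, "IN",  passengers, flights]]
end

# One line of the flights data from above:
map_edge("MHK", "AMW", "21", "1", "200810").each do |record|
  puts record.join("\t")
end
```

Note that the airport code lands in the first field of both records, which is what lets the shuffle phase group everything about one airport together.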

Finally, we've carefully rearranged our records so that (airport, month) always forms the first two fields. We'll partition on this as the key. (We have to say so explicitly at the bottom of the script.)

### Reducer

We've seen all these methods before except for one. The reducer needs to know what fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.

* start! - Here we initialize the internal state of the reducer with two ruby hashes. One, @out_degree, will count up all the passengers and flights out; the other, @in_degree, will do the same for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in Java...)

* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".

* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the 'key' is just (airport,month).
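Stripped of the Hadoop machinery, the accumulate/finalize cycle for one (airport, month) group boils down to a couple of hash counters. A minimal plain-Ruby sketch (the sample mapped records are invented for illustration):

```ruby
# What the reducer does for a single (airport, month) group: two hash
# counters, incremented per record, then summed at the end.
in_degree  = {:passengers => 0, :flights => 0}
out_degree = {:passengers => 0, :flights => 0}

# Hypothetical mapper output for ABE in May 2007:
mapped = [
  ["ABE", "200705", "OUT", "10", "2"],
  ["ABE", "200705", "IN",  "40", "3"],
]

mapped.each do |_airport, _month, in_or_out, passengers, flights|
  counts = (in_or_out == "IN") ? in_degree : out_degree
  counts[:passengers] += passengers.to_i
  counts[:flights]    += flights.to_i
end

passengers_total = in_degree[:passengers] + out_degree[:passengers]
flights_total    = in_degree[:flights]    + out_degree[:flights]
puts [passengers_total, flights_total].join("\t")
```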

## Get An Answer

We know how to put the data on the hdfs and run the script by now so we'll skip that part. Here's what the output looks like:
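For reference only, a Wukong script of this shape is typically kicked off something like the following. The script filename is an assumption, and the exact flags vary by Wukong version, so treat this as a sketch rather than a recipe:

```shell
# Hypothetical invocation; degree_distribution.rb is whatever you named
# the script above. Sanity-check locally first (runs the map/sort/reduce
# pipeline with plain shell tools)...
./degree_distribution.rb --run=local data/flights_with_colnames.tsv data/degree_distribution

# ...then run against the cluster once the local output looks right:
./degree_distribution.rb --run=hadoop /data/domestic/flights/flights_with_colnames.tsv /data/domestic/flights/degree_distribution
```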

```
$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
1B1 200906 1   1   2 1  1  2
ABE 200705 0  83  83 0  3  3
ABE 199206 0  31  31 0  1  1
ABE 200708 0 904 904 0 20 20
ABE 200307 0  91  91 0  2  2
ABE 200703 0  36  36 0  1  1
ABE 199902 0  84  84 0  1  1
ABE 200611 0 753 753 0 18 18
ABE 199209 0  99  99 0  1  1
ABE 200702 0  54  54 0  1  1
ABE 200407 0  98  98 0  1  1
ABE 200705 0 647 647 0 15 15
ABE 200306 0  27  27 0  1  1
ABE 200703 0 473 473 0 11 11
ABE 200309 0 150 150 0  1  1
ABE 200702 0 313 313 0  8  8
ABE 200103 0   0   0 0  1  1
ABE 199807 0 105 105 0  1  1
ABE 199907 0  91  91 0  1  1
ABE 199501 0  50  50 0  1  1
```

At this point you might bring the output back down to your local file system, crack open a program like R, make some plots, etc.

And we're done for now. Hurray.

1. How easy would it be to port this to Python? Also, can you post what code you'd use in R to plot this?

Cheers

2. I'd imagine you could port this to Python pretty easily, though I'm not familiar enough with Python and its streaming map-reduce library (dumbo) to know for sure. The gotcha here is partitioning with 2 fields rather than the default 1 field.

As far as plotting in R I'd go with ggplot2: http://had.co.nz/ggplot2/. One thing you might try (for fun) is rewriting the wukong code above to roll up by year, summing the degrees over months. That way you'll end up with 20 different years ('views' or 'facets') to consider which can easily be handled with ggplot's 'facet_wrap' function.

I plan on revisiting this particular problem later next week with Pig. I'll post a plot and the code then.

--thedatachef
