## Wednesday, January 26, 2011

### Graph Processing With Apache Pig

So, you're probably sick of seeing this airport data set by now (flight edges), but it's so awesome that I have to reuse it. Let's use Pig to do the same calculation as this post in a much more succinct way. We'll really get a feel for what Pig is better at than Wukong.

## Degree Distribution

Since the point here is to illustrate the difference between the Wukong way and the Pig way, we're not going to introduce anything clever. Here's the code for calculating the degree by month (both passengers and flights, where an airport's degree is its inbound plus outbound total) for every domestic airport since 1990:

```pig
--
-- Calculates the monthly degree distributions for domestic airports from 1990 to 2009.
--

-- Load data (boring part)
flight_edges = LOAD '$FLIGHT_EDGES' AS (origin_code:chararray, destin_code:chararray, passengers:int, flights:int, month:int);

-- For every (airport,month) pair get passengers and flights out
edges_out = FOREACH flight_edges GENERATE
              origin_code AS airport,
              month       AS month,
              passengers  AS passengers_out,
              flights     AS flights_out
            ;

-- For every (airport,month) pair get passengers and flights in
edges_in = FOREACH flight_edges GENERATE
             destin_code AS airport,
             month       AS month,
             passengers  AS passengers_in,
             flights     AS flights_in
           ;

-- Group them together and sum
grouped_edges = COGROUP edges_in BY (airport,month), edges_out BY (airport,month);
degree_dist   = FOREACH grouped_edges {
                  passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);
                  flights_degree   = SUM(edges_in.flights_in)    + SUM(edges_out.flights_out);
                  GENERATE
                    FLATTEN(group)   AS (airport, month),
                    passenger_degree AS passenger_degree,
                    flights_degree   AS flights_degree
                  ;
                };

STORE degree_dist INTO '$DEG_DIST';
```

So, here's what's going on:

• FOREACH..GENERATE: this is called a 'projection' in Pig. Here we're really just cutting out the fields we don't want and rearranging our records. This is exactly what the map phase of the Wukong script does when it yields two different types of records (IN and OUT) for the same input line, only much clearer.

• COGROUP: here we're simply joining our two relations (edges in and edges out) on a common key (airport code, month). For each key, COGROUP collects the matching records from each relation into its own bag; nothing is summed yet. This corresponds to the 'accumulate' part of the Wukong script, where all the records for one (airport, month) key are gathered together.

• FOREACH..GENERATE (once more): here we run through our grouped records and sum the flights and passengers across both bags. This is exactly the same as the 'finalize' part of the Wukong script.
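To make the three steps concrete, here is a rough plain-Ruby, in-memory sketch of what they compute. It is not how Pig executes anything (Pig compiles the script into Hadoop jobs); it only mimics the data flow. The helper names `project`, `cogroup`, and `degree_dist`, and the sample edges (including the `200901` month values), are hypothetical.

```ruby
# Plain-Ruby, in-memory sketch of the three Pig steps (hypothetical helpers).
# Each edge is [origin_code, destin_code, passengers, flights, month],
# matching the LOAD schema in the Pig script.

# FOREACH..GENERATE: project every edge into an "out" record keyed by its
# origin and an "in" record keyed by its destination.
def project(edges)
  edges_out = edges.map do |origin, _destin, passengers, flights, month|
    { airport: origin, month: month, passengers: passengers, flights: flights }
  end
  edges_in = edges.map do |_origin, destin, passengers, flights, month|
    { airport: destin, month: month, passengers: passengers, flights: flights }
  end
  [edges_in, edges_out]
end

# COGROUP: gather both relations under a shared [airport, month] key,
# keeping a separate bag for each input relation.
def cogroup(edges_in, edges_out)
  groups = Hash.new { |h, k| h[k] = { edges_in: [], edges_out: [] } }
  edges_in.each  { |r| groups[[r[:airport], r[:month]]][:edges_in]  << r }
  edges_out.each { |r| groups[[r[:airport], r[:month]]][:edges_out] << r }
  groups
end

# FOREACH..GENERATE (again): sum passengers and flights across both bags.
def degree_dist(groups)
  groups.map do |(airport, month), bags|
    all = bags[:edges_in] + bags[:edges_out]
    [airport, month, all.sum { |r| r[:passengers] }, all.sum { |r| r[:flights] }]
  end
end

edges = [
  ['BOS', 'JFK', 100, 2, 200901],
  ['JFK', 'BOS',  80, 2, 200901],
  ['JFK', 'LAX', 150, 1, 200901]
]
edges_in, edges_out = project(edges)
degree_dist(cogroup(edges_in, edges_out)).each { |row| p row }
# ["JFK", 200901, 330, 5]
# ["BOS", 200901, 180, 4]
# ["LAX", 200901, 150, 1]
```

Every edge shows up once in an origin's "out" bag and once in a destination's "in" bag, which is why JFK's degree counts both the BOS->JFK edge and its two outbound edges.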

So, basically, we've done in 4 lines (not counting the LOAD and STORE or the prettification) of very clear and concise code what took us ~70 lines of Ruby. Win.

Here's the wukong one again for reference:

```ruby
#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN", passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree  = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights]    += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights]    += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize
    # Passenger degrees (out, in, and total)
    passengers_out   = @out_degree[:passengers]
    passengers_in    = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out      = @out_degree[:flights]
    flights_in       = @in_degree[:flights]
    flights_total    = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields  => 2 # use two fields to partition records
  ).run
```
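The `:partition_fields => 2` option is what keeps each (airport, month) group together. Conceptually, a partitioner picks a reducer by hashing only the leading key fields of each mapper output line, so the IN and OUT records for the same airport and month always land on the same reducer. Here is a toy sketch of that idea; `reducer_for` is a hypothetical helper, and CRC32 is only a stand-in for whatever hash Hadoop actually uses.

```ruby
require 'zlib'

# Hypothetical helper: pick a reducer for one tab-separated mapper output
# line by hashing only its first `partition_fields` fields. CRC32 is a
# stand-in here, not necessarily the hash Hadoop uses.
def reducer_for(line, num_reducers, partition_fields = 2)
  key = line.split("\t").first(partition_fields).join("\t")
  Zlib.crc32(key) % num_reducers
end

# Mapper output in the EdgeMapper layout: airport, month, IN/OUT, passengers, flights
lines = [
  "BOS\t200901\tOUT\t100\t2",
  "BOS\t200901\tIN\t80\t2",
  "JFK\t200901\tIN\t100\t2"
]
lines.each { |line| puts "reducer #{reducer_for(line, 4)}: #{line.tr("\t", ' ')}" }
```

Because the IN/OUT flag and the counts are not part of the hashed key, they never affect which reducer a record goes to; the two BOS lines above always share a reducer.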

## Plot Data

A very common workflow pattern with Hadoop is to use a tool like Pig or Wukong to process large-scale data and generate some result data set. The last step in this process is (rather obviously) to summarize that data in some way. Here's a quick plot (using R and ggplot2) of the passenger degree distribution after I further summarized by year:

That's funny...
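The yearly summarization step itself isn't shown in this post. As a rough sketch, assuming `month` is encoded as `YYYYMM` and that summarizing by year means summing the monthly degrees, a rollup into the four tab-separated columns the R script reads (airport code, year, passenger degree, flights degree) might look like this. `yearly_degrees` and the sample rows are hypothetical.

```ruby
# Hypothetical rollup of monthly degree rows [airport, month, passenger_degree,
# flights_degree] into yearly totals. Assumes month is encoded as YYYYMM and
# that "summarized by year" means summing the monthly values.
def yearly_degrees(monthly_rows)
  totals = Hash.new { |h, k| h[k] = [0, 0] }
  monthly_rows.each do |airport, month, passenger_degree, flights_degree|
    year = month / 100 # e.g. 200901 -> 2009
    totals[[airport, year]][0] += passenger_degree
    totals[[airport, year]][1] += flights_degree
  end
  # Year is emitted as a string, matching the 'character' colClass in the R script
  totals.map { |(airport, year), (pax, flights)| [airport, year.to_s, pax, flights] }
end

monthly = [
  ['BOS', 200901, 180, 4],
  ['BOS', 200902, 200, 5],
  ['BOS', 201001,  50, 1]
]
# One tab-separated line per (airport, year), like yearly_degrees.tsv
yearly_degrees(monthly).each { |row| puts row.join("\t") }
```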

Finally, here's the code for the plot:

```r
# Include the ggplot2 library for nice plots
library(ggplot2)

# Read data in and take a subset
degrees <- read.table('yearly_degrees.tsv', header=FALSE, sep='\t',
                      colClasses=c('character', 'character', 'numeric', 'numeric'))
names(degrees) <- c('airport_code', 'year', 'passenger_degree', 'flights_degree')
select_degrees <- subset(degrees, year %in% c('1990', '2000', '2001', '2002', '2009'))

# Plotting with ggplot2
pdf('passenger_degrees.pdf', 12, 6, pointsize=10)
p <- ggplot(select_degrees, aes(x=passenger_degree, fill=year)) +
  geom_density(colour='black', alpha=0.3) +
  scale_x_log10() +
  ylab('Density') +
  xlab(expression(log[10] ('Passengers in + Passengers out'))) +
  ggtitle('Passenger Degree Distribution')
print(p)   # explicitly render the plot into the PDF device
dev.off()  # close the device so the PDF is written out
```

Hurray.