Friday, January 21, 2011

Pig, Bringing Simplicity to Hadoop

In strong contrast to the seat-of-your-pants style of Wukong, there is another high-level language for Hadoop called Pig (see Apache Pig).


At the top level, here's what Pig gets you:

  • No Java required. Use as little (zero) or as much (reams) Java code as you want.

  • No boilerplate code

  • Intuitive, easy-to-understand language (similar to SQL) with a clean, uniform syntax

  • Separation of the high-level algorithm from the low-level map-reduce jobs

  • Build your analysis as a set of operations acting on data

  • Most algorithms take fewer than five human-readable lines of Pig

Get Pig

Download Pig (version 0.8) from a mirror close to you, unpack it, and put it wherever you like. Then type 'pig' at the command line and see what happens. It's very likely that it won't pick up your existing Hadoop configuration. To fix that, set HADOOP_HOME to point to your Hadoop installation and PIG_CLASSPATH to point to your Hadoop configuration directory (probably /etc/hadoop/conf). Here's all that again:

$: wget
$: tar -zxf pig-0.8.0.tar.gz
$: sudo mv pig-0.8.0 /usr/local/share/
$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig
$: sudo ln -s /usr/local/share/pig/bin/pig /usr/local/bin/pig
$: hash -r
$: export HADOOP_HOME=/usr/lib/hadoop
$: export PIG_CLASSPATH=/etc/hadoop/conf
$: pig
2011-01-21 09:56:32,486 [main] INFO org.apache.pig.Main - Logging error messages to: /home/jacob/pig_1295625392480.log
2011-01-21 09:56:32,879 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020
2011-01-21 09:56:33,402 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: master:54311

See how easy that was? Get it over with now.


People post the most interesting data on Infochimps. Let's grab the first billion digits of pi from there. Notice that the digits are arranged in groups of 10, with 10 groups per line. We're going to use Pig to see how the digits are distributed.

Once we make a plot, this will let us visually spot any obvious deviation from a random sequence of digits. That is, in a uniformly random sequence we'd expect each digit (0-9) to appear equally often: roughly 100 million times each in a billion digits. If that isn't the case, then we know we're dealing with a more complicated beast.
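The "equally often" expectation can also be turned into a single number. Below is a minimal Ruby sketch using Pearson's chi-squared goodness-of-fit statistic, a standard test that we're adding here for illustration (the rest of this post just eyeballs a plot). The method name `chi_squared_uniform` and the toy counts are hypothetical, not real results for pi.

```ruby
# Hypothetical helper: Pearson's chi-squared statistic for digit counts
# against a uniform distribution over 0-9.
# `counts` maps digit => number of times that digit was seen.
def chi_squared_uniform(counts)
  total = counts.values.inject(0) { |sum, n| sum + n }
  return 0.0 if total.zero?   # nothing observed, nothing to compare
  expected = total / 10.0     # a uniform source puts 1/10 of the digits in each bin
  (0..9).inject(0.0) do |acc, digit|
    observed = counts.fetch(digit, 0)
    acc + (observed - expected)**2 / expected
  end
end

# Toy example (made-up counts): 100 digits, perfectly balanced vs. skewed.
balanced = Hash[(0..9).map { |d| [d, 10] }]
skewed   = balanced.merge(0 => 20, 1 => 0)
puts chi_squared_uniform(balanced)
puts chi_squared_uniform(skewed)
```

With 10 digit categories there are 9 degrees of freedom, so a statistic well above about 16.9 (the 5% critical value) would suggest the digits are not uniformly distributed.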


After catting the data file, you'll notice that the end of each line has some crufty details attached, which makes this data more or less impossible to load into Pig as a table. Pig is terrible at data munging/parsing/cleaning. Thankfully we have Wukong. Let's write a dead simple Wukong script that trims the cruft off the last field and emits one digit per line:

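#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

# Mapper: turns each line of 10 groups of 10 digits into 100 lines of one digit each
class PiCleaner < Wukong::Streamer::LineStreamer
  def process line
    # At most 10 fields; the trailing cruft stays attached to the 10th field
    fields = line.strip.split(' ', 10)
    # Keep the first 9 groups plus the first 10 characters of the last one;
    # malformed lines become an empty string and emit nothing
    hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""
    hundred_digit_string.each_char{|digit| yield digit}
  end
end

# Map-only job: no reducer
Wukong::Script.new(PiCleaner, nil).run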
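To check the parsing logic without a cluster, here's a minimal plain-Ruby sketch of the same steps that `PiCleaner#process` performs. The method name `clean_pi_line` and the sample line are hypothetical; in particular, the trailing cruft in the sample is made up, since the real file's exact format isn't reproduced here.

```ruby
# Plain-Ruby sketch of the PiCleaner#process logic, runnable without
# Hadoop or Wukong. Assumes each line holds 10 space-separated groups of
# 10 digits, with any trailing cruft stuck onto the 10th group.
def clean_pi_line(line)
  # Split into at most 10 fields; any cruft stays inside the 10th field.
  fields = line.strip.split(' ', 10)
  return [] if fields.size < 10 # malformed line: emit nothing, like the rescue in the script
  # First 9 groups as-is, plus only the first 10 characters of the last one.
  digits = fields[0..8].join + fields[9][0, 10]
  digits.split('') # one array element per digit
end

# Hypothetical input line: the trailing "  : 100" cruft is made up.
sample = (["0123456789"] * 10).join(' ') + "  : 100"
puts clean_pi_line(sample).size
```

In the streaming job, each returned digit would become its own output line, which is what `yield digit` does in the Wukong version.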

Save it as pi_clean.rb, make it executable, and let's go ahead and run it right away (this will take a few minutes depending on your rig, since it produces a billion lines of output):

$: hdp-mkdir /data/math/pi/
$: hdp-put pi-010.txt /data/math/pi/
$: ./pi_clean.rb --run /data/math/pi/pi-010.txt /data/math/pi/first_billion_digits.tsv
I, [2011-01-22T11:01:57.363704 #12489] INFO -- : Launching hadoop!
I, [2011-01-22T11:01:57.363964 #12489] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.reduce.tasks=0 \
-D'pi_clean.rb---/data/math/pi/pi-010.txt---/data/math/pi/first_billion_digits.tsv' \
-mapper '/usr/bin/ruby1.8 pi_clean.rb --map ' \
-reducer '' \
-input '/data/math/pi/pi-010.txt' \
-output '/data/math/pi/first_billion_digits.tsv' \
-file '/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb' \
-cmdenv 'RUBYLIB=~/.rubylib'

11/01/22 11:01:59 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/22 11:01:59 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/22 11:01:59 INFO streaming.StreamJob: Running job: job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: To kill this job, run:
11/01/22 11:01:59 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0251
11/01/22 11:02:00 INFO streaming.StreamJob: map 0% reduce 0%
11/01/22 11:05:11 INFO streaming.StreamJob: map 100% reduce 100%
11/01/22 11:05:11 INFO streaming.StreamJob: Job complete: job_201012031305_0251
11/01/22 11:05:11 INFO streaming.StreamJob: Output: /data/math/pi/first_billion_digits.tsv
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar4401930660028806042/] [] /tmp/streamjob3153669001520547.jar tmpDir=null

Great. Now let's write some Pig:


This is what Pig is awesome at. Remember that accumulating reducer in Wukong? We're about to do the identical thing in two lines of no-nonsense Pig:

-- load
digits = LOAD '$PI_DIGITS' AS (digit:int);

-- group identical digits and count each group
groups = GROUP digits BY digit;
counts = FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits;

-- store
STORE counts INTO '$OUT';
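Conceptually, GROUP ... BY followed by COUNT is just a tally. As a minimal sketch for sanity-checking on a small local sample, here is the same computation in plain Ruby; `tally_digits` is a hypothetical helper, not part of Pig or Wukong.

```ruby
# Hypothetical helper: an in-memory version of what the Pig GROUP/COUNT
# lines compute. Input: lines holding one digit each.
# Output: [digit, num_digits] pairs, like the rows Pig stores.
def tally_digits(lines)
  counts = Hash.new(0)               # missing keys start at zero
  lines.each do |line|
    digit = line.strip
    next unless digit =~ /\A\d\z/    # skip blank or non-digit lines
    counts[digit.to_i] += 1          # GROUP digits BY digit, then COUNT
  end
  counts.keys.sort.map { |d| [d, counts[d]] }
end

p tally_digits(["3\n", "1\n", "4\n", "1\n", "5\n", "9\n"])
```

Pig doesn't promise any particular order for the stored rows; the sort here is only for readability.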

All the Pig script does is read in the data (one digit per line), group together all the rows with the same digit, and count each group. Save it into a file called 'pi.pig' and run it with the following:

$: pig -p PI_DIGITS=/data/math/pi/first_billion_digits.tsv -p OUT=/data/math/pi/digit_counts.tsv pi.pig

I'll skip the output since it's rather verbose. Now we can make a simple plot by hdp-catting the output into a local file (it's tiny by now: one row per digit) and plotting it with R:

$: hdp-catd /data/math/pi/digit_counts.tsv > digit_counts.tsv
$: R
> library(ggplot2)
> digit_counts <- read.table('digit_counts.tsv', header=FALSE, sep="\t")
> names(digit_counts) <- c('digit', 'count')
> p <- ggplot(digit_counts, aes(x=digit)) + geom_histogram(aes(y = ..density.., weight = count), binwidth=1, colour='black', fill='grey', alpha=0.7)
> p + scale_y_continuous(limits=c(0,0.5))

This produces a histogram showing the fraction of all digits that each digit (0-9) accounts for.
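If you'd rather check the numbers than eyeball the plot, the bar heights are just each digit's share of the total count. A minimal Ruby sketch, assuming the rows are tab-separated "digit<TAB>count" lines (Pig's default PigStorage delimiter is a tab). `digit_fractions` is a hypothetical helper, and the sample counts are made up, not results for pi.

```ruby
require 'stringio'

# Hypothetical helper: reads "digit<TAB>count" rows (as copied locally into
# digit_counts.tsv) and returns each digit's share of the total count.
def digit_fractions(io)
  counts = {}
  io.each_line do |line|
    digit, count = line.chomp.split("\t")
    next if count.nil?               # skip blank or malformed rows
    counts[digit.to_i] = count.to_i
  end
  total = counts.values.inject(0) { |sum, n| sum + n }
  return {} if total.zero?
  Hash[counts.keys.sort.map { |d| [d, counts[d].to_f / total] }]
end

# Made-up counts for illustration only.
sample = StringIO.new("0\t30\n1\t20\n2\t50\n")
digit_fractions(sample).each { |d, f| puts "#{d}: #{f}" }
```

On the real file you would pass `File.open('digit_counts.tsv')` instead; for uniformly distributed digits every fraction should be close to 0.1.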

I'll leave it to you to draw your own conclusions.