
Showing posts with label ruby. Show all posts

Saturday, January 22, 2011

JRuby and Hadoop, Notes From a Non-Java Programmer

So I spent a fair deal of time playing with JRuby this weekend. Here are my notes and conclusions so far. Disclaimer: my experience with Java overall is limited, so most of this might be obvious to a Java ninja but maybe not to a Ruby one...

Goal:

The whole point here is that it sure would be nice to only write a few lines of ruby code into a file called something like 'rb_mapreduce.rb' and run it by saying: ./rb_mapreduce.rb. No compiling, no awkward syntax. Pure ruby, plain and simple.

Experiments/Notes:

I created a 'WukongPlusMapper' that subclassed 'org.apache.hadoop.mapreduce.Mapper' and implemented the important methods, namely 'map'. Then I set up and launched a job from inside jruby using this jruby mapper ('WukongPlusMapper') as the map class.

The job launched and ran just fine. But...

Problems and Lessons Learned:

  • It is possible (in fact extremely easy) to set up and launch a Hadoop job with pure jruby

  • It is not possible, as far as I can tell so far, to use an uncompiled jruby class as either the mapper or the reducer for a Hadoop job. It doesn't throw an error (so long as you've subclassed a proper java mapper) but actually just uses the superclass's definition instead. I believe the reason is that each map task must have access to the full class definition for its mapper (only sensible) and has no idea what to do with my transient 'WukongPlusMapper' class. Obviously the same would apply to the reducer

  • It is possible to compile a jruby class ahead-of-time, stuff it into the job jar, and then launch the job with ordinary means. There are a couple somewhat obvious drawbacks with this method:

    • You've got to specify 'java_signatures' for each of your methods that are going to be called inside java

    • Messy logic+code for compiling thyself, stuffing thyself into a jar, shipping thyself with MR job. Might as well just write java at this point. radoop has some logic for doing that pretty well laid out.

  • It is possible to define and create an object in jruby that subclasses a java class or implements a java interface. Then you can simply override the methods you want to override. It's possible to pass instances of this class to a java runtime that only knows about the superclass, and the subclass's methods (at least the ones that have the signatures defined in the superclass) will work just fine. Unfortunately (and plainly obvious in hindsight), this does NOT work with Hadoop since these instances all show up in java as 'proxy classes' and are only accessible to the launching jvm

  • On another note there is the option of using the scripting engine which, as far as I can tell, is what both jruby-on-hadoop and radoop are using. Something of concern though is that neither of these two projects seems to have much traction. However, it may be that the scripting engine is the only way to reasonably make this work; at least 2 people vote yes ...




So, implementation complexity aside, it looks like all one would have to do is come up with some way of making JRuby's in-memory class definitions available to all of the spawned mappers and reducers. Probably not something I want to delve into at the moment.

Script engine it is.

Thursday, January 20, 2011

Graph Processing With Wukong and Hadoop

As a last (for now) tutorial oriented post on Wukong, let's process a network graph.

Get Data



This airport data (airport edges) from Infochimps is one such network graph with over 35 million edges. It represents the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

Explore Data



We've got to actually look at the data before we can make any decisions about how to process it and what questions we'd like answered:


$: head data/flights_with_colnames.tsv | wu-lign
origin_airport destin_airport passengers flights month
MHK AMW 21 1 200810
EUG RDM 41 22 199011
EUG RDM 88 19 199012
EUG RDM 11 4 199010
MFR RDM 0 1 199002
MFR RDM 11 1 199003
MFR RDM 2 4 199001
MFR RDM 7 1 199009
MFR RDM 7 2 199011


So it's exactly what you'd expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics...

Ask A Question


A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph) the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.

2. Flights Degree: For a given airport the number of flights in + the number of flights out.

Let's write the question wukong style:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN", passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights] += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights] += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize

    # Passenger degrees (out, in, and total)
    passengers_out = @out_degree[:passengers]
    passengers_in = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out = @out_degree[:flights]
    flights_in = @in_degree[:flights]
    flights_total = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run


Don't panic. There's a lot going on in this script so here's the breakdown (real gentle like):

Mapper


Here we're using wukong's RecordStreamer class which reads lines from $stdin and splits on tabs for us already. That's how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.

Finally, we've carefully rearranged our records such that (airport,month) is always the first two fields. We'll partition on this as the key. (We have to say that explicitly at the bottom of the script)
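
To make the "yield both ways" trick concrete, here's a minimal sketch in plain ruby (no wukong required). The emit_both_ways function is a hypothetical stand-in for the mapper's process method, and the input line is one of the sample edges from the data above:

```ruby
# Hypothetical stand-in for EdgeMapper#process; plain ruby, no wukong needed.
# Takes one tab-separated edge and returns the two records the mapper would yield.
def emit_both_ways(line)
  origin, destin, passengers, flights, month = line.chomp.split("\t")
  [
    [origin, month, "OUT", passengers, flights], # traffic leaving the origin
    [destin, month, "IN",  passengers, flights]  # traffic arriving at the destination
  ]
end

records = emit_both_ways("EUG\tRDM\t41\t22\t199011")
records.each { |record| puts record.join("\t") }
```

Each edge becomes two records, and in both of them (airport, month) comes first, which is what lets us partition on the first two fields.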

Reducer


We've seen all these methods before except for one. The reducer needs to know what fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.

* start! - Here we initialize the internal state of the reducer with two ruby hashes. One, the @out_degree will count up all the passengers and flights out. The @in_degree will do the same but for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in java...)

* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".

* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the 'key' is just (airport,month).
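
If it helps, here's a rough simulation of the whole reduce phase in plain ruby. It is only a sketch of the start!/accumulate/finalize logic, not wukong's implementation: degree_rows is a made-up name, and the one "OUT" record for RDM is invented so both branches get exercised (the two "IN" records come from the sample data).

```ruby
# Records as the mapper would emit them: airport, month, direction, passengers, flights.
mapped = [
  ["RDM", "199011", "IN",  "41", "22"],
  ["RDM", "199011", "IN",  "7",  "2"],
  ["RDM", "199011", "OUT", "10", "1"],  # hypothetical record, for illustration only
  ["EUG", "199011", "OUT", "41", "22"]
]

# Group on (airport, month), then sum passengers and flights per direction.
def degree_rows(mapped)
  mapped.group_by { |record| record[0, 2] }.sort.map do |key, group|
    totals = Hash.new(0)                      # plays the role of start!
    group.each do |_airport, _month, direction, passengers, flights|
      totals[[direction, :passengers]] += passengers.to_i   # accumulate
      totals[[direction, :flights]]    += flights.to_i
    end
    p_in, p_out = totals[["IN", :passengers]], totals[["OUT", :passengers]]
    f_in, f_out = totals[["IN", :flights]], totals[["OUT", :flights]]
    key + [p_in, p_out, p_in + p_out, f_in, f_out, f_in + f_out]  # finalize
  end
end

rows = degree_rows(mapped)
rows.each { |row| puts row.join("\t") }
```

The columns come out in the same order as in the real script: key, passengers in, out, total, then flights in, out, total.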

Get An Answer


We know how to put the data on the hdfs and run the script by now so we'll skip that part. Here's what the output looks like:


$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
1B1 200906 1 1 2 1 1 2
ABE 200705 0 83 83 0 3 3
ABE 199206 0 31 31 0 1 1
ABE 200708 0 904 904 0 20 20
ABE 200307 0 91 91 0 2 2
ABE 200703 0 36 36 0 1 1
ABE 199902 0 84 84 0 1 1
ABE 200611 0 753 753 0 18 18
ABE 199209 0 99 99 0 1 1
ABE 200702 0 54 54 0 1 1
ABE 200407 0 98 98 0 1 1
ABE 200705 0 647 647 0 15 15
ABE 200306 0 27 27 0 1 1
ABE 200703 0 473 473 0 11 11
ABE 200309 0 150 150 0 1 1
ABE 200702 0 313 313 0 8 8
ABE 200103 0 0 0 0 1 1
ABE 199807 0 105 105 0 1 1
ABE 199907 0 91 91 0 1 1
ABE 199501 0 50 50 0 1 1


This is the point where you might bring this back down to your local file system, crack open a program like R, make some plots, etc.

And we're done for now. Hurray.

Monday, January 17, 2011

Processing XML Records with Hadoop and Wukong

Another common pattern that Wukong addresses exceedingly well is liberating data from unwieldy formats (XML) into tsv. For example, let's consider the following Hacker News dataset: See RedMonk Analytics

A single record looks like this:


<row><ID>33</ID><ParentID>31</ParentID><Text>&lt;font color="#5a5a5a"&gt;winnar winnar chicken dinnar!&lt;/font&gt;</Text><Username>spez</Username><Points>0</Points><Type>2</Type><Timestamp>2006-10-10T21:11:18.093</Timestamp><CommentCount>0</CommentCount></row>


And here's a wukong example script that turns that into tsv:


#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'
require 'wukong/encoding'
require 'crack'

class HackernewsComment < Struct.new(:username, :url, :title, :text, :timestamp, :comment_id, :points, :comment_count, :type)
  def self.parse raw
    raw_hash = Crack::XML.parse(raw.strip)
    return unless raw_hash
    return unless raw_hash["row"]
    raw_hash = raw_hash["row"]
    raw_hash[:username] = raw_hash["Username"].wukong_encode if raw_hash["Username"]
    raw_hash[:url] = raw_hash["Url"].wukong_encode if raw_hash["Url"]
    raw_hash[:title] = raw_hash["Title"].wukong_encode if raw_hash["Title"]
    raw_hash[:text] = raw_hash["Text"].wukong_encode if raw_hash["Text"]
    # The key has to match the struct member (:comment_id) to end up in the output
    raw_hash[:comment_id] = raw_hash["ID"].to_i if raw_hash["ID"]
    raw_hash[:points] = raw_hash["Points"].to_i if raw_hash["Points"]
    raw_hash[:comment_count] = raw_hash["CommentCount"].to_i if raw_hash["CommentCount"]
    raw_hash[:type] = raw_hash["Type"].to_i if raw_hash["Type"]

    # E.g. map '2010-10-26T19:29:59.717' to the easier to work with '20101027002959'
    raw_hash[:timestamp] = Time.parse_and_flatten(raw_hash["Timestamp"]) if raw_hash["Timestamp"]
    #
    self.from_hash(raw_hash, true)
  end
end

class XMLParser < Wukong::Streamer::LineStreamer
  def process line
    return unless line =~ /^\<row/
    yield HackernewsComment.parse(line)
  end
end

Wukong::Script.new(XMLParser, nil).run


Here's how it works. We're going to use the "StreamXmlRecordReader" for hadoop streaming. What this does is hand the map task one complete <row>...</row> record at a time. That's our line variable. Additionally, we've defined a data model to read the row into called "HackernewsComment". This guy is responsible for parsing the xml record and creating a new instance of itself.

Inside the HackernewsComment's parse method we create clean fields that we'd like to use. Wukong has a method for strings called 'wukong_encode' which simply xml encodes the text so weird characters aren't an issue. You can imagine modifying the raw fields in other ways to construct and fill the fields of your output data model.

Finally, a new instance of HackernewsComment is created using the clean fields and emitted. Notice that we don't have to do anything special to the new comment once it's created. That's because Wukong will do the "right thing" and serialize out the class name as a flat field (hackernews_comment) along with the fields, in order, as a tsv record.
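
For a feel of what the parse step boils down to, here's a small sketch in plain ruby that needs neither wukong nor crack. Big caveats: parse_row is a made-up name, the regular expression is a stand-in for Crack::XML and only copes with flat rows like the one above (no nesting, no attributes), and I'm assuming the timestamps are UTC, whereas the helper in the real script evidently applies a timezone shift.

```ruby
require 'time'

# The sample record from above, verbatim.
ROW = '<row><ID>33</ID><ParentID>31</ParentID><Text>&lt;font color="#5a5a5a"&gt;winnar winnar chicken dinnar!&lt;/font&gt;</Text><Username>spez</Username><Points>0</Points><Type>2</Type><Timestamp>2006-10-10T21:11:18.093</Timestamp><CommentCount>0</CommentCount></row>'

# Hypothetical helper: turn one flat <row> into an array of tsv-ready fields.
def parse_row(line)
  body = line[/<row>(.*)<\/row>/m, 1]
  return unless body                                 # not a row, skip it
  fields = body.scan(/<(\w+)>(.*?)<\/\1>/m).to_h     # {"ID" => "33", ...}
  [
    "hackernews_comment",
    fields["Username"],
    fields["Text"],                                  # left entity-encoded
    # Assumption: treat the timestamp as UTC and flatten it to YYYYmmddHHMMSS
    Time.parse(fields["Timestamp"] + "Z").utc.strftime("%Y%m%d%H%M%S"),
    fields["ID"].to_i,
    fields["Points"].to_i,
    fields["CommentCount"].to_i,
    fields["Type"].to_i
  ]
end

record = parse_row(ROW)
puts record.join("\t")
```

For anything messier than this you really do want a proper XML parser, which is exactly why the script above leans on crack.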

Save this into a file called "process_xml.rb" and run with the following:


$:./process_xml.rb --split_on_xml_tag=row --run /tmp/hn-sample.xml /tmp/xml_out
I, [2011-01-17T11:09:17.461643 #5519] INFO -- : Launching hadoop!
I, [2011-01-17T11:09:17.461757 #5519] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.reduce.tasks=0 \
-D mapred.job.name='process_xml.rb---/tmp/hn-sample.xml---/tmp/xml_out' \
-inputreader 'StreamXmlRecordReader,begin=<row>,end=</row>' \
-mapper '/usr/bin/ruby1.8 process_xml.rb --map ' \
-reducer '' \
-input '/tmp/hn-sample.xml' \
-output '/tmp/xml_out' \
-file '/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb' \
-cmdenv 'RUBYLIB=$HOME/.rubylib'

11/01/17 11:09:18 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/17 11:09:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/17 11:09:19 INFO streaming.StreamJob: Running job: job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: To kill this job, run:
11/01/17 11:09:19 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0243
11/01/17 11:09:20 INFO streaming.StreamJob: map 0% reduce 0%
11/01/17 11:09:34 INFO streaming.StreamJob: map 100% reduce 0%
11/01/17 11:09:40 INFO streaming.StreamJob: map 87% reduce 0%
11/01/17 11:09:43 INFO streaming.StreamJob: map 87% reduce 100%
11/01/17 11:09:43 INFO streaming.StreamJob: Job complete: job_201012031305_0243
11/01/17 11:09:43 INFO streaming.StreamJob: Output: /tmp/xml_out
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar902611811523431467/] [] /tmp/streamjob681918437315823836.jar tmpDir=null


Finally, let's take a look at our new, happily liberated, tsv records:


$: hdp-catd /tmp/xml_out | head | wu-lign
hackernews_comment Harj http://blog.harjtaggar.com YC Founder looking for Rails Tutor 20101027002959 0 5 0 1
hackernews_comment pg http://ycombinator.com Y Combinator 20061010003558 1 39 15 1
hackernews_comment phyllis http://www.paulgraham.com/mit.html A Student's Guide to Startups 20061010003648 2 12 0 1
hackernews_comment phyllis http://www.foundersatwork.com/stevewozniak.html Woz Interview: the early days of Apple 20061010183848 3 7 0 1
hackernews_comment onebeerdave http://avc.blogs.com/a_vc/2006/10/the_nyc_develop.html NYC Developer Dilemma 20061010184037 4 6 0 1
hackernews_comment perler http://www.techcrunch.com/2006/10/09/google-youtube-sign-more-separate-deals/ Google, YouTube acquisition announcement could come tonight 20061010184105 5 6 0 1
hackernews_comment perler http://360techblog.com/2006/10/02/business-intelligence-the-inkling-way/ Business Intelligence the Inkling Way: cool prediction markets software 20061010185246 6 5 0 1
hackernews_comment phyllis http://featured.gigaom.com/2006/10/09/sevin-rosen-unfunds-why/ Sevin Rosen Unfunds - why? 20061010021030 7 5 0 1
hackernews_comment frobnicate http://news.bbc.co.uk/2/hi/programmes/click_online/5412216.stm LikeBetter featured by BBC 20061010021033 8 10 0 1
hackernews_comment askjigga http://www.weekendr.com/ weekendr: social network for the weekend 20061010021036 9 3 0 1



Hurray.


As a side note, I strongly encourage comments. Seriously. How am I supposed to know what's useful for you and what isn't unless you comment?

Sunday, January 16, 2011

Processing JSON Records With Hadoop and Wukong

For another illustration of how Wukong is making it way simpler to work with data, let's process some real JSON records.

Get Data


Download this awesome UFO data at Infochimps. It's over 60,000 documented ufo sightings with text descriptions. Best of all, it's available in tsv, json, and avro formats. Downloading the bz2 package will get you all three.

Explore Data


Once you've got your data set let's crack open the json version and take a look at it:


# weirdness in infochimps packaging (a zipped bz2?)
$: unzip icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip
Archive: icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip
creating: chimps_16154-2010-10-20_14-33-35/
inflating: chimps_16154-2010-10-20_14-33-35/README-infochimps
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv
inflating: chimps_16154-2010-10-20_14-33-35/16154.yaml
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.avro
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.json
$: head chimps_16154-2010-10-20_14-33-35/ufo_awesome.json
{"sighted_at": "19951009", "reported_at": "19951009", "location": " Iowa City, IA", "shape": "", "duration": "", "description": "Man repts. witnessing "flash, followed by a classic UFO, w/ a tailfin at back." Red color on top half of tailfin. Became triangular."}
{"sighted_at": "19951010", "reported_at": "19951011", "location": " Milwaukee, WI", "shape": "", "duration": "2 min.", "description": "Man on Hwy 43 SW of Milwaukee sees large, bright blue light streak by his car, descend, turn, cross road ahead, strobe. Bizarre!"}
{"sighted_at": "19950101", "reported_at": "19950103", "location": " Shelton, WA", "shape": "", "duration": "", "description": "Telephoned Report:CA woman visiting daughter witness discs and triangular ships over Squaxin Island in Puget Sound. Dramatic. Written report, with illustrations, submitted to NUFORC."}
{"sighted_at": "19950510", "reported_at": "19950510", "location": " Columbia, MO", "shape": "", "duration": "2 min.", "description": "Man repts. son's bizarre sighting of small humanoid creature in back yard. Reptd. in Acteon Journal, St. Louis UFO newsletter."}
{"sighted_at": "19950611", "reported_at": "19950614", "location": " Seattle, WA", "shape": "", "duration": "", "description": "Anonymous caller repts. sighting 4 ufo's in NNE sky, 45 deg. above horizon. (No other facts reptd. No return tel. #.)"}
{"sighted_at": "19951025", "reported_at": "19951024", "location": " Brunswick County, ND", "shape": "", "duration": "30 min.", "description": "Sheriff's office calls to rept. that deputy, 20 mi. SSE of Wilmington, is looking at peculiar, bright white, strobing light."}
{"sighted_at": "19950420", "reported_at": "19950419", "location": " Fargo, ND", "shape": "", "duration": "2 min.", "description": "Female student w/ friend witness huge red light in sky. 2 others witness. Obj pulsated, started to flicker. Winked out."}
{"sighted_at": "19950911", "reported_at": "19950911", "location": " Las Vegas, NV", "shape": "", "duration": "", "description": "Man repts. bright, multi-colored obj. in NW night sky. Disappeared while he was in house."}
{"sighted_at": "19950115", "reported_at": "19950214", "location": " Morton, WA", "shape": "", "duration": "", "description": "Woman reports 2 craft fly over house. Strange events taking place in town w/ paramilitary activities."}
{"sighted_at": "19950915", "reported_at": "19950915", "location": " Redmond, WA", "shape": "", "duration": "6 min.", "description": "Young man w/ 2 co-workers witness tiny, distinctly white round disc drifting slowly toward NE. Flew in dir. 90 deg. to winds."}


Looks pretty interesting. As one last (obvious to some, sure) simple check let's see how big it is:


$: ls -lh chimps_16154-2010-10-20_14-33-35
total 220M
-rw-r--r-- 1 jacob 3.4K 2010-10-20 09:34 16154.yaml
-rw-r--r-- 1 jacob 908 2010-10-20 09:34 README-infochimps
-rw------- 1 jacob 72M 2010-10-20 09:33 ufo_awesome.avro
-rw------- 1 jacob 77M 2010-10-20 09:33 ufo_awesome.json
-rw------- 1 jacob 72M 2010-10-20 09:33 ufo_awesome.tsv


Load Data


Now, 77M is small enough that you COULD process on a single machine with methods you already know. However, this example is about hadoop so let's go ahead and throw it on the hadoop distributed file system (HDFS) so we can process it in parallel:

hdp-mkdir /data/domestic/ufo
hdp-put chimps_16154-2010-10-20_14-33-35/ufo_awesome.json /data/domestic/ufo/


(I'm going to have to assume you already have an HDFS up and running; see this for a simple how-to.)
If all goes well you should see your file there:

hdp-ls /data/domestic/ufo/
Found 1 items
-rw-r--r-- 1 jacob supergroup 80346460 2011-01-16 21:21 /data/domestic/ufo/ufo_awesome.json


Process Data



Let's write a really simple wukong script to find the most popular ufo shapes:


#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'
require 'json'

class JSONMapper < Wukong::Streamer::LineStreamer
  def process record
    sighting = JSON.parse(record) rescue {}
    return unless sighting["shape"]
    yield sighting["shape"] unless sighting["shape"].empty?
  end
end

class ShapeReducer < Wukong::Streamer::AccumulatingReducer

  def start! shape
    @count = 0
  end

  def accumulate shape
    @count += 1
  end

  def finalize
    yield [key, @count]
  end

end

Wukong::Script.new(JSONMapper, ShapeReducer).run


Mapper


Our mapper class, JSONMapper, is nearly identical to the mapper in the earlier post. All it does is read in single json records from $stdin, parse out the "shape" field (with some rescues for handling data nastiness), and emit the "shape" field back to $stdout.

Reducer


The reducer, ShapeReducer, is about the simplest reducer in Wukong that still illustrates the major points:

* start! - the first method that is called on a new group of data. A group is, if you remember your map-reduce, all records with the same key. In this case it's all the "shape" fields with the same shape. All this method does is decide how to initialize any internal state a reducer has. In this case we simply initialize a counter to 0.

* accumulate - even simpler. This method operates on each record in the group. In our simple case we just increment the internal counter by 1.

* finalize - the final step in the reduce. We've processed all our records and so we yield the key corresponding to our group (that's just going to be the unique "shape") and the count so far.

And that's it. Let's save it into a file called "process_ufo.rb" and run it locally on 10000 lines:


$: cat chimps_16154-2010-10-20_14-33-35/ufo_awesome.json| head -n10000| ./process_ufo.rb --map | sort | ./process_ufo.rb --reduce | sort -nk2 | wu-lign
changed 1
dome 1
flare 1
hexagon 1
pyramid 1
crescent 2
round 2
delta 8
cross 25
cone 41
teardrop 79
rectangle 112
egg 113
chevron 128
diamond 137
flash 138
cylinder 155
changing 204
cigar 255
formation 290
oval 333
unknown 491
sphere 529
circle 667
other 721
disk 727
fireball 799
triangle 868
light 1760


Notice when we run this locally we have to stick the "sort" program in there. This is to simulate what hadoop gives us for free. It looks like light is going to come out ahead. Let's see what happens when we run it with hadoop:
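
The same map, sort, reduce dance can be mimicked entirely in plain ruby, which is handy for convincing yourself of what each stage does. This is a sketch only: map_shapes and reduce_counts are made-up names, and the input lines are invented, trimmed-down records rather than real rows from the data set.

```ruby
require 'json'

# Hypothetical, trimmed-down sightings (not real rows from the data set).
lines = [
  '{"sighted_at": "19951009", "shape": "", "description": "flash"}',
  '{"sighted_at": "20010101", "shape": "light"}',
  '{"sighted_at": "20010102", "shape": "triangle"}',
  '{"sighted_at": "20010103", "shape": "light"}',
  'this line is not json',
  '{"sighted_at": "20010104"}'
]

# Map: emit the shape of every record that parses and has a non-empty shape.
def map_shapes(lines)
  lines.each_with_object([]) do |line, shapes|
    sighting = JSON.parse(line) rescue {}
    shape = sighting["shape"]
    shapes << shape unless shape.nil? || shape.empty?
  end
end

# Reduce: walk the SORTED keys and count each run of identical keys.
def reduce_counts(sorted_keys)
  sorted_keys.chunk_while { |a, b| a == b }.map { |group| [group.first, group.size] }
end

counts = reduce_counts(map_shapes(lines).sort)  # .sort plays the role of the shuffle
counts.each { |shape, count| puts "#{shape}\t#{count}" }
```

Drop the .sort and the reducer would see "light", "triangle", "light" as three separate groups, which is why the sort has to sit between the two steps when running locally.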


$: ./process_ufo.rb --run /data/domestic/ufo/ufo_awesome.json /data/domestic/ufo/shape_counts
I, [2011-01-16T21:51:43.431534 #11447] INFO -- : Launching hadoop!
I, [2011-01-16T21:51:43.476626 #11447] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.job.name='process_ufo.rb---/data/domestic/ufo/ufo_awesome.json---/data/domestic/ufo/shape_counts' \
-mapper '/usr/bin/ruby1.8 process_ufo.rb --map ' \
-reducer '/usr/bin/ruby1.8 /home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb --reduce ' \
-input '/data/domestic/ufo/ufo_awesome.json' \
-output '/data/domestic/ufo/shape_counts' \
-file '/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb' \
-cmdenv 'RUBYLIB=$HOME/.rubylib'

11/01/16 21:51:45 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/16 21:51:46 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/16 21:51:46 INFO streaming.StreamJob: Running job: job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: To kill this job, run:
11/01/16 21:51:46 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0221
11/01/16 21:51:47 INFO streaming.StreamJob: map 0% reduce 0%
11/01/16 21:51:59 INFO streaming.StreamJob: map 100% reduce 0%
11/01/16 21:52:12 INFO streaming.StreamJob: map 100% reduce 100%
11/01/16 21:52:15 INFO streaming.StreamJob: Job complete: job_201012031305_0221
11/01/16 21:52:15 INFO streaming.StreamJob: Output: /data/domestic/ufo/shape_counts
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar3466191386581838257/] [] /tmp/streamjob3112551829711880856.jar tmpDir=null


As one final step let's cat the output data and take a look at it:


hdp-catd /data/domestic/ufo/shape_counts | sort -nk2 | wu-lign
changed 1
dome 1
flare 1
hexagon 1
pyramid 1
crescent 2
round 2
delta 8
cross 177
cone 265
teardrop 592
egg 661
chevron 757
diamond 909
rectangle 957
cylinder 980
flash 988
changing 1532
cigar 1774
formation 1774
oval 2859
fireball 3436
sphere 3613
unknown 4458
other 4570
disk 4794
circle 5249
triangle 6036
light 12138


Pretty simple?

Saturday, January 15, 2011

Swineherd

Swineherd is a useful tool for combining multiple pig scripts, wukong scripts, and even R scripts into a workflow managed by rake. Here though I'd like to save the actual workflow part for a later post and just illustrate its uniform interface to these scripts by showing how to launch a wukong script:


#!/usr/bin/env ruby

require 'rake'
require 'swineherd'

task :wukong_job do
  script = WukongScript.new('/path/to/wukong_script')
  script.options = {:some_option => "123", :another_option => "foobar"}
  script.input << '/path/to/input'
  script.output << '/path/to/output'
  script.run
end



You can save this into a file called "Rakefile" and run it by saying:
rake wukong_job

Wukong's Hadoop Convenience Utilities

Wukong comes with a number of convenience command-line utilities for working with the hdfs as well as a few commands for basic hadoop streaming. All of them can be found in wukong's bin directory. Enumerating a few:

* hdp-put
* hdp-ls
* hdp-mkdir
* hdp-rm
* hdp-catd
* hdp-stream
* hdp-stream-flat
* and more...

HDFS utilities



These are just wrappers around the hadoop fs utility to cut down on the amount of typing:
Hadoop fs utility     Wukong convenience command
hadoop fs -put        hdp-put
hadoop fs -ls         hdp-ls
hadoop fs -mkdir      hdp-mkdir
hadoop fs -rm         hdp-rm


hdp-catd


hdp-catd will take an arbitrary hdfs directory and cat its contents. It ignores those files that start with a "_" character. This means we can cat a whole directory of those awful part-xxxxx files.
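
To illustrate the idea (and only the idea), here's a sketch of the same behavior against a local directory in plain ruby. The real hdp-catd talks to the hdfs; cat_dir is a made-up name and the file names are just typical-looking examples.

```ruby
require 'tmpdir'

# Hypothetical local analogue of hdp-catd: concatenate every file in a
# directory, skipping anything whose name starts with "_".
def cat_dir(dir)
  Dir.children(dir).sort
     .reject { |name| name.start_with?("_") }
     .map { |name| File.read(File.join(dir, name)) }
     .join
end

output = Dir.mktmpdir do |dir|
  File.write(File.join(dir, "part-00000"), "a\t1\n")
  File.write(File.join(dir, "part-00001"), "b\t2\n")
  File.write(File.join(dir, "_SUCCESS"), "should be ignored\n")
  cat_dir(dir)
end

puts output
```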

hdp-stream



hdp-stream allows you to run a generic streaming task without all the typing. You almost always only need to specify input, output, num keys for partition, num sort key fields, number of reducers, and what scripts to use as the mapper and reducer. Here's an example of running a uniq:


hdp-stream /path/to/input /path/to/output /bin/cat /usr/bin/uniq 2 3 -Dmapred.reduce.tasks=10


will launch a streaming job using 2 fields for partition and 3 fields for sort keys and 10 reduce tasks. See http://hadoop.apache.org/common/docs/r0.20.0/mapred-default.html for other options you can pass in with the "-D" flag.
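
What "2 fields for partition" buys you is easiest to see with a toy partitioner. The sketch below is plain ruby and is NOT how hadoop hashes keys (the byte sum is a deliberately dumb stand-in, and partition_for is a made-up name); it only shows that records agreeing on the first two fields always get the same reducer number.

```ruby
# Toy partitioner: build the key from the first N fields, hash it, and take it
# modulo the number of reducers. The byte-sum "hash" is a stand-in, not Hadoop's.
def partition_for(record, partition_fields, num_reducers)
  key = record.first(partition_fields).join("\t")
  key.bytes.sum % num_reducers
end

records = [
  ["ABE", "200705", "IN",  "83",  "3"],
  ["ABE", "200705", "OUT", "647", "15"],
  ["ABE", "200708", "OUT", "904", "20"]
]

records.each do |record|
  puts "#{record.first(2).join(' ')} -> reducer #{partition_for(record, 2, 10)}"
end
```

The first two records share a key and so share a reducer. The third has a different month, so it may or may not end up somewhere else.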

hdp-stream-flat



There's one other extremely useful case: when you don't care to specify anything about partitioners because you either aren't running a reduce or don't care how your data is sent to individual reducers. In this case hdp-stream-flat is very useful. Here's how to keep only the first two fields of a large input file:


hdp-stream-flat /path/to/input /path/to/output "/usr/bin/cut -f1,2" "/bin/cat" -Dmapred.reduce.tasks=0


See wukong/bin for more useful command line utilities.

Wukong, Bringing Ruby to Hadoop

Wukong is hands down the simplest (and probably the most fun) tool to use with hadoop. It especially excels at the following use case:

You've got a huge amount of data (let that be whatever size you think is huge). You want to perform a simple operation on each record. For example, parsing out fields with a regular expression, adding two fields together, stuffing those records into a data store, etc etc. These are called map only jobs. They do NOT require a reduce. Can you imagine writing a java map reduce program to add two fields together? Wukong gives you all the power of ruby backed by all the power (and parallelism) of hadoop streaming. Before we get into examples, and there will be plenty, let's make sure you've got wukong installed and running locally.

Installing Wukong



First and foremost you've got to have ruby installed and running on your machine. Most of the time you already have it. Try checking the version in a terminal:

$: ruby --version
ruby 1.8.7 (2010-01-10 patchlevel 249) [x86_64-linux]


If that fails then I bet google can help you get ruby installed on whatever os you happen to be using.

Next is to make sure you've got rubygems installed

$: gem --version
1.3.7


Once again, google can help you get it installed if you don't have it.

Wukong is a rubygem so we can just install it that way:

sudo gem install wukong
sudo gem install json
sudo gem install configliere
sudo gem install extlib


Notice we also installed a couple of other libraries to help us out (the json gem, the configliere gem, and the extlib gem). If at any time you get weird errors (LoadError: no such file to load -- somelibraryname) then you probably just need to gem install somelibraryname.

An example


Moving on. You should be ready to test out running wukong locally now. Here's the most minimal working wukong script I can come up with that illustrates a map only wukong script:


#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class LineMapper < Wukong::Streamer::LineStreamer
  def process line
    yield line
  end
end

Wukong::Script.new(LineMapper, nil).run



Save that into a file called wukong_test.rb and run it with the following:
cat wukong_test.rb | ./wukong_test.rb --map


If everything works as expected then you should see exactly the contents of your script dump onto your terminal. Let's examine what's actually going on here.

Boiler plate ruby


First, we're letting the interpreter know we want to use ruby with the first line (somewhat obvious). Next, we're including the libraries we need.

The guts


Then we define a class in ruby for doing our map job called LineMapper. This guy subclasses from the wukong LineStreamer class. All the LineStreamer class does is read records from stdin and give them as arguments to the LineMapper's process method. The process method then does nothing more than yield the line back to the LineStreamer which emits the line back to stdout.

The runner


Finally, we have to let wukong know we intend to run our script. We create a new script object with LineMapper as the mapper class and nil as the reducer class.

More succinctly, we've written our own cat program. When we ran the above command we simply streamed our script, line by line, through the program. Try streaming some real data through the program and adding some more stuff to the process method. Perhaps parsing the line with a regular expression and yielding numbers? Yielding words? Yielding characters? The choice is yours. Have fun with it.
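
If the read, process, yield, emit loop still feels like magic, here's a toy version in plain ruby. To be clear, ToyLineStreamer is something I made up for illustration and is not wukong's actual implementation; it just shows the shape of the thing, with a WordMapper that yields words instead of whole lines.

```ruby
require 'stringio'

# Toy stand-in for a line streamer: read lines, hand each one to #process,
# and write whatever #process yields back out as a tab-separated record.
class ToyLineStreamer
  def stream(input, output)
    input.each_line do |line|
      process(line.chomp) { |record| output.puts(Array(record).join("\t")) }
    end
  end
end

# Instead of echoing the line back, yield every word on it.
class WordMapper < ToyLineStreamer
  def process(line)
    line.scan(/\w+/).each { |word| yield word }
  end
end

out = StringIO.new
WordMapper.new.stream(StringIO.new("hello big data\nhello hadoop\n"), out)
puts out.string
```

Swap the StringIO objects for $stdin and $stdout and you have something you can pipe data through from the command line.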

Meatier examples to come.

Thursday, January 13, 2011

Convert TSV to JSON command line

So you've got some tsv data:


$: head foo.tsv
148 0.05 49.0530378784848
380 0.8 85.0345986160553
496 0.05 33.665653373865
612 0.15 58.1366745330187
728 0.1 60.8615655373785
844 0.3 69.4102235910563
960 0.2 74.4530218791248
1076 0.2 76.6129354825807
1192 2.25 99.0050397984081
1888 0.5 53.7328506859725


and you've got some field names (field_1,field_2,field_3). Try this:


$: export FIELDS=field_1,field_2,field_3
$: cat foo.tsv| ruby -rjson -ne 'puts ENV["FIELDS"].split(",").zip($_.strip.split("\t")).inject({}){|h,x| h[x[0]]=x[1];h}.to_json'


will give you something that looks like:


{"field_1":"148","field_2":"0.05","field_3":"49.0530378784848"}
{"field_1":"380","field_2":"0.8","field_3":"85.0345986160553"}
{"field_1":"496","field_2":"0.05","field_3":"33.665653373865"}
{"field_1":"612","field_2":"0.15","field_3":"58.1366745330187"}
{"field_1":"728","field_2":"0.1","field_3":"60.8615655373785"}
{"field_1":"844","field_2":"0.3","field_3":"69.4102235910563"}
{"field_1":"960","field_2":"0.2","field_3":"74.4530218791248"}
{"field_1":"1076","field_2":"0.2","field_3":"76.6129354825807"}
{"field_1":"1192","field_2":"2.25","field_3":"99.0050397984081"}
{"field_1":"1888","field_2":"0.5","field_3":"53.7328506859725"}


Hurray.