Monday, January 17, 2011

Processing XML Records with Hadoop and Wukong

Another common pattern that Wukong addresses exceedingly well is liberating data from unwieldy formats (XML) into tsv. For example, lets consider the following Hacker News dataset: See RedMonk Analytics

A single record looks like this:

<row><ID>33</ID><ParentID>31</ParentID><Text>&lt;font color="#5a5a5a"&gt;winnar winnar chicken dinnar!&lt;/font&gt;</Text><Username>spez</Username><Points>0</Points><Type>2</Type><Timestamp>2006-10-10T21:11:18.093</Timestamp><CommentCount>0</CommentCount></row>

And here's a wukong example script that turns that into tsv:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'
require 'wukong/encoding'
require 'crack'

class HackernewsComment <, :url, :title, :text, :timestamp, :comment_id, :points, :comment_count, :type)
def self.parse raw
raw_hash = Crack::XML.parse(raw.strip)
return unless raw_hash
return unless raw_hash["row"]
raw_hash = raw_hash["row"]
raw_hash[:username] = raw_hash["Username"].wukong_encode if raw_hash["Username"]
raw_hash[:url] = raw_hash["Url"].wukong_encode if raw_hash["Url"]
raw_hash[:title] = raw_hash["Title"].wukong_encode if raw_hash["Title"]
raw_hash[:text] = raw_hash["Text"].wukong_encode if raw_hash["Text"]
raw_hash[:feed_id] = raw_hash["ID"].to_i if raw_hash["ID"]
raw_hash[:points] = raw_hash["Points"].to_i if raw_hash["Points"]
raw_hash[:comment_count] = raw_hash["CommentCount"].to_i if raw_hash["CommentCount"]
raw_hash[:type] = raw_hash["Type"].to_i if raw_hash["Type"]

# Eg. Map '2010-10-26T19:29:59.717' to easier to work with '20101027002959'
raw_hash[:timestamp] = Time.parse_and_flatten(raw_hash["Timestamp"]) if raw_hash["Timestamp"]
self.from_hash(raw_hash, true)

class XMLParser < Wukong::Streamer::LineStreamer
def process line
return unless line =~ /^\<row/
yield HackernewsComment.parse(line)
end, nil).run

Here's how it works. We're going to use the "StreamXmlRecordReader" for hadoop streaming. What this does is give the map task one row per map. That's our line variable. Additionally, we've defined a data model to read the row into called "HackernewsComment". This guy is responsible for parsing the xml record and creating a new instance of itself.

Inside the HackernewsComment's parse method we create clean fields that we'd like to use. Wukong has a method for strings called 'wukong_encode' which simply xml encodes the text so weird characters aren't an issue. You can imagine modifying the raw fields in other ways to construct and fill the fields of your output data model.

Finally, a new instance of HackernewsComment is created using the clean fields and emitted. Notice that we don't have to do anything special to the new comment once it's created. That's because Wukong will do the "right thing" and serialize out the class name as a flat field (hackernews_comment) along with the fields, in order, as a tsv record.

Save this into a file called "process_xml.rb" and run with the following:

$:./process_xml.rb --split_on_xml_tag=row --run /tmp/hn-sample.xml /tmp/xml_out
I, [2011-01-17T11:09:17.461643 #5519] INFO -- : Launching hadoop!
I, [2011-01-17T11:09:17.461757 #5519] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.reduce.tasks=0 \
-D'process_xml.rb---/tmp/hn-sample.xml---/tmp/xml_out' \
-inputreader 'StreamXmlRecordReader,begin=<row>,end=</row>' \
-mapper '/usr/bin/ruby1.8 process_xml.rb --map ' \
-reducer '' \
-input '/tmp/hn-sample.xml' \
-output '/tmp/xml_out' \
-file '/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb' \
-cmdenv 'RUBYLIB=$HOME/.rubylib'

11/01/17 11:09:18 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/17 11:09:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/17 11:09:19 INFO streaming.StreamJob: Running job: job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: To kill this job, run:
11/01/17 11:09:19 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0243
11/01/17 11:09:20 INFO streaming.StreamJob: map 0% reduce 0%
11/01/17 11:09:34 INFO streaming.StreamJob: map 100% reduce 0%
11/01/17 11:09:40 INFO streaming.StreamJob: map 87% reduce 0%
11/01/17 11:09:43 INFO streaming.StreamJob: map 87% reduce 100%
11/01/17 11:09:43 INFO streaming.StreamJob: Job complete: job_201012031305_0243
11/01/17 11:09:43 INFO streaming.StreamJob: Output: /tmp/xml_out
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar902611811523431467/] [] /tmp/streamjob681918437315823836.jar tmpDir=null

Finally, let's take a look at our new, happily liberated, tsv records:

$: hdp-catd /tmp/xml_out | head | wu-lign
hackernews_comment Harj YC Founder looking for Rails Tutor 20101027002959 0 5 0 1
hackernews_comment pg Y Combinator 20061010003558 1 39 15 1
hackernews_comment phyllis A Student's Guide to Startups 20061010003648 2 12 0 1
hackernews_comment phyllis Woz Interview: the early days of Apple 20061010183848 3 7 0 1
hackernews_comment onebeerdave NYC Developer Dilemma 20061010184037 4 6 0 1
hackernews_comment perler Google, YouTube acquisition announcement could come tonight 20061010184105 5 6 0 1
hackernews_comment perler Business Intelligence the Inkling Way: cool prediction markets software 20061010185246 6 5 0 1
hackernews_comment phyllis Sevin Rosen Unfunds - why? 20061010021030 7 5 0 1
hackernews_comment frobnicate LikeBetter featured by BBC 20061010021033 8 10 0 1
hackernews_comment askjigga weekendr: social network for the weekend 20061010021036 9 3 0 1


As a side note, I strongly encourage comments. Seriously. How am I supposed to know what's useful for you and what isn't unless you comment?


  1. Hello,

    I ran across this while trying to process some xml through the streaming API, and I have a few questions.

    The first is about Wukong in general: I have been using the MapReduce Toolkit for my ruby/stream processing jobs, and I was wondering if you are familiar with it and if so what the advantage of using Wukong in its place is.

    Second, specifically to XML processing, your example shows a simple case in which the XML record is neatly contained on a single line. In order to process data like Wikipedia where the page is not all on one line, for Mr. Toolkit I need to build a line-by-line buffer until I reach the end tag and then output the buffer as a record for the reducers, etc. Would you do the same thing in Wukong or is there a better way of tracking a multi-line record?


    Jeremy Bensley

  2. For your first question, no I haven't used MR toolkit so it's hard to make a comparison. I've been using Wukong for a long time, it's very simple, it gets the job done, and I haven't needed to look elsewhere.

    For the second, the example record posted doesn't have newlines but the data, in general, does. Hadoop streaming's "StreamXmlRecordReader" will take care of this though. Instead of splitting on newlines it gives your map task everything between 'begin' and 'end'. This way you don't have to manage a buffer yourself.

    Wukong allows you to use this functionality by passing in the '--split_on_xml_tag=[my_tag]' when launching the script.


  3. Yeah, I am using the StreamXmlRecordReader but it appears that as an artifact of the way MR Toolkit assumes per-line input users have to maintain that buffer themselves.

    I had wondered why it had received very few updates in the past couple of years, apparently everyone else moved to Wukong for their ruby hadoop streaming needs and forgot to tell me.

    Thanks for the info, looks like I've got a few scripts to convert over to Wukong.


  4. Gaining Python certifications will validate your skills and advance your career.
    python certification