
Sunday, January 16, 2011

Processing JSON Records With Hadoop and Wukong

For another illustration of how Wukong is making it way simpler to work with data, let's process some real JSON records.

Get Data


Download this awesome UFO data at Infochimps. It's over 60,000 documented UFO sightings with text descriptions. Best of all, it's available in TSV, JSON, and Avro formats. Downloading the bz2 package will get you all three.

Explore Data


Once you've got your data set, let's crack open the JSON version and take a look at it:


# weirdness in infochimps packaging (a zipped bz2?)
$: unzip icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip
Archive: icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip
creating: chimps_16154-2010-10-20_14-33-35/
inflating: chimps_16154-2010-10-20_14-33-35/README-infochimps
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv
inflating: chimps_16154-2010-10-20_14-33-35/16154.yaml
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.avro
inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.json
$: head chimps_16154-2010-10-20_14-33-35/ufo_awesome.json
{"sighted_at": "19951009", "reported_at": "19951009", "location": " Iowa City, IA", "shape": "", "duration": "", "description": "Man repts. witnessing "flash, followed by a classic UFO, w/ a tailfin at back." Red color on top half of tailfin. Became triangular."}
{"sighted_at": "19951010", "reported_at": "19951011", "location": " Milwaukee, WI", "shape": "", "duration": "2 min.", "description": "Man on Hwy 43 SW of Milwaukee sees large, bright blue light streak by his car, descend, turn, cross road ahead, strobe. Bizarre!"}
{"sighted_at": "19950101", "reported_at": "19950103", "location": " Shelton, WA", "shape": "", "duration": "", "description": "Telephoned Report:CA woman visiting daughter witness discs and triangular ships over Squaxin Island in Puget Sound. Dramatic. Written report, with illustrations, submitted to NUFORC."}
{"sighted_at": "19950510", "reported_at": "19950510", "location": " Columbia, MO", "shape": "", "duration": "2 min.", "description": "Man repts. son's bizarre sighting of small humanoid creature in back yard. Reptd. in Acteon Journal, St. Louis UFO newsletter."}
{"sighted_at": "19950611", "reported_at": "19950614", "location": " Seattle, WA", "shape": "", "duration": "", "description": "Anonymous caller repts. sighting 4 ufo's in NNE sky, 45 deg. above horizon. (No other facts reptd. No return tel. #.)"}
{"sighted_at": "19951025", "reported_at": "19951024", "location": " Brunswick County, ND", "shape": "", "duration": "30 min.", "description": "Sheriff's office calls to rept. that deputy, 20 mi. SSE of Wilmington, is looking at peculiar, bright white, strobing light."}
{"sighted_at": "19950420", "reported_at": "19950419", "location": " Fargo, ND", "shape": "", "duration": "2 min.", "description": "Female student w/ friend witness huge red light in sky. 2 others witness. Obj pulsated, started to flicker. Winked out."}
{"sighted_at": "19950911", "reported_at": "19950911", "location": " Las Vegas, NV", "shape": "", "duration": "", "description": "Man repts. bright, multi-colored obj. in NW night sky. Disappeared while he was in house."}
{"sighted_at": "19950115", "reported_at": "19950214", "location": " Morton, WA", "shape": "", "duration": "", "description": "Woman reports 2 craft fly over house. Strange events taking place in town w/ paramilitary activities."}
{"sighted_at": "19950915", "reported_at": "19950915", "location": " Redmond, WA", "shape": "", "duration": "6 min.", "description": "Young man w/ 2 co-workers witness tiny, distinctly white round disc drifting slowly toward NE. Flew in dir. 90 deg. to winds."}


Looks pretty interesting. As one last (obvious to some, sure) simple check, let's see how big it is:


$: ls -lh chimps_16154-2010-10-20_14-33-35
total 220M
-rw-r--r-- 1 jacob 3.4K 2010-10-20 09:34 16154.yaml
-rw-r--r-- 1 jacob 908 2010-10-20 09:34 README-infochimps
-rw------- 1 jacob 72M 2010-10-20 09:33 ufo_awesome.avro
-rw------- 1 jacob 77M 2010-10-20 09:33 ufo_awesome.json
-rw------- 1 jacob 72M 2010-10-20 09:33 ufo_awesome.tsv


Load Data


Now, 77M is small enough that you COULD process it on a single machine with methods you already know. However, this example is about Hadoop, so let's go ahead and throw it on the Hadoop distributed file system (HDFS) so we can process it in parallel:

hdp-mkdir /data/domestic/ufo
hdp-put chimps_16154-2010-10-20_14-33-35/ufo_awesome.json /data/domestic/ufo/


(I'm going to have to assume you already have an HDFS up and running; see this for a simple how-to.)
If all goes well you should see your file there:

hdp-ls /data/domestic/ufo/
Found 1 items
-rw-r--r-- 1 jacob supergroup 80346460 2011-01-16 21:21 /data/domestic/ufo/ufo_awesome.json


Process Data



Let's write a really simple Wukong script to find the most popular UFO shapes:


#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'
require 'json'

class JSONMapper < Wukong::Streamer::LineStreamer
  def process record
    sighting = JSON.parse(record) rescue {}
    return unless sighting["shape"]
    yield sighting["shape"] unless sighting["shape"].empty?
  end
end

class ShapeReducer < Wukong::Streamer::AccumulatingReducer

  def start! shape
    @count = 0
  end

  def accumulate shape
    @count += 1
  end

  def finalize
    yield [key, @count]
  end

end

Wukong::Script.new(JSONMapper, ShapeReducer).run


Mapper


Our mapper class, JSONMapper, is nearly identical to the one in the earlier post. All it does is read single JSON records from $stdin, parse out the "shape" field (with a rescue and a couple of guards for handling data nastiness), and emit the "shape" field back to $stdout.
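To see what that parsing and guarding amounts to without Wukong in the picture, here is a minimal plain-Ruby sketch. The helper name extract_shape and the sample lines are made up for illustration (they are not part of Wukong or the data set), and the explicit begin/rescue is my stand-in for the inline rescue in the script.

```ruby
require 'json'

# Hypothetical helper, not part of Wukong: returns the "shape" value of one
# JSON line, or nil if the line is unparseable or has no usable shape.
def extract_shape(line)
  sighting = begin
    JSON.parse(line)
  rescue JSON::ParserError
    {} # treat broken lines as empty records
  end
  return nil unless sighting.is_a?(Hash)

  shape = sighting["shape"]
  return nil unless shape.is_a?(String) && !shape.empty?
  shape
end

# Made-up sample lines for illustration; they are not taken from the data set.
SAMPLE_LINES = [
  '{"sighted_at": "20000101", "shape": "triangle", "description": "example"}',
  '{"sighted_at": "20000102", "shape": "", "description": "blank shape"}',
  '{"sighted_at": "20000103", "description": "no shape field at all"}',
  '{"description": "broken "quotes" inside"}',
  '{"sighted_at": "20000104", "shape": "light", "description": "example"}'
]

# Emit one shape per usable record, just like the map step does.
SAMPLE_LINES.each do |line|
  shape = extract_shape(line)
  puts shape if shape
end
# prints "triangle" and "light"; the other three lines are skipped
```

The fourth sample line has unescaped quotes inside a string, which is the kind of nastiness the rescue is there to absorb: the parse fails, the record is treated as empty, and nothing is emitted for it.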

Reducer


The reducer, ShapeReducer, is about the simplest reducer in Wukong that still illustrates the major points:

* start! - the first method that is called on a new group of data. A group is, if you remember your map-reduce, all records with the same key. In this case it's all the "shape" fields with the same shape. All this method does is decide how to initialize any internal state a reducer has. In this case we simply initialize a counter to 0.

* accumulate - even simpler. This method operates on each record in the group. In our simple case we just increment the internal counter by 1.

* finalize - the final step in the reduce. We've processed all the records in the group, so we yield the key corresponding to our group (that's just going to be the unique "shape") and the count we've accumulated for it.
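If it helps to see that lifecycle spelled out, here is a rough plain-Ruby sketch of the same idea. The function count_groups and the sample shapes are hypothetical, and this is not how Wukong implements AccumulatingReducer; it only mimics the three steps on a list of keys that is assumed to arrive sorted.

```ruby
# Hypothetical stand-in for the start!/accumulate/finalize steps described
# above. Plain Ruby; it does not use Wukong.
def count_groups(keys)
  results = []
  current = nil
  count = 0
  keys.each do |key|
    if key != current
      results << [current, count] unless current.nil? # finalize the previous group
      current = key                                   # start! a new group
      count = 0
    end
    count += 1                                        # accumulate
  end
  results << [current, count] unless current.nil?     # finalize the last group
  results
end

# Made-up mapper output, for illustration only.
shapes = %w[light disk light triangle light]

count_groups(shapes.sort).each do |shape, count|
  puts "#{shape}\t#{count}"
end
# prints:
# disk      1
# light     3
# triangle  1
```

Feed it the unsorted list instead and "light" comes out as three separate groups of 1, because a group only ends when the key changes. None of this bookkeeping is needed in the actual script, since AccumulatingReducer calls the three methods for us.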

And that's it. Let's save it into a file called "process_ufo.rb" and run it locally on 10000 lines:


$: cat chimps_16154-2010-10-20_14-33-35/ufo_awesome.json| head -n10000| ./process_ufo.rb --map | sort | ./process_ufo.rb --reduce | sort -nk2 | wu-lign
changed 1
dome 1
flare 1
hexagon 1
pyramid 1
crescent 2
round 2
delta 8
cross 25
cone 41
teardrop 79
rectangle 112
egg 113
chevron 128
diamond 137
flash 138
cylinder 155
changing 204
cigar 255
formation 290
oval 333
unknown 491
sphere 529
circle 667
other 721
disk 727
fireball 799
triangle 868
light 1760


Notice that when we run this locally we have to stick the "sort" program in there. This simulates what Hadoop gives us for free: the mapper output is sorted by key, so the reducer sees all the records for a given shape together. It looks like light is going to come out ahead. Let's see what happens when we run it with Hadoop:


$: ./process_ufo.rb --run /data/domestic/ufo/ufo_awesome.json /data/domestic/ufo/shape_counts
I, [2011-01-16T21:51:43.431534 #11447] INFO -- : Launching hadoop!
I, [2011-01-16T21:51:43.476626 #11447] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.job.name='process_ufo.rb---/data/domestic/ufo/ufo_awesome.json---/data/domestic/ufo/shape_counts' \
-mapper '/usr/bin/ruby1.8 process_ufo.rb --map ' \
-reducer '/usr/bin/ruby1.8 /home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb --reduce ' \
-input '/data/domestic/ufo/ufo_awesome.json' \
-output '/data/domestic/ufo/shape_counts' \
-file '/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb' \
-cmdenv 'RUBYLIB=$HOME/.rubylib'

11/01/16 21:51:45 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/16 21:51:46 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/16 21:51:46 INFO streaming.StreamJob: Running job: job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: To kill this job, run:
11/01/16 21:51:46 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0221
11/01/16 21:51:47 INFO streaming.StreamJob: map 0% reduce 0%
11/01/16 21:51:59 INFO streaming.StreamJob: map 100% reduce 0%
11/01/16 21:52:12 INFO streaming.StreamJob: map 100% reduce 100%
11/01/16 21:52:15 INFO streaming.StreamJob: Job complete: job_201012031305_0221
11/01/16 21:52:15 INFO streaming.StreamJob: Output: /data/domestic/ufo/shape_counts
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar3466191386581838257/] [] /tmp/streamjob3112551829711880856.jar tmpDir=null


As one final step let's cat the output data and take a look at it:


hdp-catd /data/domestic/ufo/shape_counts | sort -nk2 | wu-lign
changed 1
dome 1
flare 1
hexagon 1
pyramid 1
crescent 2
round 2
delta 8
cross 177
cone 265
teardrop 592
egg 661
chevron 757
diamond 909
rectangle 957
cylinder 980
flash 988
changing 1532
cigar 1774
formation 1774
oval 2859
fireball 3436
sphere 3613
unknown 4458
other 4570
disk 4794
circle 5249
triangle 6036
light 12138


Pretty simple?

8 comments:

  1. I should point out, since I did not mention it in the post, that external ruby libraries (wukong, json, etc) will have to be installed on all machines in the cluster. There are a number of ways around this but they haven't been implemented in wukong yet.

    --thedatachef

  2. works like a champ here: i'm a big fan ;)

    - sog

  3. Thanks for excellent articles. I tried running the above example on a mac in Pseudo-Distributed mode. But I am getting this error.

    `execute_command!': Streaming command failed!

    Full log below.


    ./process_ufo.rb --run /wk/in/ /wk/out/
    I, [2011-09-28T16:10:09.462680 #77922] INFO -- : Launching hadoop!
    I, [2011-09-28T16:10:09.462774 #77922] INFO -- : Running

    /MyApps/hadoop-0.20.203.0/bin/hadoop \
    jar /MyApps/hadoop-0.20.203.0/contrib/streaming/hadoop-*streaming*.jar \
    -D mapred.job.name='process_ufo.rb---/wk/in/---/wk/out/' \
    -mapper '/usr/local/rvm/rubies/ruby-1.9.2-p180/bin/ruby process_ufo.rb --map --log_interval=10000 --log_seconds=30' \
    -reducer '/usr/local/rvm/rubies/ruby-1.9.2-p180/bin/ruby process_ufo.rb --reduce --log_interval=10000 --log_seconds=30' \
    -input '/wk/in/' \
    -output '/wk/out/' \
    -file '/Users/ltahmaz/Workspace/wukong/process_ufo.rb'

    11/09/28 16:10:10 INFO mapred.FileInputFormat: Total input paths to process : 1
    11/09/28 16:10:11 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-ltahmaz/mapred/local]
    11/09/28 16:10:11 INFO streaming.StreamJob: Running job: job_201109281601_0001
    11/09/28 16:10:11 INFO streaming.StreamJob: To kill this job, run:
    11/09/28 16:10:11 INFO streaming.StreamJob: /MyApps/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201109281601_0001
    11/09/28 16:10:11 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201109281601_0001
    11/09/28 16:10:12 INFO streaming.StreamJob: map 0% reduce 0%
    11/09/28 16:10:48 INFO streaming.StreamJob: map 100% reduce 100%
    11/09/28 16:10:48 INFO streaming.StreamJob: To kill this job, run:
    11/09/28 16:10:48 INFO streaming.StreamJob: /MyApps/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201109281601_0001
    11/09/28 16:10:48 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201109281601_0001
    11/09/28 16:10:48 ERROR streaming.StreamJob: Job not successful. Error: NA
    11/09/28 16:10:48 INFO streaming.StreamJob: killJob...
    Streaming Job Failed!
    packageJobJar: [/Users/ltahmaz/Workspace/wukong/process_ufo.rb, /tmp/hadoop-ltahmaz/hadoop-unjar7524524985480504737/] [] /var/folders/h6/h6FJLoDWEe8aiUdvGDUxi++++TM/-Tmp-/streamjob6812103337194843704.jar tmpDir=null
    /usr/local/rvm/gems/ruby-1.9.2-p180/gems/wukong-2.0.1/lib/wukong/script.rb:234:in `execute_command!': Streaming command failed! (RuntimeError)
    from /usr/local/rvm/gems/ruby-1.9.2-p180/gems/wukong-2.0.1/lib/wukong/script/hadoop_command.rb:78:in `execute_hadoop_workflow'
    from /usr/local/rvm/gems/ruby-1.9.2-p180/gems/wukong-2.0.1/lib/wukong/script.rb:152:in `run'
    from ./process_ufo.rb:31:in `'

  5. Okay I just resolved the above issue (qualifies under operator error)

    Running using ruby 1.9.2 fails and the above error is reported.
    Running using ruby 1.8.7 works just fine.

    Sorry for the spam.

    Replies
    1. Could you kindly explain detailed how did you fix this?
      I already switch to ruby 1.8.7 but exactly this error still happen.
      Thank you

    2. Daniyar,
      Did you manage to fix the problem?
      I am facing the same :(

      Thank you
