<?xml version='1.0' encoding='UTF-8'?><?xml-stylesheet href="http://www.blogger.com/styles/atom.css" type="text/css"?><feed xmlns='http://www.w3.org/2005/Atom' xmlns:openSearch='http://a9.com/-/spec/opensearchrss/1.0/' xmlns:georss='http://www.georss.org/georss' xmlns:gd='http://schemas.google.com/g/2005' xmlns:thr='http://purl.org/syndication/thread/1.0'><id>tag:blogger.com,1999:blog-2875432366090970056</id><updated>2012-02-24T10:23:29.399-08:00</updated><category term='pig'/><category term='ruby'/><category term='tfidf with pig'/><category term='degree distribution with pig'/><category term='fifo'/><category term='infochimps'/><category term='text processing with pig'/><category term='lucene tokenization pig'/><category term='workflow'/><category term='java_home'/><category term='jruby hadoop'/><category term='elasticsearch'/><category term='degree distribution'/><category term='hacking'/><category term='jaccard similarity hadoop'/><category term='graph'/><category term='pig store elasticsearch'/><category term='hadoop'/><category term='pig udf example'/><category term='apache lucene'/><category term='pig stream'/><category term='analysis'/><category term='rstats'/><category term='bulk'/><category term='script'/><category term='degree distribution with hadoop'/><category term='tokenize udf'/><category term='map-reduce'/><category term='raw text'/><category term='indexing text with hadoop'/><category term='process corpora with hadoop'/><category term='rake'/><category term='jaccard index'/><category term='notes'/><category term='wukong'/><category term='linux'/><category term='apache'/><category term='item-item similarity'/><category term='xml'/><category term='intersection apache pig'/><category term='math'/><category term='plot'/><category term='wukong example'/><category term='java'/><category term='parse'/><category term='intersection'/><category term='tsv'/><category term='lucene'/><category term='indexing'/><category term='bash'/><category term='jaccard 
similarity pig'/><category term='text processing'/><category term='wonderdog'/><category term='cloudera'/><category term='tfidf with hadoop'/><category term='pig quadkeys'/><category term='pig-0.8'/><category term='text processing with hadoop'/><category term='hadoop example'/><category term='unix'/><category term='jaccard similarity'/><category term='tokenize text with pig'/><category term='mac'/><category term='search'/><category term='similarity apache pig'/><category term='command-line'/><category term='indexing text with pig'/><category term='similarity hadoop'/><category term='network'/><category term='intersection hadoop'/><category term='jruby'/><category term='jruby apache pig'/><category term='nearest neighbors hadoop'/><category term='cat'/><category term='data'/><category term='apache pig'/><category term='swineherd'/><category term='examples'/><category term='apache hadoop'/><category term='json'/><category term='mac os X'/><category term='R'/><category term='tfidf'/><title type='text'>Data Recipes</title><subtitle type='html'></subtitle><link rel='http://schemas.google.com/g/2005#feed' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/posts/default'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default?max-results=100'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/'/><link rel='hub' href='http://pubsubhubbub.appspot.com/'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><generator version='7.00' 
uri='http://www.blogger.com'>Blogger</generator><openSearch:totalResults>21</openSearch:totalResults><openSearch:startIndex>1</openSearch:startIndex><openSearch:itemsPerPage>100</openSearch:itemsPerPage><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-6738889773115636631</id><published>2011-10-26T06:44:00.000-07:00</published><updated>2011-10-26T09:17:16.496-07:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='jruby hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='apache hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='jruby'/><category scheme='http://www.blogger.com/atom/ns#' term='nearest neighbors hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='pig udf example'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='pig quadkeys'/><category scheme='http://www.blogger.com/atom/ns#' term='jruby apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Nearest Neighbors With Apache Pig and Jruby</title><content type='html'>Since it's been such a long time since I last posted, I thought I'd make this one a bit longer. It's really a condensation of a lot of things I've been working with and thinking about over the past few months.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Nearest Neighbors&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;The nearest neighbors problem (also known as the post-office problem) is this: given a set of points S in some metric space M and a query point X, find the point in S that is nearest to X. In other words, given a residence, find the nearest post office. The K-nearest neighbors problem, which this post addresses, is just a slight generalization of that problem. 
Instead of just one neighbor, we are looking for the K nearest neighbors.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Problem&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;So, we're going to use the geonames data. This is a set of nearly 8 million geo points from around the world, with names, coordinates, and a bunch of other good stuff. For each point in the geonames set, we would like to find the 5 points (also in geonames) that are nearest to it. Should be pretty simple, yeah?&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Get data&lt;/h3&gt;&lt;br /&gt;The geonames data set 'allCountries.zip' can be downloaded like so:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;&lt;br /&gt;$: wget http://download.geonames.org/export/dump/allCountries.zip&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Prepare data&lt;/h3&gt;&lt;br /&gt;Since the geonames data set already comes as a tab-separated values (.tsv) file, it's just a matter of unzipping the package and placing it on your HDFS (you do have one of those, don't you?). 
Do:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;&lt;br /&gt;$: unzip allCountries.zip&lt;br /&gt;$: hadoop fs -put allCountries.txt .&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;to unzip the package and place the tsv file into your home directory on the Hadoop Distributed File System (HDFS).&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Schema&lt;/h3&gt;&lt;br /&gt;Oh, and by the way, before we forget, the data from geonames has this Pig schema:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;&lt;br /&gt;  geonameid:         int,&lt;br /&gt;  name:              chararray,&lt;br /&gt;  asciiname:         chararray,&lt;br /&gt;  alternatenames:    chararray,&lt;br /&gt;  latitude:          double,&lt;br /&gt;  longitude:         double,&lt;br /&gt;  feature_class:     chararray,&lt;br /&gt;  feature_code:      chararray,&lt;br /&gt;  country_code:      chararray,&lt;br /&gt;  cc2:               chararray,&lt;br /&gt;  admin1_code:       chararray,&lt;br /&gt;  admin2_code:       chararray,&lt;br /&gt;  admin3_code:       chararray,&lt;br /&gt;  admin4_code:       chararray,&lt;br /&gt;  population:        long,&lt;br /&gt;  elevation:         int,&lt;br /&gt;  gtopo30:           int,&lt;br /&gt;  timezone:          chararray,&lt;br /&gt;  modification_date: chararray&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;The Algorithm&lt;/h3&gt;&lt;br /&gt;Now that we have the data, we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc.), we see that there are really only three fields of interest: geonameid, longitude, and latitude. All the other fields are just nice metadata, which we can attach later.&lt;br /&gt;&lt;br /&gt;Now, since we're going to be using Apache Pig to solve this problem, we need to think a little bit about parallelism. 
One constraint is that &lt;span style="font-style:italic;"&gt;at no time&lt;/span&gt; is any one point going to have access to the locations of all the other points. In other words, we will not be storing the full set of points in memory. Besides, at 8 million points, that's kind of a lot for my poor little machine to handle.&lt;br /&gt;&lt;br /&gt;So it's clear (right?) that we're going to have to partition the space in some way. Then, within each partition, we'll need to apply a local version of the nearest neighbors algorithm. That's it, really. Map and reduce. Wait, but there's one problem. What happens if we don't find all 5 neighbors for a point in a single partition? Hmmm. Well, the answer is iteration. We'll start with a small partition size and gradually increase it until either the partition size is too large or all the neighbors have been found. Got it?&lt;br /&gt;&lt;br /&gt;Recap:&lt;br /&gt;&lt;br /&gt;&lt;ul&gt;&lt;br /&gt;&lt;li&gt;(1) Partition the space&lt;/li&gt;&lt;br /&gt;&lt;li&gt;(2) Search for nearest neighbors within each partition&lt;/li&gt;&lt;br /&gt;&lt;li&gt;(3) If all neighbors have been found, terminate; otherwise increase the partition size and repeat (1) and (2)&lt;/li&gt;&lt;br /&gt;&lt;/ul&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Implementation&lt;/h3&gt;&lt;br /&gt;For partitioning the space, we're going to use Bing Maps quadkeys (http://msdn.microsoft.com/en-us/library/bb259689.aspx), since they're easy to implement and partition the space nicely. This will be a Java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a quadkey string (the partition id).&lt;br /&gt;&lt;br /&gt;Here's the actual Java code for that. 
Let's call it "GetQuadkey":&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:java"&gt;&lt;br /&gt;package sounder.pig.geo;&lt;br /&gt;&lt;br /&gt;import java.io.IOException;&lt;br /&gt;import org.apache.pig.EvalFunc;&lt;br /&gt;import org.apache.pig.data.Tuple;&lt;br /&gt;&lt;br /&gt;/**&lt;br /&gt;   See: http://msdn.microsoft.com/en-us/library/bb259689.aspx&lt;br /&gt;&lt;br /&gt;   A Pig UDF to compute the quadkey string for a given&lt;br /&gt;   (longitude, latitude, resolution) tuple, where resolution&lt;br /&gt;   is the zoom level (1-22; larger values overflow mapSize).&lt;br /&gt; */&lt;br /&gt;public class GetQuadkey extends EvalFunc&amp;lt;String&amp;gt; {&lt;br /&gt;    private static final int TILE_SIZE = 256;&lt;br /&gt;&lt;br /&gt;    public String exec(Tuple input) throws IOException {&lt;br /&gt;        if (input == null || input.size() &lt; 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))&lt;br /&gt;            return null;&lt;br /&gt;&lt;br /&gt;        Double longitude = (Double)input.get(0);&lt;br /&gt;        Double latitude = (Double)input.get(1);&lt;br /&gt;        Integer resolution = (Integer)input.get(2);&lt;br /&gt;&lt;br /&gt;        return quadKey(longitude, latitude, resolution);&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    private static String quadKey(double longitude, double latitude, int resolution) {&lt;br /&gt;        int[] pixels = pointToPixels(longitude, latitude, resolution);&lt;br /&gt;        int[] tiles = pixelsToTiles(pixels[0], pixels[1]);&lt;br /&gt;        return tilesToQuadKey(tiles[0], tiles[1], resolution);&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    /**&lt;br /&gt;       Return the pixel X and Y coordinates for the given lat, lng, and resolution.&lt;br /&gt;     */&lt;br /&gt;    private static int[] pointToPixels(double longitude, double latitude, int resolution) {&lt;br /&gt;        double x = (longitude + 180) / 360;&lt;br /&gt;        double sinLatitude = Math.sin(latitude * Math.PI / 180);&lt;br /&gt;        double y = 0.5 - 
Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);&lt;br /&gt;&lt;br /&gt;        int mapSize = mapSize(resolution);&lt;br /&gt;        int[] pixels = {(int) trim(x * mapSize + 0.5, 0, mapSize - 1), (int) trim(y * mapSize + 0.5, 0, mapSize - 1)};&lt;br /&gt;        return pixels;&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    /**&lt;br /&gt;       Convert from pixel coordinates to tile coordinates.&lt;br /&gt;     */&lt;br /&gt;    private static int[] pixelsToTiles(int pixelX, int pixelY) {&lt;br /&gt;        int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};&lt;br /&gt;        return tiles;&lt;br /&gt;    }&lt;br /&gt;    &lt;br /&gt;    /**&lt;br /&gt;       Finally, given tile coordinates and a resolution, returns the appropriate quadkey&lt;br /&gt;     */&lt;br /&gt;    private static String tilesToQuadKey(int tileX, int tileY, int resolution) {&lt;br /&gt;        StringBuilder quadKey = new StringBuilder();&lt;br /&gt;        for (int i = resolution; i &gt; 0; i--) {&lt;br /&gt;            char digit = '0';&lt;br /&gt;            int mask = 1 &lt;&lt; (i - 1);&lt;br /&gt;            if ((tileX &amp; mask) != 0) {&lt;br /&gt;                digit++;&lt;br /&gt;            }&lt;br /&gt;            if ((tileY &amp; mask) != 0) {&lt;br /&gt;                digit++;&lt;br /&gt;                digit++;&lt;br /&gt;            }&lt;br /&gt;            quadKey.append(digit);&lt;br /&gt;        }&lt;br /&gt;        return quadKey.toString();&lt;br /&gt;    }&lt;br /&gt;    &lt;br /&gt;    /**&lt;br /&gt;       Ensure input value is within minval and maxval&lt;br /&gt;     */&lt;br /&gt;    private static double trim(double n, double minVal, double maxVal) {&lt;br /&gt;        return Math.min(Math.max(n, minVal), maxVal);&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    /**&lt;br /&gt;       Width of the map, in pixels, at the given resolution&lt;br /&gt;     */&lt;br /&gt;    public static int mapSize(int resolution) {&lt;br /&gt;        return TILE_SIZE 
&lt;&lt; resolution;&lt;br /&gt;    }&lt;br /&gt;}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Next we need another Java UDF that operates on all the points in a partition. Let's call it "NearestNeighbors". Here's a naive, brute-force implementation that compares every point in a partition against every other point:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:java"&gt;&lt;br /&gt;package sounder.pig.geo.nearestneighbors;&lt;br /&gt;&lt;br /&gt;import java.io.IOException;&lt;br /&gt;import java.util.Comparator;&lt;br /&gt;import java.util.HashSet;&lt;br /&gt;import java.util.PriorityQueue;&lt;br /&gt;import java.util.Set;&lt;br /&gt;&lt;br /&gt;import org.apache.pig.backend.executionengine.ExecException;&lt;br /&gt;import org.apache.pig.EvalFunc;&lt;br /&gt;import org.apache.pig.data.Tuple;&lt;br /&gt;import org.apache.pig.data.TupleFactory;&lt;br /&gt;import org.apache.pig.data.BagFactory;&lt;br /&gt;import org.apache.pig.data.DataBag;&lt;br /&gt;&lt;br /&gt;public class NearestNeighbors extends EvalFunc&amp;lt;DataBag&amp;gt; {&lt;br /&gt;    private static TupleFactory tupleFactory = TupleFactory.getInstance();&lt;br /&gt;    private static BagFactory bagFactory = BagFactory.getInstance();&lt;br /&gt;&lt;br /&gt;    public DataBag exec(Tuple input) throws IOException {&lt;br /&gt;        if (input == null || input.size() &lt; 2 || input.isNull(0) || input.isNull(1))&lt;br /&gt;            return null;&lt;br /&gt;&lt;br /&gt;        Long k = (Long)input.get(0);&lt;br /&gt;        DataBag points = (DataBag)input.get(1);      // {(id,lng,lat,{(n1,n1_dist)...})}&lt;br /&gt;        DataBag result = bagFactory.newDefaultBag();&lt;br /&gt;&lt;br /&gt;        for (Tuple pointA : points) {&lt;br /&gt;            DataBag neighborsBag = (DataBag)pointA.get(3);&lt;br /&gt;            if (neighborsBag.size() &lt; k) {&lt;br /&gt;                // Max-heap on distance, seeded with neighbors found in earlier iterations&lt;br /&gt;                PriorityQueue&amp;lt;Tuple&amp;gt; neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);&lt;br /&gt;                // Ids already considered, so no point is ever added twice&lt;br /&gt;                Set&amp;lt;Object&amp;gt; seen = new HashSet&amp;lt;Object&amp;gt;();&lt;br /&gt;                for (Tuple t : neighbors) seen.add(t.get(0));&lt;br /&gt;&lt;br /&gt;                Object idA = pointA.get(0);&lt;br /&gt;                Double x1 = Math.toRadians((Double)pointA.get(1));&lt;br /&gt;                Double y1 = Math.toRadians((Double)pointA.get(2));&lt;br /&gt;&lt;br /&gt;                for (Tuple pointB : points) {&lt;br /&gt;                    Object idB = pointB.get(0);&lt;br /&gt;                    // Skip pointA itself and any point already considered&lt;br /&gt;                    if (idA.equals(idB) || !seen.add(idB)) continue;&lt;br /&gt;&lt;br /&gt;                    Double x2 = Math.toRadians((Double)pointB.get(1));&lt;br /&gt;                    Double y2 = Math.toRadians((Double)pointB.get(2));&lt;br /&gt;                    Double distance = haversineDistance(x1,y1,x2,y2);&lt;br /&gt;&lt;br /&gt;                    if (neighbors.size() &lt; k) {&lt;br /&gt;                        // Fewer than k neighbors so far: always keep this one&lt;br /&gt;                        neighbors.add(newNeighbor(idB, distance));&lt;br /&gt;                    } else if (distance &lt; (Double)neighbors.peek().get(1)) {&lt;br /&gt;                        // Closer than the current farthest neighbor: replace it&lt;br /&gt;                        neighbors.poll();&lt;br /&gt;                        neighbors.add(newNeighbor(idB, distance));&lt;br /&gt;                    }&lt;br /&gt;                }&lt;br /&gt;                // The queue now holds at most k neighbors (its iteration order is unspecified)&lt;br /&gt;                Tuple newPointA = tupleFactory.newTuple(4);&lt;br /&gt;                newPointA.set(0, pointA.get(0));&lt;br /&gt;                newPointA.set(1, pointA.get(1));&lt;br /&gt;                newPointA.set(2, pointA.get(2));&lt;br /&gt;                newPointA.set(3, fromQueue(neighbors));&lt;br /&gt;                result.add(newPointA);&lt;br /&gt;            } else {&lt;br /&gt;                result.add(pointA);&lt;br /&gt;            }&lt;br /&gt;        }&lt;br /&gt;        return result;&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    // Max-heap ordered by distance: peek() and poll() return the farthest neighbor&lt;br /&gt;    private PriorityQueue&amp;lt;Tuple&amp;gt; toDistanceSortedQueue(int k, DataBag bag) {&lt;br /&gt;        PriorityQueue&amp;lt;Tuple&amp;gt; q = new PriorityQueue&amp;lt;Tuple&amp;gt;(Math.max(1, k), new Comparator&amp;lt;Tuple&amp;gt;() {&lt;br /&gt;            public int compare(Tuple t1, Tuple t2) {&lt;br /&gt;                try {&lt;br /&gt;                    Double dist1 = (Double)t1.get(1);&lt;br /&gt;                    Double dist2 = (Double)t2.get(1);&lt;br /&gt;                    return dist2.compareTo(dist1);&lt;br /&gt;                } catch (ExecException e) {&lt;br /&gt;                    throw new RuntimeException("Error comparing tuples", e);&lt;br /&gt;                }&lt;br /&gt;            }&lt;br /&gt;        });&lt;br /&gt;        for (Tuple tuple : bag) q.add(tuple);&lt;br /&gt;        return q;&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    private Tuple newNeighbor(Object id, Double distance) throws ExecException {&lt;br /&gt;        Tuple neighbor = tupleFactory.newTuple(2);&lt;br /&gt;        neighbor.set(0, id);&lt;br /&gt;        neighbor.set(1, distance);&lt;br /&gt;        return neighbor;&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    private DataBag fromQueue(PriorityQueue&amp;lt;Tuple&amp;gt; q) {&lt;br /&gt;        DataBag bag = bagFactory.newDefaultBag();&lt;br /&gt;        for (Tuple tuple : q) bag.add(tuple);&lt;br /&gt;        return bag;&lt;br /&gt;    }&lt;br /&gt;&lt;br /&gt;    // Haversine formula; inputs in radians (x = longitude, y = latitude).&lt;br /&gt;    // Returns the central angle in radians, not a distance in km.&lt;br /&gt;    private Double haversineDistance(Double x1, Double y1, Double x2, Double y2) {&lt;br /&gt;        double a = Math.pow(Math.sin((y2-y1)/2), 2)&lt;br /&gt;            + Math.cos(y1) * Math.cos(y2) * Math.pow(Math.sin((x2-x1)/2), 2);&lt;br /&gt;&lt;br /&gt;        return (2 * Math.asin(Math.min(1, Math.sqrt(a))));&lt;br /&gt;    }&lt;br /&gt;}&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;The details of the NearestNeighbors UDF aren't super important, and it's mostly pretty clear what's going on. Just know that it takes a bag of points as input and returns a bag of points with the same schema as output. This is really important, since the output of one iteration becomes the input to the next. Also note that haversineDistance returns the central angle between two points in radians; multiply it by the Earth's radius (about 6371 km) to get a distance in kilometers.&lt;br /&gt;&lt;br /&gt;Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and Pig's "PigServer" Java class to do all the work. 
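&lt;br /&gt;&lt;br /&gt;Before diving into the runner, here is the control flow it implements, stripped of all the Pig plumbing. This is a minimal sketch in plain Java, not the actual runner. The ZoomLevelDriver class and its Pass interface are made up for illustration. Pass stands in for one GROUP / NearestNeighbors / SPLIT round at a given zoom level and reports whether any point still has fewer than K neighbors. The driver starts at the highest zoom level and works down toward the minimum, stopping early once every point is done.&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:java"&gt;
/**
 * Sketch of the zoom-level loop that drives the Pig jobs.
 * Pass is a hypothetical stand-in for one GROUP / NearestNeighbors / SPLIT round.
 */
public class ZoomLevelDriver {

    /** One round at zoom level zl; returns true if any point still has fewer than K neighbors. */
    public interface Pass {
        boolean run(int zl);
    }

    /**
     * Runs passes from maxZl down toward minZl (each step gives coarser partitions)
     * and stops as soon as a pass reports that every point is done.
     * Returns the zoom level of the last pass that ran.
     */
    public static int drive(Pass pass, int maxZl, int minZl) {
        int zl = maxZl;
        boolean notDone = pass.run(zl);
        while (notDone) {
            if (minZl >= zl) {
                break; // not allowed to grow the partitions any further
            }
            zl--; // one quadkey character shorter: each tile is twice as wide
            notDone = pass.run(zl);
        }
        return zl;
    }

    public static void main(String[] args) {
        // Toy pass: pretend every point has its K neighbors once zl drops to 18
        int last = drive(zl -> zl > 18, 22, 10);
        System.out.println(last); // 18
    }
}
&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Each step down in zoom level drops one quadkey character, so the partitions only ever grow. The price is that every pass re-reads all of the points, including those that are already done.&lt;br /&gt;&lt;br /&gt;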
Here's what the JRuby runner looks like (it's kind of a lot, so don't get scared):&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:ruby"&gt;&lt;br /&gt;#!/usr/bin/env jruby&lt;br /&gt;&lt;br /&gt;require 'java'&lt;br /&gt;&lt;br /&gt;#&lt;br /&gt;# You might consider changing these to point to where you have&lt;br /&gt;# Pig and your Hadoop configuration installed...&lt;br /&gt;#&lt;br /&gt;jar  = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"&lt;br /&gt;conf = "/etc/hadoop/conf"&lt;br /&gt;&lt;br /&gt;$CLASSPATH &lt;&lt; conf&lt;br /&gt;require jar&lt;br /&gt;&lt;br /&gt;import org.apache.pig.ExecType&lt;br /&gt;import org.apache.pig.PigServer&lt;br /&gt;import org.apache.pig.FuncSpec&lt;br /&gt;&lt;br /&gt;class NearestNeighbors&lt;br /&gt;&lt;br /&gt;  attr_accessor :points, :k, :min_zl, :runmode&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Create a new nearest neighbors instance&lt;br /&gt;  # for the given points, k neighbors to find,&lt;br /&gt;  # an optional minimum zl (1-21), and an optional&lt;br /&gt;  # Hadoop run mode (local or mapreduce)&lt;br /&gt;  #&lt;br /&gt;  def initialize points, k, min_zl=20, runmode='mapreduce'&lt;br /&gt;    @points  = points&lt;br /&gt;    @k       = k&lt;br /&gt;    @min_zl  = min_zl&lt;br /&gt;    @runmode = runmode&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Run the nearest neighbors algorithm&lt;br /&gt;  #&lt;br /&gt;  def run&lt;br /&gt;    start_pig_server&lt;br /&gt;    register_jars_and_functions&lt;br /&gt;    run_algorithm&lt;br /&gt;    stop_pig_server&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Actually runs all the Pig queries for&lt;br /&gt;  # the algorithm. 
Stops if all neighbors&lt;br /&gt;  # have been found or if min_zl is reached&lt;br /&gt;  #&lt;br /&gt;  def run_algorithm&lt;br /&gt;    start_nearest_neighbors(points, k, 22)&lt;br /&gt;    if run_nearest_neighbors(k, 22)&lt;br /&gt;      21.downto(min_zl) do |zl|&lt;br /&gt;        iterate_nearest_neighbors(k, zl)&lt;br /&gt;        break unless run_nearest_neighbors(k,zl)&lt;br /&gt;      end&lt;br /&gt;    end&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Registers algorithm initialization queries&lt;br /&gt;  #&lt;br /&gt;  def start_nearest_neighbors(input, k, zl)&lt;br /&gt;    @pig.register_query(PigQueries.load_points(input))&lt;br /&gt;    @pig.register_query(PigQueries.generate_initial_quadkeys(zl))&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Registers algorithm iteration queries&lt;br /&gt;  #&lt;br /&gt;  def iterate_nearest_neighbors k, zl&lt;br /&gt;    @pig.register_query(PigQueries.load_prior_done(zl))&lt;br /&gt;    @pig.register_query(PigQueries.load_prior_not_done(zl))&lt;br /&gt;    @pig.register_query(PigQueries.union_priors(zl))&lt;br /&gt;    @pig.register_query(PigQueries.trim_quadkey(zl))&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Runs one iteration of the algorithm&lt;br /&gt;  #&lt;br /&gt;  def run_nearest_neighbors(k, zl)&lt;br /&gt;    @pig.register_query(PigQueries.group_by_quadkey(zl))&lt;br /&gt;    @pig.register_query(PigQueries.nearest_neighbors(k, zl))&lt;br /&gt;    @pig.register_query(PigQueries.split_results(k, zl))&lt;br /&gt;&lt;br /&gt;    if !@pig.exists_file("done#{zl}")&lt;br /&gt;      @pig.store("done#{zl}", "done#{zl}")&lt;br /&gt;      not_done = @pig.store("not_done#{zl}", "not_done#{zl}")&lt;br /&gt;      not_done.get_results.has_next?&lt;br /&gt;    else&lt;br /&gt;      true&lt;br /&gt;    end&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Start a new pig server with the specified run mode&lt;br /&gt;  #&lt;br /&gt;  def start_pig_server&lt;br /&gt;    
@pig = PigServer.new(runmode)&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Stop the running pig server&lt;br /&gt;  #&lt;br /&gt;  def stop_pig_server&lt;br /&gt;    @pig.shutdown&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # Register the jar that contains the nearest neighbors&lt;br /&gt;  # and quadkeys udfs and define functions for them.&lt;br /&gt;  #&lt;br /&gt;  def register_jars_and_functions&lt;br /&gt;    @pig.register_jar('../../../../udf/target/sounder-1.0-SNAPSHOT.jar')&lt;br /&gt;    @pig.register_function('GetQuadkey',       FuncSpec.new('sounder.pig.geo.GetQuadkey()'))&lt;br /&gt;    @pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors()'))&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # A simple class to contain the pig queries&lt;br /&gt;  #&lt;br /&gt;  class PigQueries&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Load the geonames points. Obviously,&lt;br /&gt;    # this should be modified to accept a&lt;br /&gt;    # variable schema.&lt;br /&gt;    #&lt;br /&gt;    def self.load_points geonames&lt;br /&gt;      "points = LOAD '#{geonames}' AS (&lt;br /&gt;     geonameid:         int,&lt;br /&gt;     name:              chararray,&lt;br /&gt;     asciiname:         chararray,&lt;br /&gt;     alternatenames:    chararray,&lt;br /&gt;     latitude:          double,&lt;br /&gt;     longitude:         double,&lt;br /&gt;     feature_class:     chararray,&lt;br /&gt;     feature_code:      chararray,&lt;br /&gt;     country_code:      chararray,&lt;br /&gt;     cc2:               chararray,&lt;br /&gt;     admin1_code:       chararray,&lt;br /&gt;     admin2_code:       chararray,&lt;br /&gt;     admin3_code:       chararray,&lt;br /&gt;     admin4_code:       chararray,&lt;br /&gt;     population:        long,&lt;br /&gt;     elevation:         int,&lt;br /&gt;     gtopo30:           int,&lt;br /&gt;     timezone:          chararray,&lt;br /&gt;     
modification_date: chararray&lt;br /&gt;   );"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Query to generate quadkeys at the specified zoom level&lt;br /&gt;    #&lt;br /&gt;    def self.generate_initial_quadkeys(zl)&lt;br /&gt;      "projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Load previous iteration's done points&lt;br /&gt;    #&lt;br /&gt;    def self.load_prior_done(zl)&lt;br /&gt;      "prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Load previous iteration's not done points&lt;br /&gt;    #&lt;br /&gt;    def self.load_prior_not_done(zl)&lt;br /&gt;      "prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Union the previous iterations points that are done&lt;br /&gt;    # with the points that are not done&lt;br /&gt;    #&lt;br /&gt;    def self.union_priors zl&lt;br /&gt;      "prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Chop off one character of precision from the existing&lt;br /&gt;    # quadkey to go one zl down.&lt;br /&gt;    #&lt;br /&gt;    def self.trim_quadkey zl&lt;br /&gt;      "projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, #{zl}) AS 
quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Group the points by quadkey&lt;br /&gt;    #&lt;br /&gt;    def self.group_by_quadkey zl&lt;br /&gt;      "grouped#{zl}  = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Run the nearest neighbors UDF on all the points for&lt;br /&gt;    # a given quadkey&lt;br /&gt;    #&lt;br /&gt;    def self.nearest_neighbors(k, zl)&lt;br /&gt;      "nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );"&lt;br /&gt;    end&lt;br /&gt;&lt;br /&gt;    #&lt;br /&gt;    # Split the results into done and not_done relations.&lt;br /&gt;    # The algorithm is done when 'not_done' contains&lt;br /&gt;    # no more tuples.&lt;br /&gt;    #&lt;br /&gt;    def self.split_results(k, zl)&lt;br /&gt;      "SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) &gt;= #{k}l, not_done#{zl} IF COUNT(neighbors) &lt; #{k}l;"&lt;br /&gt;    end&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;#&lt;br /&gt;# Usage: nearest_neighbors.rb INPUT K [MIN_ZL]&lt;br /&gt;#&lt;br /&gt;NearestNeighbors.new(ARGV[0], ARGV[1].to_i, (ARGV[2] || 20).to_i).run&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Call this file "nearest_neighbors.rb". It takes the input path, K, and an optional minimum zoom level (20 if omitted). The idea here is that we register some basic Pig queries to initialize the algorithm and to run each iteration. These queries are run over and over, one zoom level at a time, until either the "not_done" relation contains no more elements or the minimum zoom level has been reached. 
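&lt;br /&gt;&lt;br /&gt;Why does chopping a character off the quadkey make the partition bigger? A quadkey of length zl names one tile in a 2^zl by 2^zl grid, so each tile spans 360 / 2^zl degrees of longitude. Dropping the last character gives the key of the parent tile, which contains that tile and its three siblings. Here's a small sketch in plain Java that mirrors the SUBSTRING call in trim_quadkey. The QuadkeyPartitions class and its method names are made up for illustration.&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:java"&gt;
/** Illustrates how quadkey prefixes define nested partitions (hypothetical helper names). */
public class QuadkeyPartitions {

    /** Key of the enclosing tile at zoom level zl, like SUBSTRING(quadkey, 0, zl) in Pig. */
    public static String parent(String quadkey, int zl) {
        return quadkey.substring(0, zl);
    }

    /** Width of one tile in degrees of longitude at zoom level zl. */
    public static double tileWidthDegrees(int zl) {
        return 360.0 / Math.pow(2, zl);
    }

    public static void main(String[] args) {
        // Sibling tiles at zoom level 4 share the parent tile "133" at zoom level 3
        System.out.println(parent("1332", 3).equals(parent("1333", 3))); // true
        // Roughly 39 km at the equator at zoom level 10...
        System.out.println(tileWidthDegrees(10)); // 0.3515625
        // ...versus roughly 10 m at zoom level 22
        System.out.println(tileWidthDegrees(22));
    }
}
&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;One consequence worth keeping in mind: two nearby points on opposite sides of a tile boundary only share a partition once their quadkeys are trimmed back to a common prefix. A point that has already collected K neighbors inside its own tile is passed through unchanged after that, so results for points near tile edges can be approximate.&lt;br /&gt;&lt;br /&gt;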
Note that a small zoom level means a big partition of space.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Run it!&lt;/h2&gt;&lt;br /&gt;I think we're finally ready to run it. Let K=5 and the minimum zoom level (zl) be 10, then kick it off with:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;&lt;br /&gt;$: ./nearest_neighbors.rb allCountries.txt 5 10&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;The output will live in 'done10' (in your home directory on HDFS), and all the points that couldn't find their neighbors (poor guys) are left in 'not_done10'. If every point finds its neighbors before zoom level 10, the run stops early and the results are in the 'done' directory for the last zoom level that ran. Let's take a look:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;&lt;br /&gt;$: hadoop fs -cat done10/part* | head &lt;br /&gt;  22321231 5874132 -164.73333  67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}&lt;br /&gt; 133223322 2631726 -17.98263   66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}&lt;br /&gt; 133223322 2631999 -18.01884   66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}&lt;br /&gt; 200103201 5874186 -165.39611  64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}&lt;br /&gt; 200103201 5878614 -165.3      64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}&lt;br /&gt; 200103203 5875461 -165.55111  64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}&lt;br /&gt; 200103300 5861248 -164.27639  64.69278 
{(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}&lt;br /&gt; 200103300 5876626 -164.27111  64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}&lt;br /&gt; 200233011 5870290 -170.3      57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}&lt;br /&gt; 200233123 5873595 -169.48056  56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray!&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Pig Code&lt;/h3&gt;&lt;br /&gt;Let's just take a look at the pig code that actually got executed.&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:plain"&gt;&lt;br /&gt;points = LOAD 'allCountries.txt' AS (&lt;br /&gt;     geonameid:         int,&lt;br /&gt;     name:              chararray,&lt;br /&gt;     asciiname:         chararray,&lt;br /&gt;     alternatenames:    chararray,&lt;br /&gt;     latitude:          double,&lt;br /&gt;     longitude:         double,&lt;br /&gt;     feature_class:     chararray,&lt;br /&gt;     feature_code:      chararray,&lt;br /&gt;     country_code:      chararray,&lt;br /&gt;     cc2:               chararray,&lt;br /&gt;     admin1_code:       chararray,&lt;br /&gt;     admin2_code:       chararray,&lt;br /&gt;     admin3_code:       chararray,&lt;br /&gt;     admin4_code:       chararray,&lt;br /&gt;     population:        long,&lt;br /&gt;     elevation:         int,&lt;br /&gt;     gtopo30:           int,&lt;br /&gt;     timezone:          chararray,&lt;br /&gt;     modification_date: chararray&lt;br /&gt;   );&lt;br /&gt;projected22 = FOREACH points GENERATE 
GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};&lt;br /&gt;grouped22  = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) &gt;= 5l, not_done22 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done22 = LOAD 'done22/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done22 = LOAD 'not_done22/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors22 = UNION prior_done22, prior_not_done22;&lt;br /&gt;projected21 = FOREACH prior_neighbors22 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 21) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped21  = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) &gt;= 5l, not_done21 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done21 = LOAD 'done21/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done21 = LOAD 'not_done21/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors21 = UNION prior_done21, prior_not_done21;&lt;br /&gt;projected20 = FOREACH prior_neighbors21 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 20) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped20  = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) &gt;= 5l, not_done20 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done20 = LOAD 'done20/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br 
/&gt;prior_not_done20 = LOAD 'not_done20/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors20 = UNION prior_done20, prior_not_done20;&lt;br /&gt;projected19 = FOREACH prior_neighbors20 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 19) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped19  = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) &gt;= 5l, not_done19 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done19 = LOAD 'done19/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done19 = LOAD 'not_done19/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors19 = UNION prior_done19, prior_not_done19;&lt;br /&gt;projected18 = FOREACH prior_neighbors19 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 18) AS quadkey,&lt;br /&gt;     
geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped18  = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) &gt;= 5l, not_done18 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done18 = LOAD 'done18/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done18 = LOAD 'not_done18/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors18 = UNION prior_done18, prior_not_done18;&lt;br /&gt;projected17 = FOREACH prior_neighbors18 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 17) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped17  = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: 
double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) &gt;= 5l, not_done17 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done17 = LOAD 'done17/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done17 = LOAD 'not_done17/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors17 = UNION prior_done17, prior_not_done17;&lt;br /&gt;projected16 = FOREACH prior_neighbors17 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 16) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped16  = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) &gt;= 5l, not_done16 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done16 = LOAD 'done16/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag 
{t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done16 = LOAD 'not_done16/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors16 = UNION prior_done16, prior_not_done16;&lt;br /&gt;projected15 = FOREACH prior_neighbors16 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 15) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped15  = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) &gt;= 5l, not_done15 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done15 = LOAD 'done15/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done15 = LOAD 'not_done15/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors15 = UNION prior_done15, prior_not_done15;&lt;br /&gt;projected14 = FOREACH prior_neighbors15 GENERATE&lt;br 
/&gt;     SUBSTRING(quadkey, 0, 14) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped14  = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) &gt;= 5l, not_done14 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done14 = LOAD 'done14/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done14 = LOAD 'not_done14/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors14 = UNION prior_done14, prior_not_done14;&lt;br /&gt;projected13 = FOREACH prior_neighbors14 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 13) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped13  = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS 
(&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) &gt;= 5l, not_done13 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done13 = LOAD 'done13/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done13 = LOAD 'not_done13/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors13 = UNION prior_done13, prior_not_done13;&lt;br /&gt;projected12 = FOREACH prior_neighbors13 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 12) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped12  = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) &gt;= 5l, not_done12 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done12 = LOAD 'done12/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     
latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done12 = LOAD 'not_done12/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors12 = UNION prior_done12, prior_not_done12;&lt;br /&gt;projected11 = FOREACH prior_neighbors12 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 11) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped11  = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) &gt;= 5l, not_done11 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;prior_done11 = LOAD 'done11/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_not_done11 = LOAD 'not_done11/part*' AS (&lt;br /&gt;     quadkey:   chararray,&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;prior_neighbors11 = UNION prior_done11, prior_not_done11;&lt;br 
/&gt;projected10 = FOREACH prior_neighbors11 GENERATE&lt;br /&gt;     SUBSTRING(quadkey, 0, 10) AS quadkey,&lt;br /&gt;     geonameid AS geonameid,&lt;br /&gt;     longitude AS longitude,&lt;br /&gt;     latitude  AS latitude,&lt;br /&gt;     neighbors AS neighbors;&lt;br /&gt;grouped10  = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;&lt;br /&gt;nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (&lt;br /&gt;     geonameid: int,&lt;br /&gt;     longitude: double,&lt;br /&gt;     latitude:  double,&lt;br /&gt;     neighbors: bag {t:tuple(neighbor_id:int, distance:double)}&lt;br /&gt;   );&lt;br /&gt;SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) &gt;= 5l, not_done10 IF COUNT(neighbors) &lt; 5l;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;So you can see now why, with iteration, it's a good idea to generate as much code as possible.&lt;br /&gt;&lt;br /&gt;And we're done :) &lt;br /&gt;&lt;br /&gt;All the code for this is on github in the &lt;a href="http://github.com/Ganglion/sounder"&gt;Sounder&lt;/a&gt; repo (along with tons of other Apache Pig examples).&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-6738889773115636631?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/6738889773115636631/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/10/nearest-neighbors-with-apache-pig-and.html#comment-form' title='3 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6738889773115636631'/><link rel='self' type='application/atom+xml' 
href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6738889773115636631'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/10/nearest-neighbors-with-apache-pig-and.html' title='Nearest Neighbors With Apache Pig and Jruby'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>3</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-1633264164496403779</id><published>2011-05-03T16:14:00.000-07:00</published><updated>2011-05-03T17:26:07.133-07:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='similarity apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='intersection'/><category scheme='http://www.blogger.com/atom/ns#' term='jaccard similarity'/><category scheme='http://www.blogger.com/atom/ns#' term='jaccard similarity hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='similarity hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='intersection hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='intersection apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='graph'/><category scheme='http://www.blogger.com/atom/ns#' term='jaccard index'/><category scheme='http://www.blogger.com/atom/ns#' term='jaccard similarity pig'/><title type='text'>Structural Similarity With Apache Pig</title><content type='html'>A while ago I posted &lt;a href="http://thedatachef.blogspot.com/2011/02/brute-force-graph-crunching-with-pig.html"&gt;this&lt;/a&gt; about computing the jaccard similarity (otherwise known as the structural similarity) of nodes in a network graph. 
Looking back on it over the past few days, I realize there are some areas for serious improvement. Actually, it's just completely wrong. The approach is broken. So I'm going to revisit the algorithm in more detail and write a vastly improved Pig script for computing the structural similarity of vertices in a network graph.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;The Graph&lt;/h2&gt;&lt;br /&gt;Of course, before you can do a whole lot of anything, you've got to have a graph to work with. To keep things dead simple (and breaking from what I normally do), I'm going to draw an arbitrary graph (and by draw I mean &lt;span style="font-style: italic;"&gt;draw&lt;/span&gt;). Here it is:&lt;br /&gt;&lt;a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="http://3.bp.blogspot.com/-F9rWW42HmBA/TcCPDz2dW3I/AAAAAAAAAMM/9xtOp7i21Ag/s1600/graph.jpg"&gt;&lt;img style="display: block; margin: 0px auto 10px; text-align: center; cursor: pointer; width: 287px; height: 320px;" src="http://3.bp.blogspot.com/-F9rWW42HmBA/TcCPDz2dW3I/AAAAAAAAAMM/9xtOp7i21Ag/s320/graph.jpg" alt="" id="BLOGGER_PHOTO_ID_5602635232069901170" border="0" /&gt;&lt;/a&gt;&lt;br /&gt;This can be represented as a set of tab-separated adjacency pairs in a file called 'graph.tsv'. Each line is an edge from the first vertex to the second:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush:bash"&gt;$: cat graph.tsv&lt;br /&gt;A C&lt;br /&gt;B C&lt;br /&gt;C D&lt;br /&gt;C E&lt;br /&gt;E F&lt;br /&gt;C H&lt;br /&gt;G H&lt;br /&gt;C G&lt;br /&gt;D E&lt;br /&gt;A G&lt;br /&gt;B G&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;I don't think it gets any simpler.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;The Measure&lt;/h2&gt;&lt;br /&gt;Now, the idea here is to compute the similarity between nodes, e.g. similarity(A,B). There's already a well-defined measure for this called the &lt;a href="http://en.wikipedia.org/wiki/Jaccard_index"&gt;Jaccard similarity&lt;/a&gt;. 
The key takeaway is to notice that:&lt;br /&gt;&lt;br /&gt;&lt;a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="http://2.bp.blogspot.com/-F-UtlR9T3Z0/TcCVUQVydiI/AAAAAAAAAMU/LcrhkM_2_Lg/s1600/jaccard.jpg"&gt;&lt;img style="display: block; margin: 0px auto 10px; text-align: center; cursor: pointer; width: 320px; height: 79px;" src="http://2.bp.blogspot.com/-F-UtlR9T3Z0/TcCVUQVydiI/AAAAAAAAAMU/LcrhkM_2_Lg/s320/jaccard.jpg" alt="" id="BLOGGER_PHOTO_ID_5602642111665174050" border="0" /&gt;&lt;/a&gt;&lt;br /&gt;Since |A U B| = |A| + |B| - |A /\ B|, the only pieces needed are the two set sizes and the size of their intersection.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;The Pig&lt;/h2&gt;&lt;br /&gt;This breaks down into a set of Pig operations quite easily:&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;load data&lt;/h3&gt;&lt;br /&gt;Remember, I saved the data into a file called 'graph.tsv'. So:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;edges     = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);&lt;br /&gt;edges_dup = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;It is necessary, but still a hack, to load the data twice. This is because self-joins (which I'll be doing in a moment) don't work in Pig 0.8.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;augment with sizes&lt;/h3&gt;&lt;br /&gt;Now I need the 'size' of each of the sets. In this case the 'sets' are the lists of nodes that each node links to. 
So, all I really have to do is calculate the number of outgoing links:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;grouped_edges = GROUP edges BY v1;&lt;br /&gt;aug_edges     = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;&lt;br /&gt;&lt;br /&gt;grouped_dups  = GROUP edges_dup BY v1;&lt;br /&gt;aug_dups      = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Again, if self-joins worked, the outgoing links would only have to be counted once. Now I've got a handle on the |A| and |B| parts of the equation above.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;intersection&lt;/h3&gt;&lt;br /&gt;Next I'm going to compute the intersection using a Pig join on v2. Two source vertices X and Y produce one joined row for every destination they both link to, so each row is one element of X /\ Y:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;edges_joined  = JOIN aug_edges BY v2, aug_dups BY v2;&lt;br /&gt;intersection  = FOREACH edges_joined {&lt;br /&gt;                  --&lt;br /&gt;                  -- results in:&lt;br /&gt;                  -- (X, Y, |X| + |Y|)&lt;br /&gt;                  -- &lt;br /&gt;                  added_size = aug_edges::v1_out + aug_dups::v1_out;&lt;br /&gt;                  GENERATE&lt;br /&gt;                    aug_edges::v1 AS v1,&lt;br /&gt;                    aug_dups::v1  AS v2,&lt;br /&gt;                    added_size    AS added_size&lt;br /&gt;                  ;&lt;br /&gt;                };&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Notice I'm adding the individual set sizes. 
This is to come up with the |A| + |B| portion of the denominator in the Jaccard index.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;intersection sizes&lt;/h3&gt;&lt;br /&gt;To compute the size of the intersection, I've got to use a Pig GROUP and collect all the elements that matched on the JOIN:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;intersect_grp   = GROUP intersection BY (v1, v2);&lt;br /&gt;intersect_sizes = FOREACH intersect_grp {&lt;br /&gt;                    --&lt;br /&gt;                    -- results in:&lt;br /&gt;                    -- (X, Y, |X /\ Y|, |X| + |Y|)&lt;br /&gt;                    --&lt;br /&gt;                    intersection_size = (double)COUNT(intersection);&lt;br /&gt;                    GENERATE&lt;br /&gt;                      FLATTEN(group)               AS (v1, v2),&lt;br /&gt;                      intersection_size            AS intersection_size,&lt;br /&gt;                      MAX(intersection.added_size) AS added_size -- hack, we only need this one time&lt;br /&gt;                    ;&lt;br /&gt;                  };&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;There's a hack in there. The reason is that 'intersection.added_size' is a Pig data bag containing as many identical tuples as there are elements in the intersection. Each of these tuples is the 'added_size' from the previous step. 
Using MAX is just a convenient way of pulling out only one of them.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;similarity&lt;/h3&gt;&lt;br /&gt;And finally, I have all the pieces in place to compute the similarities:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;similarities = FOREACH intersect_sizes {&lt;br /&gt;                 --&lt;br /&gt;                 -- results in:&lt;br /&gt;                 -- (X, Y, |X /\ Y|/|X U Y|)&lt;br /&gt;                 --&lt;br /&gt;                 similarity = (double)intersection_size/((double)added_size-(double)intersection_size);&lt;br /&gt;                 GENERATE&lt;br /&gt;                   v1         AS v1,&lt;br /&gt;                   v2         AS v2,&lt;br /&gt;                   similarity AS similarity&lt;br /&gt;                 ;&lt;br /&gt;               };&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;That'll do it. Here's the full script for completeness:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;edges     = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);&lt;br /&gt;edges_dup = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);&lt;br /&gt;&lt;br /&gt;--&lt;br /&gt;-- Augment the edges with the sizes of their outgoing adjacency lists. 
Note that&lt;br /&gt;-- if a self join was possible we would only have to do this once.&lt;br /&gt;--&lt;br /&gt;grouped_edges = GROUP edges BY v1;&lt;br /&gt;aug_edges     = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;&lt;br /&gt;&lt;br /&gt;grouped_dups  = GROUP edges_dup BY v1;&lt;br /&gt;aug_dups      = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;&lt;br /&gt;&lt;br /&gt;--&lt;br /&gt;-- Compute the sizes of the intersections of outgoing adjacency lists&lt;br /&gt;--&lt;br /&gt;edges_joined  = JOIN aug_edges BY v2, aug_dups BY v2;&lt;br /&gt;intersection  = FOREACH edges_joined {&lt;br /&gt;                  --&lt;br /&gt;                  -- results in:&lt;br /&gt;                  -- (X, Y, |X| + |Y|)&lt;br /&gt;                  -- &lt;br /&gt;                  added_size = aug_edges::v1_out + aug_dups::v1_out;&lt;br /&gt;                  GENERATE&lt;br /&gt;                    aug_edges::v1 AS v1,&lt;br /&gt;                    aug_dups::v1  AS v2,&lt;br /&gt;                    added_size    AS added_size&lt;br /&gt;                  ;&lt;br /&gt;                };&lt;br /&gt;&lt;br /&gt;intersect_grp   = GROUP intersection BY (v1, v2);&lt;br /&gt;intersect_sizes = FOREACH intersect_grp {&lt;br /&gt;                    --&lt;br /&gt;                    -- results in:&lt;br /&gt;                    -- (X, Y, |X /\ Y|, |X| + |Y|)&lt;br /&gt;                    --&lt;br /&gt;                    intersection_size = (double)COUNT(intersection);&lt;br /&gt;                    GENERATE&lt;br /&gt;                      FLATTEN(group)               AS (v1, v2),&lt;br /&gt;                      intersection_size            AS intersection_size,&lt;br /&gt;                      MAX(intersection.added_size) AS added_size -- hack, we only need this one time&lt;br /&gt;                    ;&lt;br /&gt;                  };&lt;br /&gt;&lt;br /&gt;similarities = FOREACH intersect_sizes 
{&lt;br /&gt;                 --&lt;br /&gt;                 -- results in:&lt;br /&gt;                 -- (X, Y, |X /\ Y|/|X U Y|)&lt;br /&gt;                 --&lt;br /&gt;                 similarity = (double)intersection_size/((double)added_size-(double)intersection_size);&lt;br /&gt;                 GENERATE&lt;br /&gt;                   v1         AS v1,&lt;br /&gt;                   v2         AS v2,&lt;br /&gt;                   similarity AS similarity&lt;br /&gt;                 ;&lt;br /&gt;               };&lt;br /&gt;&lt;br /&gt;DUMP similarities;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Results&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Run it:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: pig -x local jaccard.pig&lt;br /&gt;...snip...&lt;br /&gt;(A,A,1.0)&lt;br /&gt;(A,B,1.0)&lt;br /&gt;(A,C,0.2)&lt;br /&gt;(B,A,1.0)&lt;br /&gt;(B,B,1.0)&lt;br /&gt;(B,C,0.2)&lt;br /&gt;(C,A,0.2)&lt;br /&gt;(C,B,0.2)&lt;br /&gt;(C,C,1.0)&lt;br /&gt;(C,D,0.25)&lt;br /&gt;(C,G,0.25)&lt;br /&gt;(D,C,0.25)&lt;br /&gt;(D,D,1.0)&lt;br /&gt;(E,E,1.0)&lt;br /&gt;(G,C,0.25)&lt;br /&gt;(G,G,1.0)&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;And it's done! Hurray. 
Now, go make a recommender system or something.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-1633264164496403779?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/1633264164496403779/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/05/structural-similarity-with-apache-pig.html#comment-form' title='3 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/1633264164496403779'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/1633264164496403779'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/05/structural-similarity-with-apache-pig.html' title='Structural Similarity With Apache Pig'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><media:thumbnail xmlns:media='http://search.yahoo.com/mrss/' url='http://3.bp.blogspot.com/-F9rWW42HmBA/TcCPDz2dW3I/AAAAAAAAAMM/9xtOp7i21Ag/s72-c/graph.jpg' height='72' width='72'/><thr:total>3</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-3220777689266815975</id><published>2011-04-26T20:20:00.000-07:00</published><updated>2011-04-26T20:41:52.294-07:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='lucene tokenization pig'/><category scheme='http://www.blogger.com/atom/ns#' term='apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='apache hadoop'/><category 
scheme='http://www.blogger.com/atom/ns#' term='lucene'/><category scheme='http://www.blogger.com/atom/ns#' term='tokenize udf'/><category scheme='http://www.blogger.com/atom/ns#' term='raw text'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='tokenize text with pig'/><category scheme='http://www.blogger.com/atom/ns#' term='text processing with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='apache lucene'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>A Lucene Text Tokenization UDF for Apache Pig</title><content type='html'>As much as I am loath to admit it, sometimes Java is called for. One of those times is tokenizing raw text. You'll notice in the post about tf-idf how I used a Wukong script, written in Ruby, to tokenize a large text corpus with Hadoop and Pig. There are a couple of problems with this:&lt;br /&gt;&lt;br /&gt;1. Ruby is &lt;span style="font-style:italic;"&gt;slow&lt;/span&gt; at this.&lt;br /&gt;&lt;br /&gt;2. All the gem dependencies (wukong itself, extlib, etc.) must exist on all the machines in the cluster and be available on the RUBYLIB path (yet another environment variable to manage).&lt;br /&gt;&lt;br /&gt;There is a better way.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;A Pig UDF&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc, whose 'exec()' method plays essentially the same role as the Wukong 'process()' method or the Java Hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a Pig DataBag. Each Tuple in the DataBag will be a single token. 
Here's what it looks like as a whole:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: java"&gt;&lt;br /&gt;&lt;br /&gt;import java.io.IOException;&lt;br /&gt;import java.io.StringReader;&lt;br /&gt;&lt;br /&gt;import org.apache.pig.EvalFunc;&lt;br /&gt;import org.apache.pig.data.Tuple;&lt;br /&gt;import org.apache.pig.data.TupleFactory;&lt;br /&gt;import org.apache.pig.data.DataBag;&lt;br /&gt;import org.apache.pig.data.BagFactory;&lt;br /&gt;&lt;br /&gt;import org.apache.lucene.analysis.TokenStream;&lt;br /&gt;import org.apache.lucene.analysis.standard.StandardAnalyzer;&lt;br /&gt;import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;&lt;br /&gt;import org.apache.lucene.util.Version;&lt;br /&gt;&lt;br /&gt;public class TokenizeText extends EvalFunc&amp;lt;DataBag&amp;gt; {&lt;br /&gt;&lt;br /&gt;    private static final TupleFactory tupleFactory = TupleFactory.getInstance();&lt;br /&gt;    private static final BagFactory bagFactory = BagFactory.getInstance();&lt;br /&gt;    private static final String NOFIELD = "";&lt;br /&gt;    private static final StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);&lt;br /&gt;&lt;br /&gt;    public DataBag exec(Tuple input) throws IOException {&lt;br /&gt;        if (input == null || input.size() &amp;lt; 1 || input.isNull(0))&lt;br /&gt;            return null;&lt;br /&gt;&lt;br /&gt;        // Output bag: one single-field Tuple per token&lt;br /&gt;        DataBag bagOfTokens = bagFactory.newDefaultBag();&lt;br /&gt;&lt;br /&gt;        StringReader textInput = new StringReader(input.get(0).toString());&lt;br /&gt;        TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);&lt;br /&gt;        // addAttribute returns the stream's CharTermAttribute, registering it if needed&lt;br /&gt;        CharTermAttribute termAttribute = stream.addAttribute(CharTermAttribute.class);&lt;br /&gt;&lt;br /&gt;        try {&lt;br /&gt;            stream.reset(); // consumers should reset a TokenStream before reading it&lt;br /&gt;            while (stream.incrementToken()) {&lt;br /&gt;                Tuple termText = tupleFactory.newTuple(termAttribute.toString());&lt;br /&gt;                bagOfTokens.add(termText);&lt;br /&gt;            }&lt;br /&gt;            stream.end();&lt;br /&gt;        } finally {&lt;br /&gt;            stream.close();&lt;br /&gt;        }&lt;br /&gt;        return bagOfTokens;&lt;br /&gt;    }&lt;br /&gt;}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;There's absolutely nothing special going on here. Remember, 'exec' gets called once for every input Tuple, and bagOfTokens is the DataBag it returns. First, Lucene's StandardAnalyzer tokenizes the input string. Then each token in the resulting stream is wrapped in a Pig Tuple and added to the result DataBag. Finally, the DataBag is returned. A document is truly a bag of words.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Example Pig Script&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;And here's an example script that uses the UDF (you'll need to REGISTER the jar containing TokenizeText, along with the Lucene jar, before calling it):&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;documents    = LOAD 'documents' AS (doc_id:chararray, text:chararray);&lt;br /&gt;tokenized    = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;And that's it. 
It's blazing fast text tokenization for Apache Pig.&lt;br /&gt;&lt;br /&gt;Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-3220777689266815975?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/3220777689266815975/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/04/lucene-text-tokenization-udf-for-apache.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3220777689266815975'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3220777689266815975'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/04/lucene-text-tokenization-udf-for-apache.html' title='A Lucene Text Tokenization UDF for Apache Pig'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-3882358193128098394</id><published>2011-04-23T21:44:00.000-07:00</published><updated>2011-04-23T22:54:41.749-07:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='tfidf with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='text processing'/><category scheme='http://www.blogger.com/atom/ns#' term='tfidf'/><category scheme='http://www.blogger.com/atom/ns#' term='text processing with pig'/><category 
scheme='http://www.blogger.com/atom/ns#' term='process corpora with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='tfidf with pig'/><category scheme='http://www.blogger.com/atom/ns#' term='text processing with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>TF-IDF With Apache Pig</title><content type='html'>Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next: tf-idf (term frequency-inverse document frequency).&lt;br /&gt;&lt;br /&gt;Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.&lt;br /&gt;&lt;br /&gt;One of the cool things about tf-idf is how damn &lt;span style="font-style:italic;"&gt;simple&lt;/span&gt; it is. Take a look at the Wikipedia &lt;a href="http://en.wikipedia.org/wiki/Tf%E2%80%93idf"&gt;page&lt;/a&gt;. If it doesn't make sense to you after reading that, then please stop wasting your time and start reading a different blog. Like always, I'm going to assume you've got a Hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (&gt;= 0.8) installed, and the Wukong rubygem.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get Data&lt;/h2&gt;&lt;br /&gt;What would an example be without some data? Here's a link to the canonical &lt;a href="http://www.infochimps.com/datasets/18828-articles-from-20-news-groups-de-duped-version"&gt;20 newsgroups data set&lt;/a&gt; that I've restructured slightly for your analysis pleasure.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Tokenization&lt;/h2&gt;&lt;br /&gt;Tokenization of the raw text documents is probably the worst part of the whole process. 
Now, I know there are a number of excellent tokenizers out there, but just for the hell of it I've provided one in Wukong that you can find &lt;a href="http://gist.github.com/939328"&gt;here&lt;/a&gt;. We're going to call that script from the Pig script next. If a Java UDF is more to your liking, feel free to write one and substitute in the relevant bits.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Pig, Step By Step&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;This statement can be interpreted to mean:&lt;br /&gt;&lt;br /&gt;"Define a new function called 'tokenize_docs' that runs the command exactly as it appears between the backticks, using the script that lives on the local disk at 'tokenize_documents.rb' (relative path). 
Send this script file to every machine in the cluster."&lt;br /&gt;&lt;br /&gt;Next up is to load and tokenize the data:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);&lt;br /&gt;tokenized     = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;All tokenize_docs does is produce a clean set of (doc_id, token) pairs.&lt;br /&gt;&lt;br /&gt;Then, we need to count the number of times each unique (doc_id, token) pair appears:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;doc_tokens       = GROUP tokenized BY (doc_id, token);&lt;br /&gt;doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;And, since we're after the term frequencies and not just the counts, we need to attach the document sizes:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;doc_usage_bag    = GROUP doc_token_counts BY doc_id;&lt;br /&gt;doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE&lt;br /&gt;                     group                                                 AS doc_id,&lt;br /&gt;                     FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages), &lt;br /&gt;                     SUM(doc_token_counts.num_doc_tok_usages)              AS doc_size&lt;br /&gt;                   ;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Then the term frequencies are just:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;term_freqs = FOREACH doc_usage_bag_fg GENERATE&lt;br /&gt;               doc_id                                          AS doc_id,&lt;br /&gt;               token                                           AS token,&lt;br /&gt;               ((double)num_doc_tok_usages / (double)doc_size) AS term_freq&lt;br 
/&gt;             ;&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;term_usage_bag  = GROUP term_freqs BY token;&lt;br /&gt;token_usages    = FOREACH term_usage_bag GENERATE&lt;br /&gt;                    FLATTEN(term_freqs) AS (doc_id, token, term_freq),&lt;br /&gt;                    COUNT(term_freqs)   AS num_docs_with_token&lt;br /&gt;                   ;&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Finally, we can compute the tf-idf weight:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;tfidf_all = FOREACH token_usages {&lt;br /&gt;              idf    = LOG((double)$NDOCS/(double)num_docs_with_token);&lt;br /&gt;              tf_idf = (double)term_freq*idf;&lt;br /&gt;                GENERATE&lt;br /&gt;                  doc_id AS doc_id,&lt;br /&gt;                  token  AS token,&lt;br /&gt;                  tf_idf AS tf_idf&lt;br /&gt;                ;&lt;br /&gt;             };&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Where NDOCS is the total number of documents in the corpus.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Pig, All Together Now&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;And here's the final script, all put together:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');&lt;br /&gt;&lt;br /&gt;raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);&lt;br /&gt;tokenized     = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);&lt;br /&gt;&lt;br /&gt;doc_tokens       = GROUP tokenized BY (doc_id, token);&lt;br /&gt;doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;&lt;br /&gt;&lt;br 
/&gt;doc_usage_bag    = GROUP doc_token_counts BY doc_id;&lt;br /&gt;doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE&lt;br /&gt;                     group                                                 AS doc_id,&lt;br /&gt;                     FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages), &lt;br /&gt;                     SUM(doc_token_counts.num_doc_tok_usages)              AS doc_size&lt;br /&gt;                   ;&lt;br /&gt;&lt;br /&gt;term_freqs = FOREACH doc_usage_bag_fg GENERATE&lt;br /&gt;               doc_id                                          AS doc_id,&lt;br /&gt;               token                                           AS token,&lt;br /&gt;               ((double)num_doc_tok_usages / (double)doc_size) AS term_freq&lt;br /&gt;             ;&lt;br /&gt;&lt;br /&gt;term_usage_bag  = GROUP term_freqs BY token;&lt;br /&gt;token_usages    = FOREACH term_usage_bag GENERATE&lt;br /&gt;                    FLATTEN(term_freqs) AS (doc_id, token, term_freq),&lt;br /&gt;                    COUNT(term_freqs)   AS num_docs_with_token&lt;br /&gt;                   ;&lt;br /&gt;&lt;br /&gt;tfidf_all = FOREACH token_usages {&lt;br /&gt;              idf    = LOG((double)$NDOCS/(double)num_docs_with_token);&lt;br /&gt;              tf_idf = (double)term_freq*idf;&lt;br /&gt;                GENERATE&lt;br /&gt;                  doc_id AS doc_id,&lt;br /&gt;                  token  AS token,&lt;br /&gt;                  tf_idf AS tf_idf&lt;br /&gt;                ;&lt;br /&gt;             };&lt;br /&gt;&lt;br /&gt;STORE tfidf_all INTO '$OUT';&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Run It&lt;/h2&gt;&lt;br /&gt;Assuming your data is on HDFS at "/data/corpus/20_newsgroups/data", and you've named the Pig script "tfidf.pig", then you can run it with the following:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;pig -p DOCS=/data/corpus/20_newsgroups/data -p 
NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Here's what the output of that looks like:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;sci.crypt.15405                student  0.0187808247467920464&lt;br /&gt;sci.crypt.16005                student  0.0541184292045718135&lt;br /&gt;sci.med.58809                  student  0.0839387881540297476&lt;br /&gt;sci.med.59021                  student  0.0027903667703849783&lt;br /&gt;sci.space.60850                student  0.0377339506380500803&lt;br /&gt;sci.crypt.15178                student  0.0010815147566519744&lt;br /&gt;soc.religion.christian.21414   student  0.0587571517078208302&lt;br /&gt;comp.sys.ibm.pc.hardware.60725 student  0.0685500103257909721&lt;br /&gt;soc.religion.christian.21485   student  0.0232372916358613464&lt;br /&gt;soc.religion.christian.21556   student  0.0790961657605280533&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray.&lt;br /&gt;&lt;br /&gt;Next time let's look at how to take that output and recover the topics using clustering!&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-3882358193128098394?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/3882358193128098394/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/04/tf-idf-with-apache-pig.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3882358193128098394'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3882358193128098394'/><link rel='alternate' type='text/html' 
href='http://thedatachef.blogspot.com/2011/04/tf-idf-with-apache-pig.html' title='TF-IDF With Apache Pig'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-6670187988897202038</id><published>2011-02-15T20:44:00.000-08:00</published><updated>2011-02-15T21:23:39.691-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='indexing text with pig'/><category scheme='http://www.blogger.com/atom/ns#' term='apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='indexing text with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='raw text'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='pig store elasticsearch'/><category scheme='http://www.blogger.com/atom/ns#' term='elasticsearch'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Indexing Text Corpora With Pig and ElasticSearch</title><content type='html'>Routinely working with raw data is strenuous, ulcer-inducing, and overall hazardous to your health. Unstructured text even more so. Safe data mining requires proper mining gear. Imagine if you could step into a power-suit, fire up your hydraulic force pincers, and make that data your bitch?  &lt;br /&gt;Turns out, a specialty of mine is building useful and interesting exoskeletons (think of Sigourney Weaver in Aliens) for developers of all shapes and sizes.&lt;br /&gt;&lt;br /&gt;On that note, I've written a Pig STORE function for elasticsearch. Now you can use simple Pig syntax to transform arbitrary input data and index the output records with elasticsearch. 
Here's an example:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;%default INDEX 'ufo_sightings'&lt;br /&gt;%default OBJ   'ufo_sighting'        &lt;br /&gt;&lt;br /&gt;ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);&lt;br /&gt;STORE ufo_sightings INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchIndex('-1', '1000');&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Where '-1' means the records have no inherent id and '1000' is the number of records to batch up before indexing. Here's the link to the github page (&lt;a href="http://github.com/infochimps/wonderdog"&gt;wonderdog&lt;/a&gt;).&lt;br /&gt;&lt;br /&gt;It doesn't get any simpler. You've just been endowed with magic super text indexing powers. Now go. Index some raw text.&lt;br /&gt;&lt;br /&gt;Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-6670187988897202038?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/6670187988897202038/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/02/indexing-text-corpora-with-pig-and.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6670187988897202038'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6670187988897202038'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/02/indexing-text-corpora-with-pig-and.html' title='Indexing Text Corpora With Pig and 
ElasticSearch'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-4603116858590414496</id><published>2011-02-04T07:37:00.000-08:00</published><updated>2011-02-04T19:31:12.386-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='pig stream'/><category scheme='http://www.blogger.com/atom/ns#' term='apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='similarity apache pig'/><category scheme='http://www.blogger.com/atom/ns#' term='network'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop example'/><category scheme='http://www.blogger.com/atom/ns#' term='graph'/><category scheme='http://www.blogger.com/atom/ns#' term='examples'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong example'/><category scheme='http://www.blogger.com/atom/ns#' term='item-item similarity'/><category scheme='http://www.blogger.com/atom/ns#' term='apache'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Brute Force Graph Crunching With Pig and Wukong</title><content type='html'>Just discovered this amazing data set matching all Marvel Universe comic book characters with the comic books they've appeared in (&lt;a href="http://bioinfo.uib.es/~joemiro/marvel.html"&gt;Social Characteristics of the Marvel Universe&lt;/a&gt;). 
I've made the data set available on Infochimps &lt;a href="http://infochimps.com/datasets/marvel-universe-social-graph"&gt;here&lt;/a&gt; in a sane, easy-to-use format.&lt;br /&gt;&lt;br /&gt;Here's what that looks like:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: head labeled_edges.tsv | wu-lign &lt;br /&gt;"FROST, CARMILLA"      "AA2 35"  &lt;br /&gt;"KILLRAVEN/JONATHAN R" "AA2 35"  &lt;br /&gt;"M'SHULLA"             "AA2 35"  &lt;br /&gt;"24-HOUR MAN/EMMANUEL" "AA2 35"  &lt;br /&gt;"OLD SKULL"            "AA2 35"  &lt;br /&gt;"G'RATH"               "AA2 35"  &lt;br /&gt;"3-D MAN/CHARLES CHAN" "M/PRM 35"&lt;br /&gt;"3-D MAN/CHARLES CHAN" "M/PRM 36"&lt;br /&gt;"3-D MAN/CHARLES CHAN" "M/PRM 37"&lt;br /&gt;"HUMAN ROBOT"          "WI? 9"&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Simple Question&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;A natural question to ask of such an awesome graph is how similar two characters are, based on the comic books they've appeared in. This is called the &lt;span style="font-style:italic;"&gt;structural similarity&lt;/span&gt; since we're only using the structure of the graph and no other metadata (weights, etc.). Note that this could also be applied in the other direction to find the similarity between two comic books based on the characters they share.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Wee bit of math&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;The structural similarity is nothing more than the &lt;span style="font-style:italic;"&gt;Jaccard similarity&lt;/span&gt; applied to nodes in a network graph. 
Here's the definition of that from Wikipedia:&lt;br /&gt;&lt;br /&gt;&lt;a href="http://upload.wikimedia.org/math/1/8/6/186c7f4e83da32e889d606140fae25a0.png"&gt;&lt;img style="display:block; margin:0px auto 10px; text-align:center;cursor:pointer; cursor:hand;width: 164px; height: 47px;" src="http://upload.wikimedia.org/math/1/8/6/186c7f4e83da32e889d606140fae25a0.png" border="0" alt="J(A,B) = |A ∩ B| / |A ∪ B|" /&gt;&lt;/a&gt;&lt;br /&gt;&lt;br /&gt;So basically all we've got to do is get the list of comic books that each of two characters, say character A and character B, has appeared in. These two lists of comic books form two mathematical sets.&lt;br /&gt;&lt;br /&gt;The numerator in that simple formula says to compute the intersection of A and B and then count how many elements it contains. More plainly, that's just the number of comic books the two characters have in common.&lt;br /&gt;&lt;br /&gt;The denominator tells us to compute the union of A and B and count how many elements are in the resulting set. 
That's just the number of unique comic books that A and B have ever appeared in, whether together or separately.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Pig&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Here's how we're going to say it using Pig:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;DEFINE jaccard_similarity `ruby jaccard_similarity.rb --map` SHIP('jaccard_similarity.rb');&lt;br /&gt;edges          = LOAD '/data/comics/marvel/labeled_edges.tsv' AS (character:chararray, comic:chararray);&lt;br /&gt;grouped        = GROUP edges BY character;&lt;br /&gt;with_sets      = FOREACH grouped GENERATE group AS character, FLATTEN(edges.comic) AS comic, edges.comic AS set;&lt;br /&gt;SPLIT with_sets INTO with_sets_dup IF ( 1 &gt; 0 ), not_used if (1 &lt; 0); -- hack hack hack, self join still doesn't work&lt;br /&gt;joined         = JOIN with_sets BY comic, with_sets_dup BY comic;&lt;br /&gt;pairs          = FOREACH joined GENERATE&lt;br /&gt;                   with_sets::character     AS character_a,&lt;br /&gt;                   with_sets::set           AS character_a_set,&lt;br /&gt;                   with_sets_dup::character AS character_b,&lt;br /&gt;                   with_sets_dup::set       AS character_b_set&lt;br /&gt;                 ;&lt;br /&gt;similarity     = STREAM pairs THROUGH jaccard_similarity AS (character_a:chararray, character_b:chararray, similarity:float);&lt;br /&gt;STORE similarity INTO '/data/comics/marvel/character_similarity.tsv';&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Notice we're doing a bit of funny business here. Writing the actual algorithm for the Jaccard similarity between two small sets doesn't make much sense in Pig. 
Instead we've written a wukong script to do it for us (you could also write a Pig UDF if you're a masochist).&lt;br /&gt;&lt;br /&gt;The first thing we do here is use the &lt;span style="font-style:italic;"&gt;DEFINE&lt;/span&gt; operator to tell Pig that there's an external command we want to call, the alias for it, how to call it, and to &lt;span style="font-style:italic;"&gt;SHIP&lt;/span&gt; the script it needs to all nodes in the cluster.&lt;br /&gt;&lt;br /&gt;Next we use the &lt;span style="font-style:italic;"&gt;GROUP&lt;/span&gt; operator and then the &lt;span style="font-style:italic;"&gt;FOREACH..GENERATE&lt;/span&gt; projection operator to get, for every character, the list of comic books they've appeared in.&lt;br /&gt;&lt;br /&gt;We also use the &lt;span style="font-style:italic;"&gt;FLATTEN&lt;/span&gt; operator during the projection, which emits one (character, comic) row per comic while carrying the character's full set of comics along. That lets us use the &lt;span style="font-style:italic;"&gt;JOIN&lt;/span&gt; operator on comic to pull out (character, character) pairs that have at least one comic book in common. (Don't be scared by the gross-looking &lt;span style="font-style:italic;"&gt;SPLIT&lt;/span&gt; operator in there. Just ignore it. It's a hack to get around the fact that self-joins still don't quite work properly in Pig. Pretend we're just joining 'with_sets' with itself.)&lt;br /&gt;&lt;br /&gt;The last step is to &lt;span style="font-style:italic;"&gt;STREAM&lt;/span&gt; our pairs through the simple wukong script. 
Here's what that looks like:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;require 'wukong/and_pig' # for special conversion methods&lt;br /&gt;require 'set'&lt;br /&gt;&lt;br /&gt;#&lt;br /&gt;# Takes two pig bags and computes their jaccard similarity&lt;br /&gt;#&lt;br /&gt;# eg.&lt;br /&gt;#&lt;br /&gt;# input:&lt;br /&gt;#&lt;br /&gt;# (a,{(1),(2),(3)}, b, {(2),(9),(5)})&lt;br /&gt;#&lt;br /&gt;# output:&lt;br /&gt;#&lt;br /&gt;# (a, b, 0.2)&lt;br /&gt;#&lt;br /&gt;class JaccardSim &lt; Wukong::Streamer::RecordStreamer&lt;br /&gt;  def process node_a, set_a, node_b, set_b&lt;br /&gt;    yield [node_a, node_b, jaccard(set_a, set_b)]&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def jaccard bag_a, bag_b&lt;br /&gt;    common_elements = ((bag_a.from_pig_bag.to_set).intersection(bag_b.from_pig_bag.to_set)).size&lt;br /&gt;    total_elements  = ((bag_a.from_pig_bag.to_set).union(bag_b.from_pig_bag.to_set)).size&lt;br /&gt;    common_elements.to_f / total_elements.to_f&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;Wukong::Script.new(JaccardSim, nil).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Notice the nifty 'from_pig_bag' method that wukong has. All we're doing here is converting the two pig bags into ruby 'set' objects then doing the simple jaccard similarity calculation. (3 lines of code for the calculation itself, still want to do it in java?)&lt;br /&gt;&lt;br /&gt;And that's it. 
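&lt;br /&gt;&lt;br /&gt;As an aside, you can sanity-check the set arithmetic without hadoop at all. Here's a minimal plain-Ruby sketch of the same calculation, |A ∩ B| / |A ∪ B|, run on the example from the script's comment. It assumes the two bags have already been parsed into plain Ruby arrays, so there's no wukong and no from_pig_bag involved. Returning 0.0 for two empty sets is my own choice. The streaming job never hits that case, because every joined pair shares at least one comic.&lt;br /&gt;&lt;br /&gt;
```ruby
require 'set'

# Jaccard similarity: shared elements divided by distinct elements overall.
# Sketch only; assumes the pig bags were already converted to plain Ruby arrays.
def jaccard(items_a, items_b)
  set_a = items_a.to_set
  set_b = items_b.to_set
  union = set_a.union(set_b)
  return 0.0 if union.empty? # assumption: two empty sets count as 0.0 similar
  set_a.intersection(set_b).size.to_f / union.size
end

puts jaccard([1, 2, 3], [2, 9, 5]) # prints 0.2
```
&lt;br /&gt;&lt;br /&gt;The two sets share only the element 2 and contain 5 distinct elements between them. That gives 1/5 = 0.2, which matches the (a, b, 0.2) output in the script's comment.&lt;br /&gt;&lt;br /&gt;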
Here's what it looks like after running:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;"HUMAN TORCH/JOHNNY S" "GALACTUS/GALAN"        0.0514112900&lt;br /&gt;"HUMAN TORCH/JOHNNY S" "ROGUE /"               0.0371308030&lt;br /&gt;"HUMAN TORCH/JOHNNY S" "UATU"                  0.0481557360&lt;br /&gt;"LIVING LIGHTNING/MIG" "USAGENT DOPPELGANGER"  0.0322580640&lt;br /&gt;"LIVING LIGHTNING/MIG" "STRONG GUY/GUIDO CAR"  0.1052631600&lt;br /&gt;"LIVING LIGHTNING/MIG" "STORM/ORORO MUNROE S"  0.0209059230&lt;br /&gt;"LIVING LIGHTNING/MIG" "USAGENT/CAPTAIN JOHN"  0.1941747500&lt;br /&gt;"LIVING LIGHTNING/MIG" "WASP/JANET VAN DYNE "  0.0398089180&lt;br /&gt;"LIVING LIGHTNING/MIG" "THING/BENJAMIN J. GR"  0.0125120310&lt;br /&gt;"LIVING LIGHTNING/MIG" "WOLFSBANE/RAHNE SINC"  0.0209059230&lt;br /&gt;"LIVING LIGHTNING/MIG" "THUNDERSTRIKE/ERIC K"  0.1153846160&lt;br /&gt;"LIVING LIGHTNING/MIG" "WILD CHILD/KYLE GIBN"  0.0434782600&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Sweet.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-4603116858590414496?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/4603116858590414496/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/02/brute-force-graph-crunching-with-pig.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4603116858590414496'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4603116858590414496'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/02/brute-force-graph-crunching-with-pig.html' title='Brute Force Graph Crunching With Pig and 
Wukong'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-6429706113065765771</id><published>2011-01-26T21:03:00.000-08:00</published><updated>2011-01-26T21:49:51.011-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='analysis'/><category scheme='http://www.blogger.com/atom/ns#' term='degree distribution'/><category scheme='http://www.blogger.com/atom/ns#' term='degree distribution with hadoop'/><category scheme='http://www.blogger.com/atom/ns#' term='degree distribution with pig'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Graph Processing With Apache Pig</title><content type='html'>So, you're probably sick of seeing this airport data set by now (&lt;a href="http://infochimps.com/datasets/d35-million-us-domestic-flights-from-1990-to-2009-edges-only"&gt;flight edges&lt;/a&gt;), but it's so awesome that I have to reuse it. Let's use Pig to do the same calculation as &lt;a href="http://thedatachef.blogspot.com/2011/01/graph-processing-with-wukong-and-hadoop.html"&gt;this&lt;/a&gt; post in a much more succinct way. We'll really get a feel for what Pig is better at than Wukong.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Degree Distribution&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Since the point here is to illustrate the difference between the wukong way and the pig way, we're not going to introduce anything clever here. 
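&lt;br /&gt;&lt;br /&gt;To pin down what's being computed: an airport's degree for a given month is just its inbound traffic plus its outbound traffic. Every flight edge therefore counts once toward its origin and once toward its destination. Here's a toy, in-memory sketch of that idea in plain Ruby. The Edge struct, the degrees helper and the three sample edges (made-up numbers) are hypothetical; they are not part of the flight data set or of wukong. The sketch also computes only the total degrees, not the separate in/out numbers.&lt;br /&gt;&lt;br /&gt;
```ruby
# Toy, in-memory sketch of the monthly degree calculation.
# Edge, degrees and the sample edges are hypothetical, for illustration only.
Edge = Struct.new(:origin, :destin, :passengers, :flights, :month)

# Returns a hash keyed by [airport, month] holding summed passengers and
# flights, counting each edge once for each of its two endpoints.
def degrees(edges)
  totals = Hash.new { |h, k| h[k] = { :passengers => 0, :flights => 0 } }
  edges.each do |edge|
    # outbound for the origin, inbound for the destination
    [edge.origin, edge.destin].each do |airport|
      key = [airport, edge.month]
      totals[key][:passengers] += edge.passengers
      totals[key][:flights]    += edge.flights
    end
  end
  totals
end

edges = [
  Edge.new('AUS', 'DFW', 100, 2, 1),
  Edge.new('DFW', 'AUS',  80, 2, 1),
  Edge.new('DFW', 'ORD', 150, 3, 1)
]

degrees(edges).each do |(airport, month), degree|
  puts [airport, month, degree[:passengers], degree[:flights]].join("\t")
end
```
&lt;br /&gt;&lt;br /&gt;Running it prints one tab-separated line per (airport, month) key: AUS with 180 passengers and 4 flights, DFW with 330 and 7, and ORD with 150 and 3. The grouping by key is the part that Hadoop (via wukong or Pig) takes care of for us at scale.&lt;br /&gt;&lt;br /&gt;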
Here's the code for calculating the degree by month (both passengers and flights) for every domestic airport since 1990:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;--&lt;br /&gt;-- Calculates the monthly degree distributions for domestic airports from 1990 to 2009.&lt;br /&gt;--&lt;br /&gt;-- Load data (boring part)&lt;br /&gt;flight_edges = LOAD '$FLIGHT_EDGES' AS (origin_code:chararray, destin_code:chararray, passengers:int, flights:int, month:int);&lt;br /&gt;&lt;br /&gt;-- For every (airport,month) pair get passengers and flights out&lt;br /&gt;edges_out     = FOREACH flight_edges GENERATE&lt;br /&gt;                  origin_code AS airport,&lt;br /&gt;                  month       AS month,&lt;br /&gt;                  passengers  AS passengers_out,&lt;br /&gt;                  flights     AS flights_out&lt;br /&gt;                ;&lt;br /&gt;&lt;br /&gt;-- For every (airport,month) pair get passengers and flights in&lt;br /&gt;edges_in      = FOREACH flight_edges GENERATE&lt;br /&gt;                  destin_code AS airport,&lt;br /&gt;                  month       AS month,&lt;br /&gt;                  passengers  AS passengers_in,&lt;br /&gt;                  flights     AS flights_in&lt;br /&gt;                ;&lt;br /&gt;&lt;br /&gt;-- group them together and sum&lt;br /&gt;grouped_edges = COGROUP edges_in BY (airport,month), edges_out BY (airport,month);&lt;br /&gt;degree_dist   = FOREACH grouped_edges {&lt;br /&gt;                  passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);&lt;br /&gt;                  flights_degree   = SUM(edges_in.flights_in)    + SUM(edges_out.flights_out);&lt;br /&gt;                  GENERATE&lt;br /&gt;                    FLATTEN(group)   AS (airport, month),&lt;br /&gt;                    passenger_degree AS passenger_degree,&lt;br /&gt;                    flights_degree   AS flights_degree&lt;br /&gt;                  ;&lt;br /&gt;                
};&lt;br /&gt;&lt;br /&gt;STORE degree_dist INTO '$DEG_DIST';&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;So, here's what's going on:&lt;br /&gt;&lt;br /&gt;&lt;ul&gt;&lt;br /&gt;&lt;li&gt;FOREACH..GENERATE: this is called a 'projection' in pig. Here we're really just cutting out the fields we don't want and rearranging our records. This does the same job as the map phase of the wukong script, where we yielded two different types of records for the same input data, only much more clearly.&lt;/li&gt;&lt;br /&gt;&lt;li&gt;COGROUP: here we're simply grouping our two data relations (edges in and edges out) together by a common key (airport code, month), collecting the matching records from each relation for that key. This plays the same role as the 'accumulate' part of the wukong script.&lt;/li&gt;&lt;br /&gt;&lt;li&gt;FOREACH..GENERATE (once more): here we run through our grouped records and sum the flights and passengers. This is exactly the same as the 'finalize' part of the wukong script.&lt;/li&gt;&lt;br /&gt;&lt;/ul&gt;&lt;br /&gt;&lt;br /&gt;So, basically, we've done in 4 lines of very clear and concise code (not counting the LOAD and STORE or the prettification) what took us ~70 lines of ruby. 
Win.&lt;br /&gt;&lt;br /&gt;Here's the wukong one again for reference:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;&lt;br /&gt;class EdgeMapper &lt; Wukong::Streamer::RecordStreamer&lt;br /&gt;  #&lt;br /&gt;  # Yield both ways so we can sum (passengers in + passengers out) and (flights&lt;br /&gt;  # in + flights out) individually in the reduce phase.&lt;br /&gt;  #&lt;br /&gt;  def process origin_code, destin_code, passengers, flights, month&lt;br /&gt;    yield [origin_code, month, "OUT", passengers, flights]&lt;br /&gt;    yield [destin_code, month, "IN", passengers, flights]&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;class DegreeCalculator &lt; Wukong::Streamer::AccumulatingReducer&lt;br /&gt;  #&lt;br /&gt;  # What are we going to use as a key internally?&lt;br /&gt;  #&lt;br /&gt;  def get_key airport, month, in_or_out, passengers, flights&lt;br /&gt;    [airport, month]&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def start! 
airport, month, in_or_out, passengers, flights&lt;br /&gt;    @out_degree = {:passengers =&gt; 0, :flights =&gt; 0}&lt;br /&gt;    @in_degree  = {:passengers =&gt; 0, :flights =&gt; 0}&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def accumulate airport, month, in_or_out, passengers, flights&lt;br /&gt;    case in_or_out&lt;br /&gt;    when "IN" then&lt;br /&gt;      @in_degree[:passengers] += passengers.to_i&lt;br /&gt;      @in_degree[:flights]    += flights.to_i&lt;br /&gt;    when "OUT" then&lt;br /&gt;      @out_degree[:passengers] += passengers.to_i&lt;br /&gt;      @out_degree[:flights]    += flights.to_i&lt;br /&gt;    end&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # For every airport and month, calculate passenger and flight degrees&lt;br /&gt;  #&lt;br /&gt;  def finalize&lt;br /&gt;&lt;br /&gt;    # Passenger degrees (out, in, and total)&lt;br /&gt;    passengers_out   = @out_degree[:passengers]&lt;br /&gt;    passengers_in    = @in_degree[:passengers]&lt;br /&gt;    passengers_total = passengers_in + passengers_out&lt;br /&gt;&lt;br /&gt;    # Flight degrees (out, in, and total)&lt;br /&gt;    flights_out      = @out_degree[:flights]&lt;br /&gt;    flights_in       = @in_degree[:flights]&lt;br /&gt;    flights_total    = flights_in + flights_out&lt;br /&gt;&lt;br /&gt;    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;#&lt;br /&gt;# Need to use 2 fields for partitioning so every record with the same airport and&lt;br /&gt;# month lands on the same reducer&lt;br /&gt;#&lt;br /&gt;Wukong::Script.new(&lt;br /&gt;  EdgeMapper,&lt;br /&gt;  DegreeCalculator,&lt;br /&gt;  :partition_fields  =&gt; 2 # use two fields to partition records&lt;br /&gt;  ).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Plot Data&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;A very common workflow pattern with Hadoop is to use a tool like pig or wukong to 
process large-scale data and generate some result data set. The last step in this process is (rather obviously) to summarize that data in some way. Here's a quick plot (using R and ggplot2) of the passenger degree distribution after I further summarized by year:&lt;br /&gt;&lt;br /&gt;&lt;a href="http://3.bp.blogspot.com/_rbU0qmUQfs8/TUEFkWpqkMI/AAAAAAAAALo/769RtGBSDQU/s1600/passenger_degrees_select_years.png"&gt;&lt;img style="display:block; margin:0px auto 10px; text-align:center;cursor:pointer; cursor:hand;width: 320px; height: 160px;" src="http://3.bp.blogspot.com/_rbU0qmUQfs8/TUEFkWpqkMI/AAAAAAAAALo/769RtGBSDQU/s320/passenger_degrees_select_years.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5566736736520409282" /&gt;&lt;/a&gt;&lt;br /&gt;&lt;br /&gt;That's funny...&lt;br /&gt;&lt;br /&gt;&lt;br /&gt;Finally, here's the code for the plot:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;# include the ggplot2 library for nice plots&lt;br /&gt;library(ggplot2);&lt;br /&gt;&lt;br /&gt;# Read data in and take a subset&lt;br /&gt;degrees        &lt;- read.table('yearly_degrees.tsv', header=FALSE, sep='\t', colClasses=c('character', 'character', 'numeric', 'numeric'));&lt;br /&gt;names(degrees) &lt;- c('airport_code', 'year', 'passenger_degree', 'flights_degree');&lt;br /&gt;select_degrees &lt;- subset(degrees, year=='2000' | year=='2001' | year=='2002' | year=='2009' | year=='1990');&lt;br /&gt;&lt;br /&gt;# Plotting with ggplot2 (the explicit print() is needed when this runs as a non-interactive script)&lt;br /&gt;pdf('passenger_degrees.pdf', 12, 6, pointsize=10);&lt;br /&gt;p &lt;- ggplot(select_degrees, aes(x=passenger_degree, fill=year)) + geom_density(colour='black', alpha=0.3) + scale_x_log10() + ylab('Probability') + xlab(expression(log[10] ('Passengers in + Passengers out'))) + ggtitle('Passenger Degree Distribution');&lt;br /&gt;print(p);&lt;br /&gt;dev.off();&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' 
src='https://blogger.googleusercontent.com/tracker/2875432366090970056-6429706113065765771?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/6429706113065765771/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/graph-processing-with-apache-pig.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6429706113065765771'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6429706113065765771'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/graph-processing-with-apache-pig.html' title='Graph Processing With Apache Pig'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><media:thumbnail xmlns:media='http://search.yahoo.com/mrss/' url='http://3.bp.blogspot.com/_rbU0qmUQfs8/TUEFkWpqkMI/AAAAAAAAALo/769RtGBSDQU/s72-c/passenger_degrees_select_years.png' height='72' width='72'/><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-7801728563983802324</id><published>2011-01-23T14:10:00.000-08:00</published><updated>2011-01-23T14:23:26.854-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='wonderdog'/><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='java'/><category scheme='http://www.blogger.com/atom/ns#' term='indexing'/><category scheme='http://www.blogger.com/atom/ns#' term='search'/><category 
scheme='http://www.blogger.com/atom/ns#' term='bulk'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Bulk Indexing With ElasticSearch and Hadoop</title><content type='html'>At Infochimps we recently indexed over 2.5 billion documents, for a total indexed size of 4TB. This would not have been possible without ElasticSearch and the Hadoop bulk loader we wrote, &lt;a href="http://github.com/infochimps/wonderdog"&gt;wonderdog&lt;/a&gt;. I'll go into the technical details in a later post, but for now here's how you can get started with ElasticSearch and Hadoop:&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Getting Started with ElasticSearch&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;The first thing to do is install elasticsearch:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: wget http://github.com/downloads/elasticsearch/elasticsearch/elasticsearch-0.14.2.zip&lt;br /&gt;$: unzip elasticsearch-0.14.2.zip&lt;br /&gt;$: sudo mv elasticsearch-0.14.2 /usr/local/share/&lt;br /&gt;$: sudo ln -s /usr/local/share/elasticsearch-0.14.2 /usr/local/share/elasticsearch&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Next you'll want to make sure there is an 'elasticsearch' user and that there are suitable data, work, and log directories that 'elasticsearch' owns:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: sudo useradd elasticsearch&lt;br /&gt;$: sudo mkdir -p /var/log/elasticsearch /var/run/elasticsearch/{data,work}&lt;br /&gt;$: sudo chown -R elasticsearch /var/{log,run}/elasticsearch&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Then get wonderdog (you'll have to git clone it for now) and, from inside the wonderdog checkout, copy the example configuration in wonderdog/config:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: sudo mkdir -p /etc/elasticsearch&lt;br /&gt;$: sudo cp config/elasticsearch-example.yml /etc/elasticsearch/elasticsearch.yml&lt;br /&gt;$: sudo cp config/logging.yml /etc/elasticsearch/&lt;br /&gt;$: sudo cp config/elasticsearch.in.sh 
/etc/elasticsearch/&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Edit 'elasticsearch.yml' so that it points to the correct data, work, and log directories. Also, you'll want to set 'recovery_after_nodes' and 'expected_nodes' in elasticsearch.yml to however many nodes (machines) you actually expect to have in your cluster. You'll probably also want to give elasticsearch.in.sh a quick once-over and make sure the JVM settings, etc. are sane for your particular setup. Finally, to start it up, run:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;sudo -u elasticsearch /usr/local/share/elasticsearch/bin/elasticsearch -Des.config=/etc/elasticsearch/elasticsearch.yml&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;You should now have a happily running (reasonably configured) elasticsearch data node.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Index Some Data&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Prerequisites:&lt;br /&gt;&lt;br /&gt;&lt;ul&gt;&lt;br /&gt;&lt;li&gt;You have a working hadoop cluster&lt;/li&gt;&lt;br /&gt;&lt;li&gt;Elasticsearch data nodes are installed and running on all your machines and they have discovered each other. See the elasticsearch documentation for details on making that actually work.&lt;/li&gt;&lt;br /&gt;&lt;li&gt;You've installed the following rubygems: 'configliere' and 'json'&lt;/li&gt;&lt;br /&gt;&lt;/ul&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Get Data&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;As an example, let's index this UFO sightings data set from Infochimps &lt;a href="http://infochimps.com/datasets/d60000-documented-ufo-sightings-with-text-descriptions-and-metad"&gt;here&lt;/a&gt;. (You should be familiar with this one by now...) It's mostly raw text, so it's a very reasonable thing to index. 
Once it's downloaded, go ahead and throw it on the HDFS:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: hadoop fs -mkdir /data/domestic/ufo&lt;br /&gt;$: hadoop fs -put chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv /data/domestic/ufo/&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Index Data&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;This is the easy part:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: bin/wonderdog --rm --field_names=sighted_at,reported_at,location,shape,duration,description --id_field=-1 --index_name=ufo_sightings --object_type=ufo_sighting --es_config=/etc/elasticsearch/elasticsearch.yml /data/domestic/ufo/ufo_awesome.tsv /tmp/elasticsearch/aliens/out&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Flags:&lt;br /&gt;&lt;br /&gt;'--rm' - Remove output on the HDFS if it exists&lt;br /&gt;'--field_names' - A comma-separated list of the field names in the tsv, in order&lt;br /&gt;'--id_field' - The field to use as the record id, -1 if the record has no inherent id&lt;br /&gt;'--index_name' - The index name to bulk load into&lt;br /&gt;'--object_type' - The type of objects we're indexing&lt;br /&gt;'--es_config' - Points to the elasticsearch config*&lt;br /&gt;&lt;br /&gt;*The elasticsearch config must be present on all the hadoop machines and must have a 'hosts' entry listing the IPs of all the elasticsearch data nodes (see wonderdog/config/elasticsearch-example.yml). This means we can run the hadoop job on a different cluster from the one the elasticsearch data nodes are running on.&lt;br /&gt;&lt;br /&gt;The other two arguments are the input and output paths. The output path in this case is only written to if one or more index requests fail. This way you can re-run the job on only those records that didn't make it the first time.&lt;br /&gt;&lt;br /&gt;The indexing should go pretty quickly.&lt;br /&gt;&lt;br /&gt;Next, we refresh the index so we can actually query our newly indexed data. 
There's a tool in wonderdog's bin directory for that:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: bin/estool --host=`hostname -i` refresh_index&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Query Data&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;Once again, use estool:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: bin/estool --host=`hostname -i` --index_name=ufo_sightings --query_string="ufo" query&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-7801728563983802324?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/7801728563983802324/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/bulk-indexing-with-elasticsearch-and.html#comment-form' title='7 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/7801728563983802324'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/7801728563983802324'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/bulk-indexing-with-elasticsearch-and.html' title='Bulk Indexing With ElasticSearch and Hadoop'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' 
src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>7</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-8943847159847146903</id><published>2011-01-22T23:06:00.001-08:00</published><updated>2011-01-22T23:36:24.061-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='notes'/><category scheme='http://www.blogger.com/atom/ns#' term='java'/><category scheme='http://www.blogger.com/atom/ns#' term='jruby'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>JRuby and Hadoop, Notes From a Non-Java Programmer</title><content type='html'>So I spent a good deal of time playing with JRuby this weekend. Here are my notes/conclusions so far. Disclaimer: My experience with java overall is limited, so most of this might be obvious to a java ninja but maybe not to a ruby one...&lt;br /&gt;&lt;br /&gt;Goal:&lt;br /&gt;&lt;br /&gt;The whole point here is that it sure would be nice to only write a few lines of ruby code into a file called something like 'rb_mapreduce.rb' and run it by saying: ./rb_mapreduce.rb. No compiling, no awkward syntax. Pure ruby, plain and simple.&lt;br /&gt;&lt;br /&gt;Experiments/Notes:&lt;br /&gt;&lt;br /&gt;I created a 'WukongPlusMapper' that subclassed 'org.apache.hadoop.mapreduce.Mapper' and implemented the important methods, namely 'map'. Then I set up and launched a job from inside jruby using this jruby mapper ('WukongPlusMapper') as the map class. &lt;br /&gt;&lt;br /&gt;The job launched and ran just fine. 
But...&lt;br /&gt;&lt;br /&gt;Problems and Lessons Learned:&lt;br /&gt;&lt;ul&gt;&lt;br /&gt;&lt;li&gt;It is possible (in fact extremely easy) to set up and launch a Hadoop job with pure jruby&lt;/li&gt;&lt;br /&gt;&lt;li&gt;As far as I can tell so far, it is not possible to use an uncompiled jruby class as either the mapper or the reducer for a Hadoop job. It doesn't throw an error (so long as you've subclassed a proper java mapper) but actually just uses the superclass's definition instead. I believe the reason is that each map task must have access to the full class definition for its mapper (only sensible) and has no idea what to do with my transient 'WukongPlusMapper' class. Obviously the same would apply to the reducer&lt;/li&gt;&lt;br /&gt;&lt;li&gt;It &lt;span style="font-style:italic;"&gt;is&lt;/span&gt; possible to compile a jruby class ahead-of-time, stuff it into the job jar, and then launch the job by ordinary means. There are a couple of somewhat obvious drawbacks with this method:&lt;br /&gt;  &lt;ul&gt;&lt;br /&gt;  &lt;li&gt;You've got to specify 'java_signatures' for each of your methods that are going to be called inside java&lt;/li&gt;&lt;br /&gt;  &lt;li&gt;Messy logic+code for compiling thyself, stuffing thyself into a jar, shipping thyself with MR job. Might as well just write java at this point. &lt;a href="http://github.com/banshee/radoop"&gt;radoop&lt;/a&gt; has some logic for doing that pretty well laid out.&lt;/li&gt;&lt;/ul&gt;&lt;/li&gt;&lt;br /&gt;&lt;li&gt;It is possible to define and create an object in jruby that subclasses a java class or implements a java interface. Then you can simply override the methods you want to override. It's possible to pass instances of this class to a java runtime that only knows about the superclass, and the subclass's methods (at least the ones that have the signatures defined in the superclass) will work just fine. 
Unfortunately (and plainly obvious in hindsight), this does NOT work with Hadoop, since these instances all show up in java as 'proxy classes' and are only accessible to the launching JVM&lt;/li&gt;&lt;br /&gt;&lt;li&gt;On another note, there is the option of using the scripting engine, which, as far as I can tell, is what both &lt;a href="http://github.com/fujibee/jruby-on-hadoop"&gt;jruby-on-hadoop&lt;/a&gt; and radoop are using. Something of concern, though, is that neither of these two projects seems to have much traction. However, it may be that the scripting engine is the only reasonable way to make this work; at least 2 people vote yes ... &lt;/li&gt;&lt;br /&gt;&lt;/ul&gt;&lt;br /&gt;&lt;br /&gt;&lt;br /&gt;So, implementation complexity aside, it looks like all one would have to do is come up with some way of making JRuby's in-memory class definitions available to all of the spawned mappers and reducers. Probably not something I want to delve into at the moment.&lt;br /&gt;&lt;br /&gt;Script engine it is.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-8943847159847146903?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/8943847159847146903/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/jruby-and-hadoop-notes-from-non-java.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8943847159847146903'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8943847159847146903'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/jruby-and-hadoop-notes-from-non-java.html' title='JRuby and Hadoop, Notes From a 
Non-Java Programmer'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-8104079344181801468</id><published>2011-01-21T07:25:00.000-08:00</published><updated>2011-01-22T12:20:05.604-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='math'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='R'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Pig, Bringing Simplicity to Hadoop</title><content type='html'>In strong contrast to the seat-of-your-pants style of Wukong, there is another high-level language for Hadoop called Pig. See &lt;a href="http://pig.apache.org/"&gt;Apache Pig&lt;/a&gt;.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Overview&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;At the top level, here's what Pig gets you:&lt;br /&gt;&lt;br /&gt;&lt;ul&gt;&lt;br /&gt;&lt;li&gt;No java required. 
That is, use as little (zero) or as much (reams) of java code as you want.&lt;/li&gt;&lt;br /&gt;&lt;li&gt;No boilerplate code&lt;/li&gt;&lt;br /&gt;&lt;li&gt;Intuitive and easy-to-understand language (similar to SQL) with clean, uniform syntax&lt;/li&gt;&lt;br /&gt;&lt;li&gt;Separation of high-level algorithm and low-level map-reduce jobs&lt;/li&gt;&lt;br /&gt;&lt;li&gt;Build your analysis as a set of operations acting on data&lt;/li&gt;&lt;br /&gt;&lt;li&gt;Most algorithms take fewer than 5 human-readable lines of Pig&lt;/li&gt;&lt;br /&gt;&lt;/ul&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get Pig&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Go &lt;a href="http://www.apache.org/dyn/closer.cgi/pig"&gt;here&lt;/a&gt; and download pig (version 0.8) from a mirror close to you. Unpack it and put it wherever you like. Then, type 'pig' at the command line and see what happens. It's very likely that it doesn't pick up your existing Hadoop configuration. To fix that, set HADOOP_HOME to point to your hadoop installation and PIG_CLASSPATH to point to your hadoop configuration (probably /etc/hadoop/conf). 
Here's all that again:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: wget http://mirrors.axint.net/apache//pig/pig-0.8.0/pig-0.8.0.tar.gz&lt;br /&gt;$: tar -zxf pig-0.8.0.tar.gz&lt;br /&gt;$: sudo mv pig-0.8.0 /usr/local/share/&lt;br /&gt;$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig&lt;br /&gt;$: sudo ln -s /usr/local/share/pig/bin/pig /usr/local/bin/pig&lt;br /&gt;$: hash -r&lt;br /&gt;$: export HADOOP_HOME=/usr/lib/hadoop&lt;br /&gt;$: export PIG_CLASSPATH=/etc/hadoop/conf&lt;br /&gt;$: pig&lt;br /&gt;2011-01-21 09:56:32,486 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/jacob/pig_1295625392480.log&lt;br /&gt;2011-01-21 09:56:32,879 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020&lt;br /&gt;2011-01-21 09:56:33,402 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: master:54311&lt;br /&gt;grunt&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;See how easy that was. Get it over with now.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Firsties&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;People post the most interesting data on Infochimps. Let's get the first billion digits of pi &lt;a href="http://infochimps.com/datasets/the-first-billion-digits-of-pi"&gt;here&lt;/a&gt;. Notice that it's arranged in groups of 10 digits with 10 groups per line. We're going to use pig to see how the digits are distributed. &lt;br /&gt;&lt;br /&gt;This will allow us to visually (once we make a plot) spot any obvious deviation from a random series of numbers. That is, for a random series of numbers we'd expect each digit (0-9) to appear equally often. 
If this isn't true, then we know we're dealing with a more complicated beast.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Pre-Process&lt;/h3&gt;&lt;br /&gt;After catting the data file you'll notice that the end of each line has some crufty details attached that make this data more or less impossible to load into Pig as a table. Pig is terrible at data munging/parsing/cleaning. Thankfully we have wukong. Let's write a dead-simple wukong script to fix the last field and emit one digit per line:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;&lt;br /&gt;class PiCleaner &lt; Wukong::Streamer::LineStreamer&lt;br /&gt;  def process line&lt;br /&gt;    # Split into at most 10 fields; the 10th holds the last group of digits plus the trailing cruft&lt;br /&gt;    fields               = line.strip.split(' ', 10)&lt;br /&gt;    # First 9 groups plus the first 10 characters of the last field (100 digits in all)&lt;br /&gt;    hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""&lt;br /&gt;    # Emit one digit per output line&lt;br /&gt;    hundred_digit_string.each_char{|digit| yield digit}&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;Wukong::Script.new(PiCleaner, nil).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Save it as 'pi_clean.rb' and run it right away (this will take a few minutes depending on your rig; it produces a billion lines of output...):&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: hdp-mkdir /data/math/pi/&lt;br /&gt;$: hdp-put pi-010.txt /data/math/pi/&lt;br /&gt;$: ./pi_clean.rb --run /data/math/pi/pi-010.txt /data/math/pi/first_billion_digits.tsv&lt;br /&gt;I, [2011-01-22T11:01:57.363704 #12489]  INFO -- :   Launching hadoop!&lt;br /&gt;I, [2011-01-22T11:01:57.363964 #12489]  INFO -- : Running&lt;br /&gt;&lt;br /&gt;/usr/local/share/hadoop/bin/hadoop  \&lt;br /&gt;  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar  \&lt;br /&gt;  -D mapred.reduce.tasks=0  \&lt;br /&gt;  -D mapred.job.name='pi_clean.rb---/data/math/pi/pi-010.txt---/data/math/pi/first_billion_digits.tsv'  \&lt;br /&gt;  -mapper  '/usr/bin/ruby1.8 pi_clean.rb --map '  \&lt;br /&gt;  -reducer 
''  \&lt;br /&gt;  -input   '/data/math/pi/pi-010.txt'  \&lt;br /&gt;  -output  '/data/math/pi/first_billion_digits.tsv'  \&lt;br /&gt;  -file    '/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb'  \&lt;br /&gt;  -cmdenv 'RUBYLIB=~/.rubylib'&lt;br /&gt;&lt;br /&gt;11/01/22 11:01:59 INFO mapred.FileInputFormat: Total input paths to process : 1&lt;br /&gt;11/01/22 11:01:59 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]&lt;br /&gt;11/01/22 11:01:59 INFO streaming.StreamJob: Running job: job_201012031305_0251&lt;br /&gt;11/01/22 11:01:59 INFO streaming.StreamJob: To kill this job, run:&lt;br /&gt;11/01/22 11:01:59 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=master:54311 -kill job_201012031305_0251&lt;br /&gt;11/01/22 11:01:59 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0251&lt;br /&gt;11/01/22 11:02:00 INFO streaming.StreamJob:  map 0%  reduce 0%&lt;br /&gt;11/01/22 11:05:11 INFO streaming.StreamJob:  map 100%  reduce 100%&lt;br /&gt;11/01/22 11:05:11 INFO streaming.StreamJob: Job complete: job_201012031305_0251&lt;br /&gt;11/01/22 11:05:11 INFO streaming.StreamJob: Output: /data/math/pi/first_billion_digits.tsv&lt;br /&gt;packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar4401930660028806042/] [] /tmp/streamjob3153669001520547.jar tmpDir=null&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Great. Now let's write some Pig:&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Analysis&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;This is what Pig is awesome at. Remember that accumulating reducer in wukong? 
We're about to do the identical thing in two lines of no-nonsense Pig:&lt;br /&gt;&lt;pre&gt;&lt;br /&gt;-- load&lt;br /&gt;digits = LOAD '$PI_DIGITS' AS (digit:int);&lt;br /&gt;&lt;br /&gt;groups = GROUP digits BY digit;&lt;br /&gt;counts = FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits;&lt;br /&gt;&lt;br /&gt;-- store&lt;br /&gt;STORE counts INTO '$OUT';&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;All we're doing here is reading in the data (one digit per line), grouping together all the records with the same digit, and counting each group. Save it into a file called 'pi.pig' and run it with the following:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: pig -p PI_DIGITS=/data/math/pi/first_billion_digits.tsv -p OUT=/data/math/pi/digit_counts.tsv pi.pig&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;I'll skip the output since it's rather verbose. Now we can make a simple plot by hdp-catting our output data into a local file (it's super tiny by now) and plotting it with R:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: hdp-catd /data/math/pi/digit_counts.tsv &gt; digit_counts.tsv&lt;br /&gt;$: R&lt;br /&gt;&gt; library(ggplot2)&lt;br /&gt;&gt; digit_counts &lt;- read.table('digit_counts.tsv', header=FALSE, sep="\t")&lt;br /&gt;&gt; names(digit_counts) &lt;- c('digit', 'count')&lt;br /&gt;&gt; p &lt;- ggplot(digit_counts, aes(x=digit)) + geom_histogram(aes(y = ..density.., weight = count), binwidth=1, colour='black', fill='grey', alpha=0.7)&lt;br /&gt;&gt; p + scale_y_continuous(limits=c(0,0.5))&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;This results in the following:&lt;br /&gt;&lt;br /&gt;&lt;a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="http://3.bp.blogspot.com/_rbU0qmUQfs8/TTs6rIjZsPI/AAAAAAAAALg/F0EJb5So6Yk/s1600/pi_digits.png"&gt;&lt;img style="display:block; margin:0px auto 10px; text-align:center;cursor:pointer; cursor:hand;width: 200px; height: 80px;" 
src="http://3.bp.blogspot.com/_rbU0qmUQfs8/TTs6rIjZsPI/AAAAAAAAALg/F0EJb5So6Yk/s200/pi_digits.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5565106277251133682" /&gt;&lt;/a&gt;&lt;br /&gt;&lt;br /&gt;I'll leave it to you to draw your own conclusions.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-8104079344181801468?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/8104079344181801468/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/pig-bringing-sanity-to-hadoop.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8104079344181801468'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8104079344181801468'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/pig-bringing-sanity-to-hadoop.html' title='Pig, Bringing Simplicity to Hadoop'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><media:thumbnail xmlns:media='http://search.yahoo.com/mrss/' url='http://3.bp.blogspot.com/_rbU0qmUQfs8/TTs6rIjZsPI/AAAAAAAAALg/F0EJb5So6Yk/s72-c/pi_digits.png' height='72' width='72'/><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-3169615094483950920</id><published>2011-01-20T08:10:00.000-08:00</published><updated>2011-01-20T12:14:16.252-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' 
term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='network'/><category scheme='http://www.blogger.com/atom/ns#' term='infochimps'/><category scheme='http://www.blogger.com/atom/ns#' term='graph'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong example'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Graph Processing With Wukong and Hadoop</title><content type='html'>As a last (for now) tutorial-oriented post on Wukong, let's process a network graph.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get Data&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;This airport data (&lt;a href="http://infochimps.com/datasets/d35-million-us-domestic-flights-from-1990-to-2009-edges-only"&gt;airport edges&lt;/a&gt;) from Infochimps is one such network graph, with over 35 million edges. Each edge records the number of flights and passengers transported between two domestic airports in a given month. 
Go ahead and download it.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Explore Data&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;We've got to actually &lt;span style="font-style:italic;"&gt;look&lt;/span&gt; at the data before we can make any decisions about how to process it and what questions we'd like answered:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: head data/flights_with_colnames.tsv | wu-lign &lt;br /&gt;origin_airport destin_airport passengers flights month &lt;br /&gt;MHK            AMW                    21       1 200810&lt;br /&gt;EUG            RDM                    41      22 199011&lt;br /&gt;EUG            RDM                    88      19 199012&lt;br /&gt;EUG            RDM                    11       4 199010&lt;br /&gt;MFR            RDM                     0       1 199002&lt;br /&gt;MFR            RDM                    11       1 199003&lt;br /&gt;MFR            RDM                     2       4 199001&lt;br /&gt;MFR            RDM                     7       1 199009&lt;br /&gt;MFR            RDM                     7       2 199011&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;So it's exactly what you'd expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics...&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Ask A Question&lt;/h2&gt;&lt;br /&gt;A simple question to ask (and probably the first question you should ask of a graph) is: what is the degree distribution? Notice there are two flavors of degree in our graph: &lt;br /&gt;&lt;br /&gt;1. Passenger Degree: For a given airport (node in the graph), the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.&lt;br /&gt;&lt;br /&gt;2. 
Flights Degree: For a given airport the number of flights in + the number of flights out.&lt;br /&gt;&lt;br /&gt;Let's write the question wukong style:&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;&lt;br /&gt;class EdgeMapper &lt; Wukong::Streamer::RecordStreamer&lt;br /&gt;  #&lt;br /&gt;  # Yield both ways so we can sum (passengers in + passengers out) and (flights&lt;br /&gt;  # in + flights out) individually in the reduce phase.&lt;br /&gt;  #&lt;br /&gt;  def process origin_code, destin_code, passengers, flights, month&lt;br /&gt;    yield [origin_code, month, "OUT", passengers, flights]&lt;br /&gt;    yield [destin_code, month, "IN", passengers, flights]&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;class DegreeCalculator &lt; Wukong::Streamer::AccumulatingReducer&lt;br /&gt;  #&lt;br /&gt;  # What are we going to use as a key internally?&lt;br /&gt;  #&lt;br /&gt;  def get_key airport, month, in_or_out, passengers, flights&lt;br /&gt;    [airport, month]&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def start! 
airport, month, in_or_out, passengers, flights&lt;br /&gt;    @out_degree = {:passengers =&gt; 0, :flights =&gt; 0}&lt;br /&gt;    @in_degree  = {:passengers =&gt; 0, :flights =&gt; 0}&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def accumulate airport, month, in_or_out, passengers, flights&lt;br /&gt;    case in_or_out&lt;br /&gt;    when "IN" then&lt;br /&gt;      @in_degree[:passengers] += passengers.to_i&lt;br /&gt;      @in_degree[:flights]    += flights.to_i&lt;br /&gt;    when "OUT" then&lt;br /&gt;      @out_degree[:passengers] += passengers.to_i&lt;br /&gt;      @out_degree[:flights]    += flights.to_i&lt;br /&gt;    end&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  #&lt;br /&gt;  # For every airport and month, calculate passenger and flight degrees&lt;br /&gt;  #&lt;br /&gt;  def finalize&lt;br /&gt;&lt;br /&gt;    # Passenger degrees (out, in, and total)&lt;br /&gt;    passengers_out   = @out_degree[:passengers]&lt;br /&gt;    passengers_in    = @in_degree[:passengers]&lt;br /&gt;    passengers_total = passengers_in + passengers_out&lt;br /&gt;&lt;br /&gt;    # Flight degrees (out, in, and total)&lt;br /&gt;    flights_out      = @out_degree[:flights]&lt;br /&gt;    flights_in       = @in_degree[:flights]&lt;br /&gt;    flights_total    = flights_in + flights_out&lt;br /&gt;&lt;br /&gt;    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;#&lt;br /&gt;# Use 2 fields for partitioning so every record with the same airport and&lt;br /&gt;# month lands on the same reducer&lt;br /&gt;#&lt;br /&gt;Wukong::Script.new(&lt;br /&gt;  EdgeMapper,&lt;br /&gt;  DegreeCalculator,&lt;br /&gt;  :partition_fields  =&gt; 2 # use two fields to partition records&lt;br /&gt;  ).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Don't panic. 
There's a lot going on in this script, so here's the breakdown (real gentle like):&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Mapper&lt;/h3&gt;&lt;br /&gt;Here we're using wukong's RecordStreamer class, which reads lines from $stdin and splits them on tabs for us. That's how we know exactly what arguments the process method gets.&lt;br /&gt;&lt;br /&gt;Next, as is often the case with low-level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield each edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were leaving or entering the airport that month. This way we can distinguish between these two pieces of data in the reducer and process them independently.&lt;br /&gt;&lt;br /&gt;Finally, we've carefully rearranged our records so that (airport, month) are always the first two fields. We'll partition on these as the key. (We have to say that explicitly at the bottom of the script.)&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Reducer&lt;/h3&gt;&lt;br /&gt;We've seen all these methods before except for one. The reducer needs to know which fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.&lt;br /&gt;&lt;br /&gt;* start! - Here we initialize the internal state of the reducer with two ruby hashes. The first, @out_degree, will count up all the passengers and flights out. The second, @in_degree, will do the same for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in Java...)&lt;br /&gt;&lt;br /&gt;* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".&lt;br /&gt;&lt;br /&gt;* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. 
Remember, the 'key' is just (airport, month).&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get An Answer&lt;/h2&gt;&lt;br /&gt;We know how to put the data on the HDFS and run the script by now, so we'll skip that part. Here's what the output looks like:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign &lt;br /&gt;1B1 200906 1   1   2 1  1  2&lt;br /&gt;ABE 200705 0  83  83 0  3  3&lt;br /&gt;ABE 199206 0  31  31 0  1  1&lt;br /&gt;ABE 200708 0 904 904 0 20 20&lt;br /&gt;ABE 200307 0  91  91 0  2  2&lt;br /&gt;ABE 200703 0  36  36 0  1  1&lt;br /&gt;ABE 199902 0  84  84 0  1  1&lt;br /&gt;ABE 200611 0 753 753 0 18 18&lt;br /&gt;ABE 199209 0  99  99 0  1  1&lt;br /&gt;ABE 200702 0  54  54 0  1  1&lt;br /&gt;ABE 200407 0  98  98 0  1  1&lt;br /&gt;ABE 200705 0 647 647 0 15 15&lt;br /&gt;ABE 200306 0  27  27 0  1  1&lt;br /&gt;ABE 200703 0 473 473 0 11 11&lt;br /&gt;ABE 200309 0 150 150 0  1  1&lt;br /&gt;ABE 200702 0 313 313 0  8  8&lt;br /&gt;ABE 200103 0   0   0 0  1  1&lt;br /&gt;ABE 199807 0 105 105 0  1  1&lt;br /&gt;ABE 199907 0  91  91 0  1  1&lt;br /&gt;ABE 199501 0  50  50 0  1  1 &lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;At this point you might bring the results back down to your local file system, crack open a program like R, make some plots, etc.&lt;br /&gt;&lt;br /&gt;And we're done for now. 
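&lt;br /&gt;&lt;br /&gt;If you'd like to sanity-check the mapper and reducer logic without a cluster first, here's a minimal in-memory sketch in plain Ruby. It is not wukong and doesn't use its API; the Edge struct and the map_edge/degrees helpers are hypothetical names for illustration, and the three input edges are taken from the sample rows we looked at earlier. It maps each edge both ways, groups the results by (airport, month), and sums the in and out counts per group, mirroring EdgeMapper and DegreeCalculator:&lt;br /&gt;&lt;pre class="brush: ruby"&gt;
```ruby
# Minimal in-memory sketch of the EdgeMapper/DegreeCalculator logic.
# Hypothetical names throughout; this is plain Ruby, not wukong.
Edge = Struct.new(:origin, :destin, :passengers, :flights, :month)

# Map: emit each edge twice, tagged "OUT" for the origin and "IN" for the destination.
def map_edge(edge)
  [
    [edge.origin, edge.month, "OUT", edge.passengers, edge.flights],
    [edge.destin, edge.month, "IN", edge.passengers, edge.flights]
  ]
end

# Reduce: for one (airport, month) group, sum passengers and flights per
# direction, then return [in, out, total] for passengers followed by flights.
def degrees(records)
  sums = Hash.new(0)
  records.each do |_airport, _month, direction, passengers, flights|
    sums[[direction, :passengers]] += passengers
    sums[[direction, :flights]] += flights
  end
  p_in  = sums[["IN", :passengers]]
  p_out = sums[["OUT", :passengers]]
  f_in  = sums[["IN", :flights]]
  f_out = sums[["OUT", :flights]]
  [p_in, p_out, p_in + p_out, f_in, f_out, f_in + f_out]
end

edges = [
  Edge.new("EUG", "RDM", 41, 22, "199011"),
  Edge.new("EUG", "RDM", 88, 19, "199012"),
  Edge.new("MFR", "RDM", 7, 2, "199011")
]

# Stand-in for the shuffle: group mapped records by their (airport, month) key.
groups = edges.flat_map { |edge| map_edge(edge) }.group_by { |rec| rec.first(2) }

# One output row per key: the key fields followed by the six degrees.
rows = groups.sort.map { |key, records| key + degrees(records) }
rows.each { |row| puts row.join("\t") }
```
&lt;/pre&gt;&lt;br /&gt;Here group_by stands in for Hadoop's shuffle; on the cluster, it's the two-field partitioning at the bottom of the real script that sends every record for a given (airport, month) to the same reducer.&lt;br /&gt;&lt;br /&gt;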
Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-3169615094483950920?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/3169615094483950920/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/graph-processing-with-wukong-and-hadoop.html#comment-form' title='2 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3169615094483950920'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/3169615094483950920'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/graph-processing-with-wukong-and-hadoop.html' title='Graph Processing With Wukong and Hadoop'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>2</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-5597489617591552093</id><published>2011-01-19T11:25:00.000-08:00</published><updated>2011-01-19T11:53:34.594-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='cloudera'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='apache'/><category scheme='http://www.blogger.com/atom/ns#' term='pig-0.8'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Apache Pig 0.8 with Cloudera cdh3</title><content type='html'>So it's January and &lt;a 
href="http://www.cloudera.com/"&gt;Cloudera&lt;/a&gt; hasn't released Pig 0.8 as a Debian package yet. Too bad. It turns out that for the particular project I'm working on it's important to have a custom partitioner, which is only available in Pig 0.8. I'd also like to make use of the HBaseStorage load and store funcs, also only available in 0.8. Anyhow, here's how I got it working with my current install of Hadoop (cdh3):&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get Pig&lt;/h2&gt;&lt;br /&gt;Go to the Pig releases page &lt;a href="http://pig.apache.org/releases.html"&gt;here&lt;/a&gt; and download the Apache release of pig-0.8.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Install Pig&lt;/h2&gt;&lt;br /&gt;Skip this part if you don't care (i.e. you're going to put it wherever you want and don't give a flip what my opinion is on where it should go). It's usually a good idea to put things you download and install yourself in /usr/local/share/&amp;lt;thing&amp;gt; so they don't conflict with /usr/lib/&amp;lt;thing&amp;gt; when you apt-get install the packaged version. So go ahead and unpack the downloaded archive into that directory.&lt;br /&gt;&lt;br /&gt;As an example (for those of us just getting familiar):&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: wget http://apache.mesi.com.ar//pig/pig-0.8.0/pig-0.8.0.tar.gz&lt;br /&gt;$: tar -zxvf pig-0.8.0.tar.gz&lt;br /&gt;$: sudo mv pig-0.8.0 /usr/local/share/&lt;br /&gt;$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Perform Pig Surgery&lt;/h2&gt;&lt;br /&gt;As it stands, your new Pig install will not work with Cloudera's Hadoop. Let's fix that. (Run the following from inside the Pig directory, e.g. /usr/local/share/pig.)&lt;br /&gt;&lt;br /&gt;1. Nuke the current pig jar and rebuild without Hadoop&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: sudo rm pig-0.8.0-core.jar &lt;br /&gt;$: sudo ant jar-withouthadoop&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;2. 
Add these lines to bin/pig (I don't think it matters where; I put mine before PIG_CLASSPATH is set):&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;# Add installed version of Hadoop to classpath&lt;br /&gt;HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}&lt;br /&gt;# Source Hadoop's config script to pick up its environment settings&lt;br /&gt;. $HADOOP_HOME/bin/hadoop-config.sh&lt;br /&gt;&lt;br /&gt;# Hadoop core jar plus all of its dependency jars&lt;br /&gt;for jar in $HADOOP_HOME/hadoop-core-*.jar $HADOOP_HOME/lib/* ; do&lt;br /&gt;   CLASSPATH=$CLASSPATH:$jar&lt;br /&gt;done&lt;br /&gt;if [ ! -z "$HADOOP_CLASSPATH" ] ; then&lt;br /&gt;  CLASSPATH=$CLASSPATH:$HADOOP_CLASSPATH&lt;br /&gt;fi&lt;br /&gt;# The conf dir lets Pig find your cluster's HDFS and job tracker&lt;br /&gt;if [ ! -z "$HADOOP_CONF_DIR" ] ; then&lt;br /&gt;  CLASSPATH=$CLASSPATH:$HADOOP_CONF_DIR&lt;br /&gt;fi&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;3. Rename pig-withouthadoop.jar and nuke the build dir&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: sudo mv pig-withouthadoop.jar pig-0.8.0-core.jar&lt;br /&gt;$: sudo rm -r build&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;4. Test it out&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: bin/pig&lt;br /&gt;2011-01-19 13:49:07,766 [main] INFO  org.apache.pig.Main - Logging error messages to: /usr/local/share/pig-0.8.0/pig_1295466547762.log&lt;br /&gt;2011-01-19 13:49:07,959 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:8020&lt;br /&gt;2011-01-19 13:49:08,163 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:8021&lt;br /&gt;grunt&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;You can try typing things like 'ls' in the grunt shell to make sure it sees your HDFS. 
Hurray.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-5597489617591552093?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/5597489617591552093/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/apache-pig-08-with-cloudera-cdh3.html#comment-form' title='4 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/5597489617591552093'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/5597489617591552093'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/apache-pig-08-with-cloudera-cdh3.html' title='Apache Pig 0.8 with Cloudera cdh3'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>4</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-9064346417520768197</id><published>2011-01-17T09:09:00.000-08:00</published><updated>2011-01-26T22:07:54.607-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='command-line'/><category scheme='http://www.blogger.com/atom/ns#' term='xml'/><category scheme='http://www.blogger.com/atom/ns#' term='parse'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title 
type='text'>Processing XML Records with Hadoop and Wukong</title><content type='html'>Another common pattern that Wukong addresses exceedingly well is liberating data from unwieldy formats (XML) into tsv. For example, let's consider the following Hacker News dataset (see &lt;a href="http://redmonk.com/sogrady/2010/12/14/popular-on-hacker-news/"&gt;RedMonk Analytics&lt;/a&gt;).&lt;br /&gt;&lt;br /&gt;A single record looks like this:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: plain"&gt;&lt;br /&gt;&amp;lt;row&amp;gt;&amp;lt;ID&amp;gt;33&amp;lt;/ID&amp;gt;&amp;lt;ParentID&amp;gt;31&amp;lt;/ParentID&amp;gt;&amp;lt;Text&amp;gt;&amp;amp;lt;font color=&amp;quot;#5a5a5a&amp;quot;&amp;amp;gt;winnar winnar chicken dinnar!&amp;amp;lt;/font&amp;amp;gt;&amp;lt;/Text&amp;gt;&amp;lt;Username&amp;gt;spez&amp;lt;/Username&amp;gt;&amp;lt;Points&amp;gt;0&amp;lt;/Points&amp;gt;&amp;lt;Type&amp;gt;2&amp;lt;/Type&amp;gt;&amp;lt;Timestamp&amp;gt;2006-10-10T21:11:18.093&amp;lt;/Timestamp&amp;gt;&amp;lt;CommentCount&amp;gt;0&amp;lt;/CommentCount&amp;gt;&amp;lt;/row&amp;gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;And here's a wukong example script that turns that into tsv:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;require 'wukong/encoding'&lt;br /&gt;require 'crack'&lt;br /&gt;&lt;br /&gt;class HackernewsComment &lt; Struct.new(:username, :url, :title, :text, :timestamp, :comment_id, :points, :comment_count, :type)&lt;br /&gt;  def self.parse raw&lt;br /&gt;    raw_hash = Crack::XML.parse(raw.strip)&lt;br /&gt;    return unless raw_hash&lt;br /&gt;    return unless raw_hash["row"]&lt;br /&gt;    raw_hash                 = raw_hash["row"]&lt;br /&gt;    raw_hash[:username]      = raw_hash["Username"].wukong_encode if raw_hash["Username"]&lt;br /&gt;    raw_hash[:url]           = raw_hash["Url"].wukong_encode      if raw_hash["Url"]&lt;br /&gt;    raw_hash[:title] 
        = raw_hash["Title"].wukong_encode    if raw_hash["Title"]&lt;br /&gt;    raw_hash[:text]          = raw_hash["Text"].wukong_encode     if raw_hash["Text"]&lt;br /&gt;    raw_hash[:comment_id]    = raw_hash["ID"].to_i                if raw_hash["ID"]&lt;br /&gt;    raw_hash[:points]        = raw_hash["Points"].to_i            if raw_hash["Points"]&lt;br /&gt;    raw_hash[:comment_count] = raw_hash["CommentCount"].to_i      if raw_hash["CommentCount"]&lt;br /&gt;    raw_hash[:type]          = raw_hash["Type"].to_i              if raw_hash["Type"]&lt;br /&gt;&lt;br /&gt;    # E.g. map '2010-10-26T19:29:59.717' to the easier-to-work-with '20101027002959'&lt;br /&gt;    raw_hash[:timestamp]     = Time.parse_and_flatten(raw_hash["Timestamp"]) if raw_hash["Timestamp"]&lt;br /&gt;    #&lt;br /&gt;    self.from_hash(raw_hash, true)&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;class XMLParser &lt; Wukong::Streamer::LineStreamer&lt;br /&gt;  def process line&lt;br /&gt;    return unless line =~ /^\&amp;lt;row/&lt;br /&gt;    comment = HackernewsComment.parse(line)&lt;br /&gt;    # Only emit records that parsed successfully&lt;br /&gt;    yield comment if comment&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;Wukong::Script.new(XMLParser, nil).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Here's how it works. We're going to use Hadoop streaming's "StreamXmlRecordReader". What this does is hand the mapper each complete row record (everything from the opening row tag to the closing one) as a single input line. That's our line variable. Additionally, we've defined a data model called "HackernewsComment" to read the row into. This guy is responsible for parsing the XML record and creating a new instance of itself. &lt;br /&gt;&lt;br /&gt;Inside HackernewsComment's parse method we create the clean fields we'd like to use. Wukong has a method for strings called 'wukong_encode' which simply XML-encodes the text so weird characters aren't an issue. 
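&lt;br /&gt;&lt;br /&gt;As one more example of turning a raw field into a clean one, here's a stdlib-only Ruby sketch of the timestamp flattening described in the parser's comment. It is a hypothetical stand-in, not wukong's actual Time.parse_and_flatten: the comment's example ('2010-10-26T19:29:59.717' becoming '20101027002959') amounts to a five-hour shift to UTC, so the sketch simply assumes the raw timestamps were recorded at a fixed UTC-5 offset and drops the fractional seconds:&lt;br /&gt;&lt;pre class="brush: ruby"&gt;
```ruby
require 'time'

# Hypothetical stand-in for wukong's Time.parse_and_flatten (not its real code).
# Assumes the raw timestamp carries no zone and was recorded at a fixed offset
# (UTC-5 by default); parses it, converts it to UTC, and flattens it to
# YYYYmmddHHMMSS, dropping the fractional seconds.
def flatten_timestamp(raw, utc_offset = "-05:00")
  Time.iso8601(raw + utc_offset).utc.strftime("%Y%m%d%H%M%S")
end

puts flatten_timestamp("2010-10-26T19:29:59.717")
```
&lt;/pre&gt;&lt;br /&gt;If the source timestamps actually follow a zone with daylight saving time, a fixed offset will be off by an hour for part of the year, so treat the offset as something to check against your data.&lt;br /&gt;&lt;br /&gt;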
You can imagine modifying the raw fields in other ways to construct and fill the fields of your output data model.&lt;br /&gt;&lt;br /&gt;Finally, a new instance of HackernewsComment is created using the clean fields and emitted. Notice that we don't have to do anything special to the new comment once it's created. That's because Wukong will do the "right thing" and serialize out the class name as a flat field (hackernews_comment) along with the fields, in order, as a tsv record.&lt;br /&gt;&lt;br /&gt;Save this into a file called "process_xml.rb" and run it with the following (the --split_on_xml_tag=row option is what makes wukong use StreamXmlRecordReader, as the generated -inputreader line shows):&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: ./process_xml.rb --split_on_xml_tag=row --run /tmp/hn-sample.xml /tmp/xml_out&lt;br /&gt;I, [2011-01-17T11:09:17.461643 #5519]  INFO -- :   Launching hadoop!&lt;br /&gt;I, [2011-01-17T11:09:17.461757 #5519]  INFO -- : Running&lt;br /&gt;&lt;br /&gt;/usr/local/share/hadoop/bin/hadoop  \&lt;br /&gt;  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar  \&lt;br /&gt;  -D mapred.reduce.tasks=0  \&lt;br /&gt;  -D mapred.job.name='process_xml.rb---/tmp/hn-sample.xml---/tmp/xml_out'  \&lt;br /&gt;  -inputreader 'StreamXmlRecordReader,begin=&amp;lt;row&amp;gt;,end=&amp;lt;/row&amp;gt;'  \&lt;br /&gt;  -mapper  '/usr/bin/ruby1.8 process_xml.rb --map '  \&lt;br /&gt;  -reducer ''  \&lt;br /&gt;  -input   '/tmp/hn-sample.xml'  \&lt;br /&gt;  -output  '/tmp/xml_out'  \&lt;br /&gt;  -file    '/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb'  \&lt;br /&gt;  -cmdenv 'RUBYLIB=$HOME/.rubylib'&lt;br /&gt;&lt;br /&gt;11/01/17 11:09:18 INFO mapred.FileInputFormat: Total input paths to process : 1&lt;br /&gt;11/01/17 11:09:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]&lt;br /&gt;11/01/17 11:09:19 INFO streaming.StreamJob: Running job: job_201012031305_0243&lt;br /&gt;11/01/17 11:09:19 INFO streaming.StreamJob: To kill this job, run:&lt;br /&gt;11/01/17 
11:09:19 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=master:54311 -kill job_201012031305_0243&lt;br /&gt;11/01/17 11:09:19 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0243&lt;br /&gt;11/01/17 11:09:20 INFO streaming.StreamJob:  map 0%  reduce 0%&lt;br /&gt;11/01/17 11:09:34 INFO streaming.StreamJob:  map 100%  reduce 0%&lt;br /&gt;11/01/17 11:09:40 INFO streaming.StreamJob:  map 87%  reduce 0%&lt;br /&gt;11/01/17 11:09:43 INFO streaming.StreamJob:  map 87%  reduce 100%&lt;br /&gt;11/01/17 11:09:43 INFO streaming.StreamJob: Job complete: job_201012031305_0243&lt;br /&gt;11/01/17 11:09:43 INFO streaming.StreamJob: Output: /tmp/xml_out&lt;br /&gt;packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar902611811523431467/] [] /tmp/streamjob681918437315823836.jar tmpDir=null&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Finally, let's take a look at our new, happily liberated, tsv records:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: hdp-catd /tmp/xml_out | head | wu-lign &lt;br /&gt;hackernews_comment Harj        http://blog.harjtaggar.com                                                    YC Founder looking for Rails Tutor                                       20101027002959 0  5  0 1&lt;br /&gt;hackernews_comment pg          http://ycombinator.com                                                        Y Combinator                                                             20061010003558 1 39 15 1&lt;br /&gt;hackernews_comment phyllis     http://www.paulgraham.com/mit.html                                            A Student&amp;apos;s Guide to Startups                                       20061010003648 2 12  0 1&lt;br /&gt;hackernews_comment phyllis     http://www.foundersatwork.com/stevewozniak.html                               Woz Interview: the early 
days of Apple                                   20061010183848 3  7  0 1&lt;br /&gt;hackernews_comment onebeerdave http://avc.blogs.com/a_vc/2006/10/the_nyc_develop.html                        NYC Developer Dilemma                                                    20061010184037 4  6  0 1&lt;br /&gt;hackernews_comment perler      http://www.techcrunch.com/2006/10/09/google-youtube-sign-more-separate-deals/ Google, YouTube acquisition announcement could come tonight              20061010184105 5  6  0 1&lt;br /&gt;hackernews_comment perler      http://360techblog.com/2006/10/02/business-intelligence-the-inkling-way/      Business Intelligence the Inkling Way: cool prediction markets software  20061010185246 6  5  0 1&lt;br /&gt;hackernews_comment phyllis     http://featured.gigaom.com/2006/10/09/sevin-rosen-unfunds-why/                Sevin Rosen Unfunds - why?                                               20061010021030 7  5  0 1&lt;br /&gt;hackernews_comment frobnicate  http://news.bbc.co.uk/2/hi/programmes/click_online/5412216.stm                LikeBetter featured by BBC                                               20061010021033 8 10  0 1&lt;br /&gt;hackernews_comment askjigga    http://www.weekendr.com/                                                      weekendr: social network for the weekend                                 20061010021036 9  3  0 1&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray.&lt;br /&gt;&lt;br /&gt;&lt;br /&gt;As a side note, I strongly encourage comments. Seriously. 
How am I supposed to know what's useful for you and what isn't unless you comment?&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-9064346417520768197?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/9064346417520768197/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/processing-xml-records-with-hadoop-and.html#comment-form' title='3 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/9064346417520768197'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/9064346417520768197'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/processing-xml-records-with-hadoop-and.html' title='Processing XML Records with Hadoop and Wukong'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>3</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-4201702787234337245</id><published>2011-01-16T18:49:00.000-08:00</published><updated>2011-01-16T20:01:31.433-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='cat'/><category scheme='http://www.blogger.com/atom/ns#' term='command-line'/><category scheme='http://www.blogger.com/atom/ns#' term='bash'/><category scheme='http://www.blogger.com/atom/ns#' 
term='examples'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Processing JSON Records With Hadoop and Wukong</title><content type='html'>For another illustration of how Wukong makes working with data way simpler, let's process some real JSON records.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Get Data&lt;/h2&gt;&lt;br /&gt;Download this awesome UFO data at &lt;a href="http://infochimps.com/datasets/d60000-documented-ufo-sightings-with-text-descriptions-and-metad"&gt;Infochimps&lt;/a&gt;. It's over 60,000 documented UFO sightings with text descriptions. Best of all, it's available in TSV, JSON, and Avro formats. Downloading the bz2 package will get you all three.&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Explore Data&lt;/h2&gt;&lt;br /&gt;Once you've got the data set, let's crack open the JSON version and take a look at it:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;# weirdness in infochimps packaging (a zipped bz2?)&lt;br /&gt;$: unzip icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip&lt;br /&gt;Archive:  icsdata-d60000-documented-ufo-sightings-with-text-descriptions-and-metad_20101020143604-bz2.zip&lt;br /&gt;   creating: chimps_16154-2010-10-20_14-33-35/&lt;br /&gt;  inflating: chimps_16154-2010-10-20_14-33-35/README-infochimps  &lt;br /&gt;  inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv  &lt;br /&gt;  inflating: chimps_16154-2010-10-20_14-33-35/16154.yaml  &lt;br /&gt;  inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.avro  &lt;br /&gt;  inflating: chimps_16154-2010-10-20_14-33-35/ufo_awesome.json&lt;br /&gt;$: head chimps_16154-2010-10-20_14-33-35/ufo_awesome.json&lt;br /&gt;{"sighted_at": "19951009", "reported_at": "19951009", "location": " Iowa City, IA", "shape": "", "duration": "", "description": "Man repts. 
witnessing &amp;quot;flash, followed by a classic UFO, w/ a tailfin at back.&amp;quot; Red color on top half of tailfin. Became triangular."}&lt;br /&gt;{"sighted_at": "19951010", "reported_at": "19951011", "location": " Milwaukee, WI", "shape": "", "duration": "2 min.", "description": "Man  on Hwy 43 SW of Milwaukee sees large, bright blue light streak by his car, descend, turn, cross road ahead, strobe. Bizarre!"}&lt;br /&gt;{"sighted_at": "19950101", "reported_at": "19950103", "location": " Shelton, WA", "shape": "", "duration": "", "description": "Telephoned Report:CA woman visiting daughter witness discs and triangular ships over Squaxin Island in Puget Sound. Dramatic.  Written report, with illustrations, submitted to NUFORC."}&lt;br /&gt;{"sighted_at": "19950510", "reported_at": "19950510", "location": " Columbia, MO", "shape": "", "duration": "2 min.", "description": "Man repts. son&amp;apos;s bizarre sighting of small humanoid creature in back yard.  Reptd. in Acteon Journal, St. Louis UFO newsletter."}&lt;br /&gt;{"sighted_at": "19950611", "reported_at": "19950614", "location": " Seattle, WA", "shape": "", "duration": "", "description": "Anonymous caller repts. sighting 4 ufo&amp;apos;s in NNE sky, 45 deg. above horizon.  (No other facts reptd.  No return tel. #.)"}&lt;br /&gt;{"sighted_at": "19951025", "reported_at": "19951024", "location": " Brunswick County, ND", "shape": "", "duration": "30 min.", "description": "Sheriff&amp;apos;s office calls to rept. that deputy, 20 mi. SSE of Wilmington,  is looking at peculiar, bright white, strobing light."}&lt;br /&gt;{"sighted_at": "19950420", "reported_at": "19950419", "location": " Fargo, ND", "shape": "", "duration": "2 min.", "description": "Female student w/ friend witness huge red light in sky.  2 others witness.  Obj pulsated, started to flicker.  
Winked out."}&lt;br /&gt;{"sighted_at": "19950911", "reported_at": "19950911", "location": " Las Vegas, NV", "shape": "", "duration": "", "description": "Man repts. bright, multi-colored obj. in NW night sky. Disappeared while he was in house."}&lt;br /&gt;{"sighted_at": "19950115", "reported_at": "19950214", "location": " Morton, WA", "shape": "", "duration": "", "description": "Woman reports 2 craft fly over house.  Strange events taking place in town w/ paramilitary activities."}&lt;br /&gt;{"sighted_at": "19950915", "reported_at": "19950915", "location": " Redmond, WA", "shape": "", "duration": "6 min.", "description": "Young man w/ 2 co-workers witness tiny, distinctly white round disc drifting slowly toward NE.  Flew in dir. 90 deg. to winds."}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Looks pretty interesting. As one last (obvious to some, sure) sanity check, let's see how big it is:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: ls -lh chimps_16154-2010-10-20_14-33-35 &lt;br /&gt;total 220M&lt;br /&gt;-rw-r--r-- 1 jacob 3.4K 2010-10-20 09:34 16154.yaml&lt;br /&gt;-rw-r--r-- 1 jacob  908 2010-10-20 09:34 README-infochimps&lt;br /&gt;-rw------- 1 jacob  72M 2010-10-20 09:33 ufo_awesome.avro&lt;br /&gt;-rw------- 1 jacob  77M 2010-10-20 09:33 ufo_awesome.json&lt;br /&gt;-rw------- 1 jacob  72M 2010-10-20 09:33 ufo_awesome.tsv&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Load Data&lt;/h2&gt;&lt;br /&gt;Now, 77M is small enough that you COULD process it on a single machine with methods you already know. 
However, this example is about hadoop, so let's go ahead and throw it on the hadoop distributed file system (HDFS) so we can process it in parallel:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;hdp-mkdir /data/domestic/ufo&lt;br /&gt;hdp-put chimps_16154-2010-10-20_14-33-35/ufo_awesome.json /data/domestic/ufo/&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;(I'm going to have to assume you already have an HDFS up and running; see &lt;a href="http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/"&gt;this&lt;/a&gt; for a simple how-to.) &lt;br /&gt;If all goes well, you should see your file there:&lt;pre class="brush: bash"&gt;&lt;br /&gt;hdp-ls /data/domestic/ufo/   &lt;br /&gt;Found 1 items&lt;br /&gt;-rw-r--r--   1 jacob supergroup   80346460 2011-01-16 21:21 /data/domestic/ufo/ufo_awesome.json&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h2&gt;Process Data&lt;/h2&gt;&lt;br /&gt;&lt;br /&gt;Let's write a really simple wukong script to find the most popular UFO shapes:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;require 'json'&lt;br /&gt;&lt;br /&gt;class JSONMapper &lt; Wukong::Streamer::LineStreamer&lt;br /&gt;  def process record&lt;br /&gt;    sighting = JSON.parse(record) rescue {}&lt;br /&gt;    return unless sighting["shape"]&lt;br /&gt;    yield sighting["shape"] unless sighting["shape"].empty?&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;class ShapeReducer &lt; Wukong::Streamer::AccumulatingReducer&lt;br /&gt;&lt;br /&gt;  def start! 
shape&lt;br /&gt;    @count = 0&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def accumulate shape&lt;br /&gt;    @count += 1&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;  def finalize&lt;br /&gt;    yield [key, @count]&lt;br /&gt;  end&lt;br /&gt;&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;Wukong::Script.new(JSONMapper, ShapeReducer).run&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Mapper&lt;/h3&gt;&lt;br /&gt;Our mapper class, JSONMapper, is nearly identical to the one in the earlier post. All it does is read single JSON records from $stdin, parse out the "shape" field (with some rescues for handling data nastiness), and emit the "shape" field back to $stdout.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Reducer&lt;/h3&gt;&lt;br /&gt;The reducer, ShapeReducer, is about the simplest reducer in Wukong that still illustrates the major points:&lt;br /&gt;&lt;br /&gt;* start! - the first method that is called on a new group of data. A group is, if you remember your map-reduce, all records with the same key. In this case it's all the "shape" fields with the same shape. All this method does is initialize any internal state the reducer needs. Here we simply set a counter to 0.&lt;br /&gt;&lt;br /&gt;* accumulate - even simpler. This method is called on each record in the group. In our simple case we just increment the internal counter by 1.&lt;br /&gt;&lt;br /&gt;* finalize - the final step in the reduce. We've processed all the records in the group, so we yield the key corresponding to our group (that's just going to be the unique "shape") and its count.&lt;br /&gt;&lt;br /&gt;And that's it. 
Let's save it into a file called "process_ufo.rb", make it executable (chmod +x process_ufo.rb), and run it locally on the first 10000 lines:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: cat chimps_16154-2010-10-20_14-33-35/ufo_awesome.json | head -n10000 | ./process_ufo.rb --map | sort | ./process_ufo.rb --reduce | sort -nk2 | wu-lign&lt;br /&gt;changed      1&lt;br /&gt;dome         1&lt;br /&gt;flare        1&lt;br /&gt;hexagon      1&lt;br /&gt;pyramid      1&lt;br /&gt;crescent     2&lt;br /&gt;round        2&lt;br /&gt;delta        8&lt;br /&gt;cross       25&lt;br /&gt;cone        41&lt;br /&gt;teardrop    79&lt;br /&gt;rectangle  112&lt;br /&gt;egg        113&lt;br /&gt;chevron    128&lt;br /&gt;diamond    137&lt;br /&gt;flash      138&lt;br /&gt;cylinder   155&lt;br /&gt;changing   204&lt;br /&gt;cigar      255&lt;br /&gt;formation  290&lt;br /&gt;oval       333&lt;br /&gt;unknown    491&lt;br /&gt;sphere     529&lt;br /&gt;circle     667&lt;br /&gt;other      721&lt;br /&gt;disk       727&lt;br /&gt;fireball   799&lt;br /&gt;triangle   868&lt;br /&gt;light     1760&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Notice that when we run this locally we have to stick the "sort" program in there. It simulates the sort that hadoop gives us for free between the map and reduce phases. It looks like &lt;span style="font-style:italic;"&gt;light&lt;/span&gt; is going to come out ahead. 
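&lt;br /&gt;&lt;br /&gt;Why does that sort matter? An accumulating reducer like ShapeReducer only tracks one group at a time. As far as I can tell from how wukong's AccumulatingReducer behaves, it starts a new group whenever the key changes from one record to the next, so it relies on identical keys arriving next to each other. Here's a minimal pure-ruby sketch of that start!/accumulate/finalize loop that makes the dependence on sorted input easy to see. The count_groups method is a hypothetical helper for illustration and is not part of wukong:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;
#!/usr/bin/env ruby

# Hypothetical sketch (not wukong's code): mimics start!/accumulate/finalize
# over a stream of keys. A new group starts whenever the key differs from
# the previous one, just like an accumulating reducer reading sorted input.
def count_groups(keys)
  results = []
  current = nil
  count   = 0
  keys.each do |key|
    if key != current
      results.push([current, count]) unless current.nil?   # finalize the old group
      current = key                                          # start! a new group
      count   = 0
    end
    count += 1                                               # accumulate
  end
  results.push([current, count]) unless current.nil?       # finalize the last group
  results
end

shapes = %w[light disk light triangle disk light]
p count_groups(shapes)       # unsorted: "light" shows up as three separate groups
p count_groups(shapes.sort)  # sorted: exactly one group per shape
&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Without the sort, the same shape gets counted as several separate groups instead of one. The local "sort" step, like hadoop's shuffle on the cluster, prevents exactly that, which is why the same reducer works unchanged in both places.&lt;br /&gt;&lt;br /&gt;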
Let's see what happens when we run it with hadoop:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: ./process_ufo.rb --run /data/domestic/ufo/ufo_awesome.json /data/domestic/ufo/shape_counts&lt;br /&gt;I, [2011-01-16T21:51:43.431534 #11447]  INFO -- :   Launching hadoop!&lt;br /&gt;I, [2011-01-16T21:51:43.476626 #11447]  INFO -- : Running&lt;br /&gt;&lt;br /&gt;/usr/local/share/hadoop/bin/hadoop  \&lt;br /&gt;  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar  \&lt;br /&gt;  -D mapred.job.name='process_ufo.rb---/data/domestic/ufo/ufo_awesome.json---/data/domestic/ufo/shape_counts'  \&lt;br /&gt;  -mapper  '/usr/bin/ruby1.8 process_ufo.rb --map '  \&lt;br /&gt;  -reducer '/usr/bin/ruby1.8 /home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb --reduce '  \&lt;br /&gt;  -input   '/data/domestic/ufo/ufo_awesome.json'  \&lt;br /&gt;  -output  '/data/domestic/ufo/shape_counts'  \&lt;br /&gt;  -file    '/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb'  \&lt;br /&gt;  -cmdenv 'RUBYLIB=$HOME/.rubylib'&lt;br /&gt;&lt;br /&gt;11/01/16 21:51:45 INFO mapred.FileInputFormat: Total input paths to process : 1&lt;br /&gt;11/01/16 21:51:46 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]&lt;br /&gt;11/01/16 21:51:46 INFO streaming.StreamJob: Running job: job_201012031305_0221&lt;br /&gt;11/01/16 21:51:46 INFO streaming.StreamJob: To kill this job, run:&lt;br /&gt;11/01/16 21:51:46 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=master:54311 -kill job_201012031305_0221&lt;br /&gt;11/01/16 21:51:46 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0221&lt;br /&gt;11/01/16 21:51:47 INFO streaming.StreamJob:  map 0%  reduce 0%&lt;br /&gt;11/01/16 21:51:59 INFO streaming.StreamJob:  map 100%  reduce 0%&lt;br /&gt;11/01/16 21:52:12 INFO streaming.StreamJob:  map 100%  
reduce 100%&lt;br /&gt;11/01/16 21:52:15 INFO streaming.StreamJob: Job complete: job_201012031305_0221&lt;br /&gt;11/01/16 21:52:15 INFO streaming.StreamJob: Output: /data/domestic/ufo/shape_counts&lt;br /&gt;packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar3466191386581838257/] [] /tmp/streamjob3112551829711880856.jar tmpDir=null&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;As one final step let's cat the output data and take a look at it:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;hdp-catd /data/domestic/ufo/shape_counts | sort -nk2 | wu-lign &lt;br /&gt;changed       1&lt;br /&gt;dome          1&lt;br /&gt;flare         1&lt;br /&gt;hexagon       1&lt;br /&gt;pyramid       1&lt;br /&gt;crescent      2&lt;br /&gt;round         2&lt;br /&gt;delta         8&lt;br /&gt;cross       177&lt;br /&gt;cone        265&lt;br /&gt;teardrop    592&lt;br /&gt;egg         661&lt;br /&gt;chevron     757&lt;br /&gt;diamond     909&lt;br /&gt;rectangle   957&lt;br /&gt;cylinder    980&lt;br /&gt;flash       988&lt;br /&gt;changing   1532&lt;br /&gt;cigar      1774&lt;br /&gt;formation  1774&lt;br /&gt;oval       2859&lt;br /&gt;fireball   3436&lt;br /&gt;sphere     3613&lt;br /&gt;unknown    4458&lt;br /&gt;other      4570&lt;br /&gt;disk       4794&lt;br /&gt;circle     5249&lt;br /&gt;triangle   6036&lt;br /&gt;light     12138&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Pretty simple?&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-4201702787234337245?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/4201702787234337245/comments/default' title='Post Comments'/><link rel='replies' type='text/html' 
href='http://thedatachef.blogspot.com/2011/01/processing-json-records-with-hadoop-and.html#comment-form' title='6 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4201702787234337245'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4201702787234337245'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/processing-json-records-with-hadoop-and.html' title='Processing JSON Records With Hadoop and Wukong'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>6</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-8822713647218231600</id><published>2011-01-15T16:06:00.000-08:00</published><updated>2011-01-15T16:17:55.549-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='rake'/><category scheme='http://www.blogger.com/atom/ns#' term='workflow'/><category scheme='http://www.blogger.com/atom/ns#' term='swineherd'/><category scheme='http://www.blogger.com/atom/ns#' term='rstats'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='R'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Swineherd</title><content type='html'>&lt;a href="https://github.com/Ganglion/swineherd"&gt;Swineherd&lt;/a&gt; is a useful tool for combining together multiple pig scripts, wukong scripts, and even R scripts into a workflow managed by rake. 
Here, though, I'd like to save the actual workflow part for a later post and just illustrate its uniform interface to these scripts by showing how to launch a wukong script:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rake'&lt;br /&gt;require 'swineherd'&lt;br /&gt;&lt;br /&gt;task :wukong_job do&lt;br /&gt;  script = WukongScript.new('/path/to/wukong_script')&lt;br /&gt;  script.options = {:some_option =&gt; "123", :another_option =&gt; "foobar"}&lt;br /&gt;  script.input &lt;&lt; '/path/to/input'&lt;br /&gt;  script.output &lt;&lt; '/path/to/output'&lt;br /&gt;  script.run&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;You can save this into a file called "Rakefile" and run it with:&lt;pre&gt;rake wukong_job&lt;/pre&gt;&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-8822713647218231600?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/8822713647218231600/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/swineherd.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8822713647218231600'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8822713647218231600'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/swineherd.html' title='Swineherd'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' 
src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-6608068340822679077</id><published>2011-01-15T15:21:00.000-08:00</published><updated>2011-01-15T16:18:23.985-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='command-line'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Wukong's Hadoop Convenience Utilities</title><content type='html'>Wukong comes with a number of convenience command-line utilities for working with the HDFS, as well as a few commands for basic hadoop streaming. All of them can be found in wukong's bin directory. Enumerating a few:&lt;br /&gt;&lt;br /&gt;* hdp-put&lt;br /&gt;* hdp-ls&lt;br /&gt;* hdp-mkdir&lt;br /&gt;* hdp-rm&lt;br /&gt;* hdp-catd&lt;br /&gt;* hdp-stream&lt;br /&gt;* hdp-stream-flat&lt;br /&gt;* and more...&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;HDFS utilities&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;These are just wrappers around the hadoop fs utility to cut down on the amount of typing:&lt;br /&gt;&lt;table border="1" cellpadding="1"&gt;&lt;tr&gt;&lt;th&gt;Hadoop fs utility&lt;/th&gt;&lt;th&gt;Wukong Convenience Command&lt;/th&gt;&lt;/tr&gt;&lt;tr&gt;&lt;td&gt;hadoop fs -put&lt;/td&gt;&lt;td&gt;hdp-put&lt;/td&gt;&lt;/tr&gt;&lt;tr&gt;&lt;td&gt;hadoop fs -ls&lt;/td&gt;&lt;td&gt;hdp-ls&lt;/td&gt;&lt;/tr&gt;&lt;tr&gt;&lt;td&gt;hadoop fs -mkdir&lt;/td&gt;&lt;td&gt;hdp-mkdir&lt;/td&gt;&lt;/tr&gt;&lt;tr&gt;&lt;td&gt;hadoop fs -rm&lt;/td&gt; &lt;td&gt;hdp-rm&lt;/td&gt;&lt;/tr&gt;&lt;/table&gt;&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;hdp-catd&lt;/h3&gt;&lt;br /&gt;hdp-catd will take an arbitrary HDFS directory and cat its contents. It ignores those files that start with a "_" character. 
This means we can cat a whole directory of those awful part-xxxxx files.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;hdp-stream&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;hdp-stream allows you to run a generic streaming task without all the typing. You almost always only need to specify the input, the output, the number of key fields to partition on, the number of sort key fields, the number of reducers, and which scripts to use as the mapper and reducer. Here's an example of running a uniq:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;hdp-stream /path/to/input /path/to/output /bin/cat /usr/bin/uniq 2 3 -Dmapred.reduce.tasks=10&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;This will launch a streaming job that uses 2 fields for partitioning, 3 fields as sort keys, and 10 reduce tasks. See http://hadoop.apache.org/common/docs/r0.20.0/mapred-default.html for other options you can pass in with the "-D" flag.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;hdp-stream-flat&lt;/h3&gt;&lt;br /&gt;&lt;br /&gt;There's one other extremely useful case: you don't care to specify anything about partitioners, either because you aren't running a reduce or because you don't care how your data is sent to individual reducers. For this case, hdp-stream-flat is the tool to use. 
Here's how to pull out the first two fields of a large input file:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;hdp-stream-flat /path/to/input /path/to/output "/usr/bin/cut -f1,2" "/bin/cat" -Dmapred.reduce.tasks=0&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;See wukong/bin for more useful command-line utilities.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-6608068340822679077?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/6608068340822679077/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/wukongs-hadoop-convenience-utilities.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6608068340822679077'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/6608068340822679077'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/wukongs-hadoop-convenience-utilities.html' title='Wukong&apos;s Hadoop Convenience Utilities'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-8369370451189602275</id><published>2011-01-15T09:55:00.000-08:00</published><updated>2011-01-15T10:41:12.989-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='cat'/><category 
scheme='http://www.blogger.com/atom/ns#' term='command-line'/><category scheme='http://www.blogger.com/atom/ns#' term='examples'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='script'/><category scheme='http://www.blogger.com/atom/ns#' term='hacking'/><title type='text'>Wukong, Bringing Ruby to Hadoop</title><content type='html'>&lt;a href="http://github.com/infochimps/wukong"&gt;Wukong&lt;/a&gt; is hands down the simplest (and probably the most fun) tool to use with hadoop. It especially excels at the following use case:&lt;br /&gt;&lt;br /&gt;You've got a huge amount of data (let that be whatever size you think is huge). You want to perform a simple operation on each record: for example, parsing out fields with a regular expression, adding two fields together, stuffing those records into a data store, etc. These are called map-only jobs. They do NOT require a reduce. Can you imagine writing a java map reduce program to add two fields together? Wukong gives you all the power of ruby backed by all the power (and parallelism) of hadoop streaming. Before we get into examples, and there will be plenty, let's make sure you've got wukong installed and running locally.&lt;br /&gt;&lt;br /&gt;&lt;h1&gt;Installing Wukong&lt;/h1&gt;&lt;br /&gt;&lt;br /&gt;First and foremost, you've got to have ruby installed and running on your machine. Most of the time you already have it. 
Try checking the version in a terminal:&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: ruby --version&lt;br /&gt;ruby 1.8.7 (2010-01-10 patchlevel 249) [x86_64-linux]&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;If that fails, then I bet google can help you get ruby installed on whatever OS you happen to be using.&lt;br /&gt;&lt;br /&gt;Next, make sure you've got rubygems installed:&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: gem --version&lt;br /&gt;1.3.7&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Once again, google can help you get it installed if you don't have it.&lt;br /&gt;&lt;br /&gt;Wukong is a rubygem, so we can just install it that way:&lt;pre class="brush: bash"&gt;&lt;br /&gt;sudo gem install wukong&lt;br /&gt;sudo gem install json&lt;br /&gt;sudo gem install configliere&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Notice we also installed a couple of other libraries to help us out (the json gem and the configliere gem). If at any time you get weird errors (LoadError: no such file to load -- somelibraryname), then you probably just need to gem install somelibraryname.&lt;br /&gt;&lt;br /&gt;&lt;h1&gt;An example&lt;/h1&gt;&lt;br /&gt;Moving on. You should be ready to test out running wukong locally now. 
Here's the most minimal working wukong script I can come up with; it illustrates a map-only job:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: ruby"&gt;&lt;br /&gt;#!/usr/bin/env ruby&lt;br /&gt;&lt;br /&gt;require 'rubygems'&lt;br /&gt;require 'wukong'&lt;br /&gt;&lt;br /&gt;class LineMapper &lt; Wukong::Streamer::LineStreamer&lt;br /&gt;  def process line&lt;br /&gt;    yield line&lt;br /&gt;  end&lt;br /&gt;end&lt;br /&gt;&lt;br /&gt;Wukong::Script.new(LineMapper, nil).run&lt;br /&gt;&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Save that into a file called wukong_test.rb, make it executable (chmod +x wukong_test.rb), and run it with the following:&lt;pre class="brush: bash"&gt;cat wukong_test.rb | ./wukong_test.rb --map&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;If everything works as expected, you should see exactly the contents of your script dumped onto your terminal. Let's examine what's actually going on here.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;Boilerplate ruby&lt;/h3&gt;&lt;br /&gt;First, the shebang on the first line tells the shell to run the script with ruby (somewhat obvious). Next, we require the libraries we need.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;The guts&lt;/h3&gt;&lt;br /&gt;Then we define a ruby class called LineMapper to do our map job. This guy subclasses the wukong LineStreamer class. All the LineStreamer class does is read records from &lt;span style="font-style:italic;"&gt;stdin&lt;/span&gt; and pass each one as an argument to the LineMapper's process method. The process method then does nothing more than yield the line back to the LineStreamer, which emits it to stdout.&lt;br /&gt;&lt;br /&gt;&lt;h3&gt;The runner&lt;/h3&gt;&lt;br /&gt;Finally, we have to let wukong know we intend to run our script. We create a new script object with LineMapper as the mapper class and &lt;span style="font-style:italic;"&gt;nil&lt;/span&gt; as the reducer class. &lt;br /&gt;&lt;br /&gt;More succinctly, we've written our own cat program. 
When we ran the above command we simply streamed our script, line by line, through the program. Try streaming some real data through the program and adding some more stuff to the process method. Perhaps parsing the line with a regular expression and yielding numbers? Yielding words? Yielding characters? The choice is yours. Have fun with it.&lt;br /&gt;&lt;br /&gt;Meatier examples to come.&lt;div class="blogger-post-footer"&gt;&lt;img width='1' height='1' src='https://blogger.googleusercontent.com/tracker/2875432366090970056-8369370451189602275?l=thedatachef.blogspot.com' alt='' /&gt;&lt;/div&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/8369370451189602275/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/wukong-bringing-ruby-to-hadoop.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8369370451189602275'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/8369370451189602275'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/wukong-bringing-ruby-to-hadoop.html' title='Wukong, Bringing Ruby to Hadoop'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-2481525674363363704</id><published>2011-01-15T09:32:00.000-08:00</published><updated>2011-01-15T09:48:10.529-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='map-reduce'/><category 
scheme='http://www.blogger.com/atom/ns#' term='java'/><category scheme='http://www.blogger.com/atom/ns#' term='pig'/><category scheme='http://www.blogger.com/atom/ns#' term='examples'/><category scheme='http://www.blogger.com/atom/ns#' term='wukong'/><category scheme='http://www.blogger.com/atom/ns#' term='hadoop'/><title type='text'>Real Deal Concrete Hadoop Examples</title><content type='html'>If you think you have to be a Java programmer to use Hadoop, then &lt;span style="font-style:italic;"&gt;you've been lied to&lt;/span&gt;. Hadoop is not hard. What makes learning Hadoop (or, more correctly, MapReduce) tedious is the lack of concrete, useful examples. Word counts are f*ing &lt;b&gt;boring&lt;/b&gt;. The next few posts will give an overview of two of the most useful higher-level abstractions on top of Hadoop (Pig and Wukong), with copious examples.</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/2481525674363363704/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/real-deal-concrete-hadoop-examples.html#comment-form' title='1 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/2481525674363363704'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/2481525674363363704'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/real-deal-concrete-hadoop-examples.html' title='Real Deal Concrete Hadoop Examples'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image 
rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>1</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-725414236206315077</id><published>2011-01-13T21:35:00.000-08:00</published><updated>2011-01-15T09:50:05.724-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='ruby'/><category scheme='http://www.blogger.com/atom/ns#' term='data'/><category scheme='http://www.blogger.com/atom/ns#' term='tsv'/><category scheme='http://www.blogger.com/atom/ns#' term='command-line'/><category scheme='http://www.blogger.com/atom/ns#' term='bash'/><category scheme='http://www.blogger.com/atom/ns#' term='json'/><category scheme='http://www.blogger.com/atom/ns#' term='hacking'/><title type='text'>Convert TSV to JSON command line</title><content type='html'>So you've got some tsv data:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: head foo.tsv&lt;br /&gt;148 0.05 49.0530378784848&lt;br /&gt;380 0.8 85.0345986160553&lt;br /&gt;496 0.05 33.665653373865&lt;br /&gt;612 0.15 58.1366745330187&lt;br /&gt;728 0.1 60.8615655373785&lt;br /&gt;844 0.3 69.4102235910563&lt;br /&gt;960 0.2 74.4530218791248&lt;br /&gt;1076 0.2 76.6129354825807&lt;br /&gt;1192 2.25 99.0050397984081&lt;br /&gt;1888 0.5 53.7328506859725&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;and you've got some field names (field_1,field_2,field_3). 
Try this:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;$: export FIELDS=field_1,field_2,field_3&lt;br /&gt;$: cat foo.tsv | ruby -rjson -ne 'puts Hash[ENV["FIELDS"].split(",").zip($_.strip.split("\t"))].to_json'&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;That gives you something that looks like:&lt;br /&gt;&lt;br /&gt;&lt;pre class="brush: bash"&gt;&lt;br /&gt;{"field_1":"148","field_2":"0.05","field_3":"49.0530378784848"}&lt;br /&gt;{"field_1":"380","field_2":"0.8","field_3":"85.0345986160553"}&lt;br /&gt;{"field_1":"496","field_2":"0.05","field_3":"33.665653373865"}&lt;br /&gt;{"field_1":"612","field_2":"0.15","field_3":"58.1366745330187"}&lt;br /&gt;{"field_1":"728","field_2":"0.1","field_3":"60.8615655373785"}&lt;br /&gt;{"field_1":"844","field_2":"0.3","field_3":"69.4102235910563"}&lt;br /&gt;{"field_1":"960","field_2":"0.2","field_3":"74.4530218791248"}&lt;br /&gt;{"field_1":"1076","field_2":"0.2","field_3":"76.6129354825807"}&lt;br /&gt;{"field_1":"1192","field_2":"2.25","field_3":"99.0050397984081"}&lt;br /&gt;{"field_1":"1888","field_2":"0.5","field_3":"53.7328506859725"}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Hurray.</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/725414236206315077/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/convert-tsv-to-json-command-line.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/725414236206315077'/><link rel='self' type='application/atom+xml' 
href='http://www.blogger.com/feeds/2875432366090970056/posts/default/725414236206315077'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/convert-tsv-to-json-command-line.html' title='Convert TSV to JSON command line'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-2865586709024208742</id><published>2011-01-13T21:11:00.000-08:00</published><updated>2011-01-15T09:49:17.465-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='linux'/><category scheme='http://www.blogger.com/atom/ns#' term='unix'/><category scheme='http://www.blogger.com/atom/ns#' term='plot'/><category scheme='http://www.blogger.com/atom/ns#' term='graph'/><category scheme='http://www.blogger.com/atom/ns#' term='rstats'/><category scheme='http://www.blogger.com/atom/ns#' term='R'/><category scheme='http://www.blogger.com/atom/ns#' term='fifo'/><title type='text'>Plot a FIFO in R</title><content type='html'>I recently discovered a really simple way to plot a FIFO in R. Here's an example that plots the output of the ifstat program. 
From one terminal do:&lt;br /&gt;&lt;br /&gt;&lt;pre&gt;&lt;br /&gt;mkfifo ifstat_fifo&lt;br /&gt;ifstat -n &gt; ifstat_fifo&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;Then, in &lt;b&gt;another&lt;/b&gt; terminal, open an R shell and run the following:&lt;br /&gt;&lt;br /&gt;&lt;pre&gt;&lt;br /&gt;# Plot the most recent 100 seconds of inbound network traffic&lt;br /&gt;x &lt;- NULL  # recent samples; must exist before the first rbind&lt;br /&gt;while (TRUE) {&lt;br /&gt;  d &lt;- read.table(fifo("ifstat_fifo", open = "read"))&lt;br /&gt;  x &lt;- rbind(x, d)&lt;br /&gt;  x &lt;- tail(x, 100)&lt;br /&gt;  plot(x$V1, type = 'l')&lt;br /&gt;  Sys.sleep(1)&lt;br /&gt;}&lt;br /&gt;&lt;/pre&gt;&lt;br /&gt;&lt;br /&gt;You may have to run it a couple of times while the FIFO fills with data. Here's what the result looks like:&lt;br /&gt;&lt;br /&gt;&lt;a href="http://1.bp.blogspot.com/_rbU0qmUQfs8/TS_f-igdZjI/AAAAAAAAAK4/LhxXHC8GUUs/s1600/ifstat_out.png"&gt;&lt;img style="display:block; margin:0px auto 10px; text-align:center;cursor:pointer; cursor:hand;width: 320px; height: 320px;" src="http://1.bp.blogspot.com/_rbU0qmUQfs8/TS_f-igdZjI/AAAAAAAAAK4/LhxXHC8GUUs/s320/ifstat_out.png" border="0" alt="Plot of ifstat output" id="BLOGGER_PHOTO_ID_5561910330333685298" /&gt;&lt;/a&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/2865586709024208742/comments/default' title='Post Comments'/><link rel='replies' type='text/html' href='http://thedatachef.blogspot.com/2011/01/plot-fifo-in-r.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/2865586709024208742'/><link rel='self' type='application/atom+xml' 
href='http://www.blogger.com/feeds/2875432366090970056/posts/default/2865586709024208742'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/plot-fifo-in-r.html' title='Plot a FIFO in R'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><media:thumbnail xmlns:media='http://search.yahoo.com/mrss/' url='http://1.bp.blogspot.com/_rbU0qmUQfs8/TS_f-igdZjI/AAAAAAAAAK4/LhxXHC8GUUs/s72-c/ifstat_out.png' height='72' width='72'/><thr:total>0</thr:total></entry><entry><id>tag:blogger.com,1999:blog-2875432366090970056.post-4153741792799777096</id><published>2011-01-13T20:50:00.000-08:00</published><updated>2011-01-15T09:48:45.634-08:00</updated><category scheme='http://www.blogger.com/atom/ns#' term='java'/><category scheme='http://www.blogger.com/atom/ns#' term='java_home'/><category scheme='http://www.blogger.com/atom/ns#' term='mac os X'/><category scheme='http://www.blogger.com/atom/ns#' term='mac'/><title type='text'>JAVA_HOME on mac os X</title><content type='html'>Rather than hunting down (and trying to remember) the right JAVA_HOME path on a Mac, there's a simple trick: let /usr/libexec/java_home find it for you.&lt;br /&gt;&lt;br /&gt;&lt;pre&gt;&lt;br /&gt;export JAVA_HOME=`/usr/libexec/java_home`&lt;br /&gt;&lt;/pre&gt;</content><link rel='replies' type='application/atom+xml' href='http://thedatachef.blogspot.com/feeds/4153741792799777096/comments/default' title='Post Comments'/><link rel='replies' type='text/html' 
href='http://thedatachef.blogspot.com/2011/01/javahome-on-mac-os-x.html#comment-form' title='0 Comments'/><link rel='edit' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4153741792799777096'/><link rel='self' type='application/atom+xml' href='http://www.blogger.com/feeds/2875432366090970056/posts/default/4153741792799777096'/><link rel='alternate' type='text/html' href='http://thedatachef.blogspot.com/2011/01/javahome-on-mac-os-x.html' title='JAVA_HOME on mac os X'/><author><name>thedatachef</name><uri>http://www.blogger.com/profile/06156691310036989913</uri><email>noreply@blogger.com</email><gd:image rel='http://schemas.google.com/g/2005#thumbnail' width='32' height='24' src='http://1.bp.blogspot.com/_rbU0qmUQfs8/TTHe4XdEBtI/AAAAAAAAALA/dd-6zlZfQ2I/S220/gee_whiz.jpg'/></author><thr:total>0</thr:total></entry></feed>
