## Wednesday, October 26, 2011

### Nearest Neighbors With Apache Pig and JRuby

Since it's been such a long time since I last posted I thought I'd make this one a bit longer. It really is a condensing of a lot of things I've been working with and thinking about over the past few months.

## Nearest Neighbors

The nearest neighbors problem (also known as the post-office problem) is this: given a point X in some metric space M and a set S of candidate points in M, find the point in S that is closest to X. In other words, given a residence, assign to it the nearest post office. The K-nearest neighbors problem, which this post addresses, is just a slight generalization of that problem. Instead of just one neighbor we are looking for the K closest neighbors.
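To make the definition concrete, here is a minimal brute-force sketch in plain Ruby. It is not what the rest of the post does: the point list, the `[id, x, y]` layout, and the flat Euclidean metric are all assumptions made up for illustration.

```ruby
# Brute-force K-nearest neighbors over a tiny in-memory point set.
# Points are hypothetical [id, x, y] triples in a flat Euclidean plane.
def euclidean(a, b)
  Math.sqrt((a[1] - b[1])**2 + (a[2] - b[2])**2)
end

# Return the ids of the k points closest to target (excluding target itself).
def k_nearest(target, points, k)
  points
    .reject { |p| p[0] == target[0] }
    .sort_by { |p| euclidean(target, p) }
    .first(k)
    .map { |p| p[0] }
end

POINTS = [
  [:a, 0.0, 0.0],
  [:b, 1.0, 0.0],
  [:c, 0.0, 2.0],
  [:d, 5.0, 5.0],
  [:e, 0.5, 0.5]
]

p k_nearest(POINTS[0], POINTS, 2)  # => [:e, :b]
```

This compares every point against every other point, which is fine for five points and hopeless for millions. That is the motivation for partitioning the space first.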

## Problem

So, we're going to use the geonames data. This is a set of nearly 8 million geo points with names, coordinates, and a bunch of other good stuff, from around the world. We would like to find, for a given point in the geonames set, the 5 points (also in geonames) that are nearest to it. Should be pretty simple, yeah?

```
$: wget http://download.geonames.org/export/dump/allCountries.zip
```

### Prepare data

Since the geonames data set already comes as a nice tab-separated-values (.tsv) file, it's just a matter of unzipping the package and placing it on your hdfs (you do have one of those, don't you?). Do:

```
$: unzip allCountries.zip
$: hadoop fs -put allCountries.txt .
```

to unzip the package and place the tsv file into your home directory on the hadoop distributed file system.

### Schema

Oh, and by the way, before we forget, the data from geonames has this pig schema:

```
geonameid:         int,
name:              chararray,
asciiname:         chararray,
alternatenames:    chararray,
latitude:          double,
longitude:         double,
feature_class:     chararray,
feature_code:      chararray,
country_code:      chararray,
cc2:               chararray,
admin1_code:       chararray,
admin2_code:       chararray,
admin3_code:       chararray,
admin4_code:       chararray,
population:        long,
elevation:         int,
gtopo30:           int,
timezone:          chararray,
modification_date: chararray
```

### The Algorithm

Now that we have the data we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc.) we see that there are really only three fields of interest: geonameid, longitude, and latitude. All the other fields are just nice metadata which we can attach later.

Now, since we're going to be using Apache Pig to solve this problem we need to think a little bit about parallelism. One constraint is that at no time is any one point going to have access to the locations of all the other points. In other words, we will not be storing the full set of points in memory. Besides, it's 8 million points, and that's kind of a lot for my poor little machine to handle.

So it's clear (right?) that we're going to have to partition the space in some way. Then, within a partition of the space, we'll need to apply a local version of the nearest neighbors algorithm. That's it really. Map and reduce. Wait, but there's one problem.
What happens if we don't find all 5 neighbors for a point in a single partition? Hmmm. Well, the answer is iteration. We'll choose a small partition size to begin with and gradually increase the partition size until either the partition size is too large or all the neighbors have been found. Got it? Recap:

1. Partition the space
2. Search for nearest neighbors in a single partition
3. If all neighbors have been found, terminate; else increase partition size and repeat (1) and (2)

### Implementation

For partitioning the space we're going to use quadkeys, as described for the Bing Maps tile system (http://msdn.microsoft.com/en-us/library/bb259689.aspx), since they're super easy to implement and they partition the space nicely. This will be a java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a string quadkey (the partition id). Here's the actual java code for that. Let's call it "GetQuadkey":

```java
package sounder.pig.geo;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
   See: http://msdn.microsoft.com/en-us/library/bb259689.aspx

   A Pig UDF to compute the quadkey string for a given
   (longitude, latitude, resolution) tuple.
 */
public class GetQuadkey extends EvalFunc<String> {
    private static final int TILE_SIZE = 256;

    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))
            return null;

        Double longitude = (Double)input.get(0);
        Double latitude = (Double)input.get(1);
        Integer resolution = (Integer)input.get(2);

        String quadKey = quadKey(longitude, latitude, resolution);
        return quadKey;
    }

    private static String quadKey(double longitude, double latitude, int resolution) {
        int[] pixels = pointToPixels(longitude, latitude, resolution);
        int[] tiles = pixelsToTiles(pixels[0], pixels[1]);
        return tilesToQuadKey(tiles[0], tiles[1], resolution);
    }

    /**
       Return the pixel X and Y coordinates for the given lat, lng, and resolution.
     */
    private static int[] pointToPixels(double longitude, double latitude, int resolution) {
        double x = (longitude + 180) / 360;
        double sinLatitude = Math.sin(latitude * Math.PI / 180);
        double y = 0.5 - Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);

        int mapSize = mapSize(resolution);
        int[] pixels = {(int) trim(x * mapSize + 0.5, 0, mapSize - 1), (int) trim(y * mapSize + 0.5, 0, mapSize - 1)};
        return pixels;
    }

    /**
       Convert from pixel coordinates to tile coordinates.
     */
    private static int[] pixelsToTiles(int pixelX, int pixelY) {
        int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};
        return tiles;
    }

    /**
       Finally, given tile coordinates and a resolution, returns the appropriate quadkey
     */
    private static String tilesToQuadKey(int tileX, int tileY, int resolution) {
        StringBuilder quadKey = new StringBuilder();
        for (int i = resolution; i > 0; i--) {
            char digit = '0';
            int mask = 1 << (i - 1);
            if ((tileX & mask) != 0) {
                digit++;
            }
            if ((tileY & mask) != 0) {
                digit++;
                digit++;
            }
            quadKey.append(digit);
        }
        return quadKey.toString();
    }

    /**
       Ensure input value is within minval and maxval
     */
    private static double trim(double n, double minVal, double maxVal) {
        return Math.min(Math.max(n, minVal), maxVal);
    }

    /**
       Width of the map, in pixels, at the given resolution
     */
    public static int mapSize(int resolution) {
        return TILE_SIZE << resolution;
    }
}
```

Next we need to have another java udf that operates on all the points in a partition. Let's call it "NearestNeighbors".
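Before moving on to that second UDF, the quadkey arithmetic can be checked outside of Pig with a small plain-Ruby port. This is only a sketch that mirrors the Java above; the method names are mine, and nothing in the pipeline uses it.

```ruby
TILE_SIZE = 256

# Width of the map, in pixels, at the given zoom level.
def map_size(zl)
  TILE_SIZE << zl
end

# Interleave the bits of the tile coordinates into a base-4 string,
# most significant bit first (x contributes 1, y contributes 2).
def tiles_to_quadkey(tile_x, tile_y, zl)
  digits = zl.downto(1).map do |i|
    mask = 1 << (i - 1)
    digit = 0
    digit += 1 if (tile_x & mask) != 0
    digit += 2 if (tile_y & mask) != 0
    digit.to_s
  end
  digits.join
end

# Keep a pixel coordinate inside [0, size - 1] and truncate to an integer.
def to_pixel(value, size)
  [[value, 0].max, size - 1].min.to_i
end

# Mercator-project the point, convert to pixels, then tiles, then a quadkey.
def quadkey(longitude, latitude, zl)
  x = (longitude + 180.0) / 360.0
  sin_lat = Math.sin(latitude * Math::PI / 180.0)
  y = 0.5 - Math.log((1 + sin_lat) / (1 - sin_lat)) / (4 * Math::PI)
  size = map_size(zl)
  pixel_x = to_pixel(x * size + 0.5, size)
  pixel_y = to_pixel(y * size + 0.5, size)
  tiles_to_quadkey(pixel_x / TILE_SIZE, pixel_y / TILE_SIZE, zl)
end

puts tiles_to_quadkey(3, 5, 3)           # tile (3, 5) at level 3 => "213"
puts quadkey(-17.98263, 66.52688, 10)    # a point in Iceland, 10 digits
```

The useful property for this post is that a quadkey has one digit per zoom level, so chopping off the last digit gives the key of the parent tile, which is four times larger.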
Here's a naive implementation of NearestNeighbors:

```java
package sounder.pig.geo.nearestneighbors;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Iterator;
import java.util.Comparator;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;

public class NearestNeighbors extends EvalFunc<DataBag> {
    private static TupleFactory tupleFactory = TupleFactory.getInstance();
    private static BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
            return null;

        Long k = (Long)input.get(0);
        DataBag points = (DataBag)input.get(1); // {(id,lng,lat,{(n1,n1_dist)...})}
        DataBag result = bagFactory.newDefaultBag();

        for (Tuple pointA : points) {
            DataBag neighborsBag = (DataBag)pointA.get(3);
            if (neighborsBag.size() < k) {
                PriorityQueue<Tuple> neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);
                Double x1 = Math.toRadians((Double)pointA.get(1));
                Double y1 = Math.toRadians((Double)pointA.get(2));

                for (Tuple pointB : points) {
                    if (pointA!=pointB) {
                        Double x2 = Math.toRadians((Double)pointB.get(1));
                        Double y2 = Math.toRadians((Double)pointB.get(2));
                        Double distance = haversineDistance(x1,y1,x2,y2);

                        // Add this point as a neighbor if pointA has no neighbors
                        if (neighbors.size()==0) {
                            Tuple newNeighbor = tupleFactory.newTuple(2);
                            newNeighbor.set(0, pointB.get(0));
                            newNeighbor.set(1, distance);
                            neighbors.add(newNeighbor);
                        }

                        Tuple furthestNeighbor = neighbors.peek();
                        Double neighborDist = (Double)furthestNeighbor.get(1);
                        if (distance < neighborDist) {
                            Tuple newNeighbor = tupleFactory.newTuple(2);
                            newNeighbor.set(0, pointB.get(0));
                            newNeighbor.set(1, distance);
                            if (neighbors.size() < k) {
                                neighbors.add(newNeighbor);
                            } else {
                                neighbors.poll(); // remove farthest
                                neighbors.add(newNeighbor);
                            }
                        }
                    }
                }

                // Should now have a priorityqueue containing a sorted list of neighbors
                // create new result tuple and add to result bag
                Tuple newPointA = tupleFactory.newTuple(4);
                newPointA.set(0, pointA.get(0));
                newPointA.set(1, pointA.get(1));
                newPointA.set(2, pointA.get(2));
                newPointA.set(3, fromQueue(neighbors));
                result.add(newPointA);
            } else {
                result.add(pointA);
            }
        }
        return result;
    }

    // Ensure sorted by descending
    private PriorityQueue<Tuple> toDistanceSortedQueue(int k, DataBag bag) {
        PriorityQueue<Tuple> q = new PriorityQueue<Tuple>(k,
            new Comparator<Tuple>() {
                public int compare(Tuple t1, Tuple t2) {
                    try {
                        Double dist1 = (Double)t1.get(1);
                        Double dist2 = (Double)t2.get(1);
                        return dist2.compareTo(dist1);
                    } catch (ExecException e) {
                        throw new RuntimeException("Error comparing tuples", e);
                    }
                }
            });
        for (Tuple tuple : bag) q.add(tuple);
        return q;
    }

    private DataBag fromQueue(PriorityQueue<Tuple> q) {
        DataBag bag = bagFactory.newDefaultBag();
        for (Tuple tuple : q) bag.add(tuple);
        return bag;
    }

    private Double haversineDistance(Double x1, Double y1, Double x2, Double y2) {
        double a = Math.pow(Math.sin((x2-x1)/2), 2)
            + Math.cos(x1) * Math.cos(x2) * Math.pow(Math.sin((y2-y1)/2), 2);
        return (2 * Math.asin(Math.min(1, Math.sqrt(a))));
    }
}
```

The details of the NearestNeighbors UDF aren't super important and it's mostly pretty clear what's going on. Just know that it operates on a bag of points as input and returns a bag of points as output that has the same schema. This is really important since we're going to be iterating.

Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and pig's "PigServer" java class to do all the work.
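Before the real runner, the control flow from the recap can be sketched in a few lines of plain in-memory Ruby. This is a toy under loud assumptions: the quadkeys are made-up four-digit strings, the method name is hypothetical, and the "local search" just grabs cell-mates without ranking them by distance, so it only illustrates the widen-and-retry loop.

```ruby
# quadkeys: hypothetical hash of point id => quadkey string at the finest zoom.
# Returns the neighbor lists and the zoom level at which the loop stopped.
def widen_until_found(quadkeys, k, max_zl, min_zl)
  neighbors = Hash.new { |h, id| h[id] = [] }

  max_zl.downto(min_zl) do |zl|
    # (1) Partition: points sharing the first zl digits share a cell.
    cells = quadkeys.keys.group_by { |id| quadkeys[id][0, zl] }

    # (2) Local search: only points still short of k neighbors look around.
    #     (No distance ranking here; a real version would sort by distance.)
    cells.each_value do |ids|
      ids.each do |id|
        next if neighbors[id].size >= k
        neighbors[id] = (ids - [id]).first(k)
      end
    end

    # (3) Terminate as soon as the "not done" set is empty.
    not_done = quadkeys.keys.select { |id| neighbors[id].size < k }
    return [neighbors, zl] if not_done.empty?
  end

  [neighbors, min_zl]
end

TOY = { 1 => "0012", 2 => "0013", 3 => "0021", 4 => "0311" }
found, stopped_at = widen_until_found(TOY, 2, 4, 1)
p found       # => {1=>[2, 3], 2=>[1, 3], 3=>[1, 2], 4=>[1, 2]}
p stopped_at  # => 1
```

Point 4 sits far from the others, so it forces the loop all the way down to zoom level 1. The same thing happens with isolated points in the real data, which is why the runner takes a minimum zoom level to give up at.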
Here's what the jruby runner looks like (it's kind of a lot so don't get scared):

```ruby
#!/usr/bin/env jruby

require 'java'

#
# You might consider changing this to point to where you have
# pig installed...
#
jar  = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"
conf = "/etc/hadoop/conf"

$CLASSPATH << conf
require jar

import org.apache.pig.ExecType
import org.apache.pig.PigServer
import org.apache.pig.FuncSpec

class NearestNeighbors
  attr_accessor :points, :k, :min_zl, :runmode

  #
  # Create a new nearest neighbors instance
  # for the given points, k neighbors to find,
  # an optional minimum zl (1-21) and optional
  # hadoop run mode (local or mapreduce)
  #
  def initialize points, k, min_zl=20, runmode='mapreduce'
    @points  = points
    @k       = k
    @min_zl  = min_zl
    @runmode = runmode
  end

  #
  # Run the nearest neighbors algorithm
  #
  def run
    start_pig_server
    register_jars_and_functions
    run_algorithm
    stop_pig_server
  end

  #
  # Actually runs all the pig queries for
  # the algorithm. Stops if all neighbors
  # have been found or if min_zl is reached
  #
  def run_algorithm
    start_nearest_neighbors(points, k, 22)
    if run_nearest_neighbors(k, 22)
      21.downto(min_zl) do |zl|
        iterate_nearest_neighbors(k, zl)
        break unless run_nearest_neighbors(k,zl)
      end
    end
  end

  #
  # Registers algorithm initialization queries
  #
  def start_nearest_neighbors(input, k, zl)
    @pig.register_query(PigQueries.load_points(input))
    @pig.register_query(PigQueries.generate_initial_quadkeys(zl))
  end

  #
  # Registers algorithm iteration queries
  #
  def iterate_nearest_neighbors k, zl
    @pig.register_query(PigQueries.load_prior_done(zl))
    @pig.register_query(PigQueries.load_prior_not_done(zl))
    @pig.register_query(PigQueries.union_priors(zl))
    @pig.register_query(PigQueries.trim_quadkey(zl))
  end

  #
  # Runs one iteration of the algorithm
  #
  def run_nearest_neighbors(k, zl)
    @pig.register_query(PigQueries.group_by_quadkey(zl))
    @pig.register_query(PigQueries.nearest_neighbors(k, zl))
    @pig.register_query(PigQueries.split_results(k, zl))
    if !@pig.exists_file("done#{zl}")
      @pig.store("done#{zl}", "done#{zl}")
      not_done = @pig.store("not_done#{zl}", "not_done#{zl}")
      not_done.get_results.has_next?
    else
      true
    end
  end

  #
  # Start a new pig server with the specified run mode
  #
  def start_pig_server
    @pig = PigServer.new(runmode)
  end

  #
  # Stop the running pig server
  #
  def stop_pig_server
    @pig.shutdown
  end

  #
  # Register the jar that contains the nearest neighbors
  # and quadkeys udfs and define functions for them.
  #
  def register_jars_and_functions
    @pig.register_jar('../../../../udf/target/sounder-1.0-SNAPSHOT.jar')
    @pig.register_function('GetQuadkey',       FuncSpec.new('sounder.pig.geo.GetQuadkey()'))
    @pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors()'))
  end

  #
  # A simple class to contain the pig queries
  #
  class PigQueries

    #
    # Load the geonames points. Obviously,
    # this should be modified to accept a
    # variable schema.
    #
    def self.load_points geonames
      "points = LOAD '#{geonames}' AS (
         geonameid:         int,
         name:              chararray,
         asciiname:         chararray,
         alternatenames:    chararray,
         latitude:          double,
         longitude:         double,
         feature_class:     chararray,
         feature_code:      chararray,
         country_code:      chararray,
         cc2:               chararray,
         admin1_code:       chararray,
         admin2_code:       chararray,
         admin3_code:       chararray,
         admin4_code:       chararray,
         population:        long,
         elevation:         int,
         gtopo30:           int,
         timezone:          chararray,
         modification_date: chararray
       );"
    end

    #
    # Query to generate quadkeys at the specified zoom level
    #
    def self.generate_initial_quadkeys(zl)
      "projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"
    end

    #
    # Load previous iteration's done points
    #
    def self.load_prior_done(zl)
      "prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (
         quadkey:   chararray,
         geonameid: int,
         longitude: double,
         latitude:  double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Load previous iteration's not done points
    #
    def self.load_prior_not_done(zl)
      "prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (
         quadkey:   chararray,
         geonameid: int,
         longitude: double,
         latitude:  double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Union the previous iterations points that are done
    # with the points that are not done
    #
    def self.union_priors zl
      "prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"
    end

    #
    # Chop off one character of precision from the existing
    # quadkey to go one zl down.
    #
    def self.trim_quadkey zl
      "projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE
         SUBSTRING(quadkey, 0, #{zl}) AS quadkey,
         geonameid AS geonameid,
         longitude AS longitude,
         latitude  AS latitude,
         neighbors AS neighbors;"
    end

    #
    # Group the points by quadkey
    #
    def self.group_by_quadkey zl
      "grouped#{zl}  = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"
    end

    #
    # Run the nearest neighbors udf on all the points for
    # a given quadkey
    #
    def self.nearest_neighbors(k, zl)
      "nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (
         geonameid: int,
         longitude: double,
         latitude:  double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Split the results into done and not_done relations
    # The algorithm is done when 'not_done' contains
    # no more tuples.
    #
    def self.split_results(k, zl)
      "SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;"
    end
  end
end

# Arguments: input path, k, and an optional minimum zoom level (defaults to 20).
# The zoom level has to be an integer for the downto loop in run_algorithm.
NearestNeighbors.new(ARGV[0], ARGV[1].to_i, (ARGV[2] || 20).to_i).run
```

Call this file "nearest_neighbors.rb". The idea here is that we register some basic pig queries to do the initialization of the algorithm and the iterations. These queries are run over and over until either the "not_done" relation contains no more elements or the minimum zoom level has been reached. Note that a small zoom level means a big partition of space.

## Run it!

I think we're finally ready to run it. Let K=5 and the min zoom level (zl) be 10. Then just run:

```
$: ./nearest_neighbors.rb allCountries.txt 5 10
```

to kick it off. The output will live in 'done10' (in your home directory on the hdfs) and all the ones that couldn't find their neighbors (poor guys) are left in 'not_done10'. Let's take a look:

```
$: hadoop fs -cat done10/part* | head
22321231 5874132 -164.73333 67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}
133223322 2631726 -17.98263 66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}
133223322 2631999 -18.01884 66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}
200103201 5874186 -165.39611 64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}
200103201 5878614 -165.3 64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}
200103203 5875461 -165.55111 64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}
200103300 5861248 -164.27639 64.69278 {(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}
200103300 5876626 -164.27111 64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}
200233011 5870290 -170.3 57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}
200233123 5873595 -169.48056 56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}
```
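A note on reading the distance column: the UDF returns the haversine central angle without multiplying by a radius, so the values are in radians, not kilometers. The sketch below converts an angle to kilometers and computes the textbook haversine for the two Icelandic rows above. The Earth radius constant and the method names are assumptions of this sketch. One caveat, as far as I can tell from the UDF: it passes longitude in the slot where the textbook formula expects latitude, so the stored angles are best treated as approximate away from the equator.

```ruby
EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch

def to_rad(degrees)
  degrees * Math::PI / 180.0
end

# Textbook haversine: central angle in radians between two lon/lat points.
# Latitude is the angle that goes inside the cosine terms.
def central_angle(lon1, lat1, lon2, lat2)
  dlat = to_rad(lat2 - lat1)
  dlon = to_rad(lon2 - lon1)
  a = Math.sin(dlat / 2)**2 +
      Math.cos(to_rad(lat1)) * Math.cos(to_rad(lat2)) * Math.sin(dlon / 2)**2
  2 * Math.asin([1.0, Math.sqrt(a)].min)
end

# Convert a central angle in radians to a great-circle distance in km.
def angle_to_km(angle)
  angle * EARTH_RADIUS_KM
end

# geonameids 2631726 and 2631999 from the output above
angle = central_angle(-17.98263, 66.52688, -18.01884, 66.56514)
puts angle_to_km(angle).round(2)
```

For a rough feel, an angle of 0.001 radians corresponds to a bit over 6 km on the ground.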
Hurray!

### Pig Code

Let's just take a look at the pig code that actually got executed.

```
points = LOAD 'allCountries.txt' AS (
    geonameid:         int,
    name:              chararray,
    asciiname:         chararray,
    alternatenames:    chararray,
    latitude:          double,
    longitude:         double,
    feature_class:     chararray,
    feature_code:      chararray,
    country_code:      chararray,
    cc2:               chararray,
    admin1_code:       chararray,
    admin2_code:       chararray,
    admin3_code:       chararray,
    admin4_code:       chararray,
    population:        long,
    elevation:         int,
    gtopo30:           int,
    timezone:          chararray,
    modification_date: chararray
);
projected22 = FOREACH points GENERATE GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};
grouped22 = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) >= 5l, not_done22 IF COUNT(neighbors) < 5l;

prior_done22 = LOAD 'done22/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done22 = LOAD 'not_done22/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors22 = UNION prior_done22, prior_not_done22;
projected21 = FOREACH prior_neighbors22 GENERATE
    SUBSTRING(quadkey, 0, 21) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped21 = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) >= 5l, not_done21 IF COUNT(neighbors) < 5l;

prior_done21 = LOAD 'done21/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done21 = LOAD 'not_done21/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors21 = UNION prior_done21, prior_not_done21;
projected20 = FOREACH prior_neighbors21 GENERATE
    SUBSTRING(quadkey, 0, 20) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped20 = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) >= 5l, not_done20 IF COUNT(neighbors) < 5l;

prior_done20 = LOAD 'done20/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done20 = LOAD 'not_done20/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors20 = UNION prior_done20, prior_not_done20;
projected19 = FOREACH prior_neighbors20 GENERATE
    SUBSTRING(quadkey, 0, 19) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped19 = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) >= 5l, not_done19 IF COUNT(neighbors) < 5l;

prior_done19 = LOAD 'done19/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done19 = LOAD 'not_done19/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors19 = UNION prior_done19, prior_not_done19;
projected18 = FOREACH prior_neighbors19 GENERATE
    SUBSTRING(quadkey, 0, 18) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped18 = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) >= 5l, not_done18 IF COUNT(neighbors) < 5l;

prior_done18 = LOAD 'done18/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done18 = LOAD 'not_done18/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors18 = UNION prior_done18, prior_not_done18;
projected17 = FOREACH prior_neighbors18 GENERATE
    SUBSTRING(quadkey, 0, 17) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped17 = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) >= 5l, not_done17 IF COUNT(neighbors) < 5l;

prior_done17 = LOAD 'done17/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done17 = LOAD 'not_done17/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors17 = UNION prior_done17, prior_not_done17;
projected16 = FOREACH prior_neighbors17 GENERATE
    SUBSTRING(quadkey, 0, 16) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped16 = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) >= 5l, not_done16 IF COUNT(neighbors) < 5l;

prior_done16 = LOAD 'done16/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done16 = LOAD 'not_done16/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors16 = UNION prior_done16, prior_not_done16;
projected15 = FOREACH prior_neighbors16 GENERATE
    SUBSTRING(quadkey, 0, 15) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped15 = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) >= 5l, not_done15 IF COUNT(neighbors) < 5l;

prior_done15 = LOAD 'done15/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done15 = LOAD 'not_done15/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors15 = UNION prior_done15, prior_not_done15;
projected14 = FOREACH prior_neighbors15 GENERATE
    SUBSTRING(quadkey, 0, 14) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped14 = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) >= 5l, not_done14 IF COUNT(neighbors) < 5l;

prior_done14 = LOAD 'done14/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done14 = LOAD 'not_done14/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors14 = UNION prior_done14, prior_not_done14;
projected13 = FOREACH prior_neighbors14 GENERATE
    SUBSTRING(quadkey, 0, 13) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped13 = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) >= 5l, not_done13 IF COUNT(neighbors) < 5l;

prior_done13 = LOAD 'done13/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done13 = LOAD 'not_done13/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors13 = UNION prior_done13, prior_not_done13;
projected12 = FOREACH prior_neighbors13 GENERATE
    SUBSTRING(quadkey, 0, 12) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped12 = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) >= 5l, not_done12 IF COUNT(neighbors) < 5l;

prior_done12 = LOAD 'done12/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done12 = LOAD 'not_done12/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors12 = UNION prior_done12, prior_not_done12;
projected11 = FOREACH prior_neighbors12 GENERATE
    SUBSTRING(quadkey, 0, 11) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped11 = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) >= 5l, not_done11 IF COUNT(neighbors) < 5l;

prior_done11 = LOAD 'done11/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done11 = LOAD 'not_done11/part*' AS (
    quadkey: chararray, geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors11 = UNION prior_done11, prior_not_done11;
projected10 = FOREACH prior_neighbors11 GENERATE
    SUBSTRING(quadkey, 0, 10) AS quadkey,
    geonameid AS geonameid,
    longitude AS longitude,
    latitude  AS latitude,
    neighbors AS neighbors;
grouped10 = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
    geonameid: int, longitude: double, latitude: double,
    neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) >= 5l, not_done10 IF COUNT(neighbors) < 5l;
```

So you can see now why, with iteration, it's a good idea to generate as much code as possible.
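
To make that concrete, here's a minimal Ruby sketch of generating one iteration of the Pig code above from a template. It is only an illustration under assumptions, not the generator from the Sounder repo: the method name `pig_iteration`, the `SCHEMA` constant, and the heredoc layout are all made up here, while the relation names and schema are copied from the generated code shown above.

```ruby
# Hypothetical generator: emits the Pig statements for one zoom level.
# Relation names and schema mirror the generated code in this post; the
# method name and template layout are illustrative assumptions.
SCHEMA = 'geonameid: int, longitude: double, latitude: double, ' \
         'neighbors: bag {t:tuple(neighbor_id:int, distance:double)}'

def pig_iteration(zl, k = 5)
  prior = zl + 1 # the previous iteration ran at the next finer zoom level
  <<~PIG
    prior_done#{prior} = LOAD 'done#{prior}/part*' AS (quadkey: chararray, #{SCHEMA});
    prior_not_done#{prior} = LOAD 'not_done#{prior}/part*' AS (quadkey: chararray, #{SCHEMA});
    prior_neighbors#{prior} = UNION prior_done#{prior}, prior_not_done#{prior};
    projected#{zl} = FOREACH prior_neighbors#{prior} GENERATE SUBSTRING(quadkey, 0, #{zl}) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
    grouped#{zl} = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;
    nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (#{SCHEMA});
    SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;
  PIG
end

# Emit the statements for zoom levels 13 down to 10, as in the listing above.
script = 13.downto(10).map { |zl| pig_iteration(zl) }.join("\n")
puts script
```

Each call only changes the zoom level, so the truncated quadkey prefix gets one character shorter per iteration and the partitions get coarser.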

And we're done :)

All the code for this is on github in the Sounder repo (along with tons of other Apache Pig examples).

## Tuesday, May 3, 2011

### Structural Similarity With Apache Pig

A while ago I posted this about computing the jaccard similarity (otherwise known as the structural similarity) of nodes in a network graph. Looking back on it over the past few days I realize there are some areas for serious improvement. Actually, it's just completely wrong. The approach is broken. So, I'm going to revisit the algorithm again, in more detail, and write a vastly improved Pig script for computing the structural similarity of vertices in a network graph.

## The graph

Of course, before you can do a whole lot of anything, you've got to have a graph to work with. To keep things dead simple (and breaking from what I normally do) I'm going to draw an arbitrary graph (and by draw I mean draw). Here it is:

This can be represented as a tab-separated-values set of adjacency pairs in a file called 'graph.tsv':

    $: cat graph.tsv
    A C
    B C
    C D
    C E
    E F
    C H
    G H
    C G
    D E
    A G
    B G

I don't think it gets any simpler.

## The Measure

Now, the idea here is to compute the similarity between nodes, eg similarity(A,B). There's already a well defined measure for this called the jaccard similarity. The key take away is to notice that:

$$\frac{|A \cap B|}{|A \cup B|} = \frac{|A \cap B|}{|A| + |B| - |A \cap B|}$$

## The Pig

This can be broken into a set of Pig operations quite easily actually:

### load data

Remember, I saved the data into a file called 'graph.tsv'. So:

    edges     = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);
    edges_dup = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);

It is necessary, but still a hack, to load the data twice. This is because self-intersections (which I'll be doing in a moment) don't work with Pig 0.8 at the moment.

### augment with sizes

Now I need the 'size' of each of the sets. In this case the 'sets' are the list of nodes each node links to. So, all I really have to do is calculate the number of outgoing links:

    grouped_edges = GROUP edges BY v1;
    aug_edges     = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

    grouped_dups  = GROUP edges_dup BY v1;
    aug_dups      = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;

Again, if self-intersections worked, the operation of counting the outgoing links would only have to happen once. Now I've got a handle on the |A| and |B| parts of the equation above.

### intersection

Next I'm going to compute the intersection using a Pig join:

    edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
    intersection = FOREACH edges_joined {
                     --
                     -- results in:
                     -- (X, Y, |X| + |Y|)
                     --
                     added_size = aug_edges::v1_out + aug_dups::v1_out;
                     GENERATE
                       aug_edges::v1 AS v1,
                       aug_dups::v1  AS v2,
                       added_size    AS added_size
                     ;
                   };

Notice I'm adding the individual set sizes. This is to come up with the |A| + |B| portion of the denominator in the jaccard index.

### intersection sizes

In order to compute the size of the intersection I've got to use a Pig GROUP and collect all the elements that matched on the JOIN:

    intersect_grp   = GROUP intersection BY (v1, v2);
    intersect_sizes = FOREACH intersect_grp {
                        --
                        -- results in:
                        -- (X, Y, |X /\ Y|, |X| + |Y|)
                        --
                        intersection_size = (double)COUNT(intersection);
                        GENERATE
                          FLATTEN(group)               AS (v1, v2),
                          intersection_size            AS intersection_size,
                          MAX(intersection.added_size) AS added_size -- hack, we only need this one time
                        ;
                      };

There's a hack in there. The reason is that 'intersection.added_size' is a Pig data bag containing some number of identical tuples (equal to the intersection size). Each of these tuples is the 'added_size' from the previous step. Using MAX is just a convenient way of pulling out only one of them.

### similarity

And finally, I have all the pieces in place to compute the similarities:

    similarities = FOREACH intersect_sizes {
                     --
                     -- results in:
                     -- (X, Y, |X /\ Y|/|X U Y|)
                     --
                     similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
                     GENERATE
                       v1         AS v1,
                       v2         AS v2,
                       similarity AS similarity
                     ;
                   };

That'll do it. Here's the full script for completeness:

    edges     = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);
    edges_dup = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);

    --
    -- Augment the edges with the sizes of their outgoing adjacency lists. Note that
    -- if a self join was possible we would only have to do this once.
    --
    grouped_edges = GROUP edges BY v1;
    aug_edges     = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

    grouped_dups  = GROUP edges_dup BY v1;
    aug_dups      = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;

    --
    -- Compute the sizes of the intersections of outgoing adjacency lists
    --
    edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
    intersection = FOREACH edges_joined {
                     --
                     -- results in:
                     -- (X, Y, |X| + |Y|)
                     --
                     added_size = aug_edges::v1_out + aug_dups::v1_out;
                     GENERATE
                       aug_edges::v1 AS v1,
                       aug_dups::v1  AS v2,
                       added_size    AS added_size
                     ;
                   };

    intersect_grp   = GROUP intersection BY (v1, v2);
    intersect_sizes = FOREACH intersect_grp {
                        --
                        -- results in:
                        -- (X, Y, |X /\ Y|, |X| + |Y|)
                        --
                        intersection_size = (double)COUNT(intersection);
                        GENERATE
                          FLATTEN(group)               AS (v1, v2),
                          intersection_size            AS intersection_size,
                          MAX(intersection.added_size) AS added_size -- hack, we only need this one time
                        ;
                      };

    similarities = FOREACH intersect_sizes {
                     --
                     -- results in:
                     -- (X, Y, |X /\ Y|/|X U Y|)
                     --
                     similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
                     GENERATE
                       v1         AS v1,
                       v2         AS v2,
                       similarity AS similarity
                     ;
                   };

    DUMP similarities;

## Results

Run it (the full script reads its input from the GRAPH parameter, so pass that in):

    $: pig -x local -p GRAPH=graph.tsv jaccard.pig
    ...snip...
    (A,A,1.0)
    (A,B,1.0)
    (A,C,0.2)
    (B,A,1.0)
    (B,B,1.0)
    (B,C,0.2)
    (C,A,0.2)
    (C,B,0.2)
    (C,C,1.0)
    (C,D,0.25)
    (C,G,0.25)
    (D,C,0.25)
    (D,D,1.0)
    (E,E,1.0)
    (G,C,0.25)
    (G,G,1.0)
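
As a quick cross-check of the numbers above, here's a small in-memory Ruby sketch that applies the same identity, |A /\ B| / (|A| + |B| - |A /\ B|), to the adjacency pairs from graph.tsv. It is only a sanity check for a toy graph, not a replacement for the Pig script; the names `GRAPH_EDGES`, `out_sets`, and `jaccard` are made up for this example.

```ruby
require 'set'

# Adjacency pairs from graph.tsv above (v1 links to v2).
GRAPH_EDGES = [%w[A C], %w[B C], %w[C D], %w[C E], %w[E F], %w[C H],
               %w[G H], %w[C G], %w[D E], %w[A G], %w[B G]].freeze

# Build the set of outgoing neighbors for every source node.
def out_sets(edges)
  edges.each_with_object(Hash.new { |h, k| h[k] = Set.new }) do |(v1, v2), sets|
    sets[v1] << v2
  end
end

# |A /\ B| / (|A| + |B| - |A /\ B|), the same identity the Pig script uses.
def jaccard(a, b)
  common = (a & b).size
  common.to_f / (a.size + b.size - common)
end

sets = out_sets(GRAPH_EDGES)
sets.keys.sort.each do |x|
  sets.keys.sort.each do |y|
    sim = jaccard(sets[x], sets[y])
    # The Pig JOIN never produces pairs with an empty intersection, so skip them.
    puts "(#{x},#{y},#{sim})" if sim > 0
  end
end
```

Unlike the Pig version this holds every adjacency set in memory, which is fine for eleven edges and not much else.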

And it's done! Hurray. Now, go make a recommender system or something.

## Tuesday, April 26, 2011

### A Lucene Text Tokenization UDF for Apache Pig

As much as I loathe to admit it, sometimes java is called for. One of those times is tokenizing raw text. You'll notice in the post about tfidf how I used a Wukong script, written in ruby, to accomplish the task of tokenizing a large text corpus with Hadoop and Pig. There are a couple of problems with this:

1. Ruby is slow at this.

2. All the gem dependencies (wukong itself, extlib, etc) must exist on all the machines in the cluster and be available in the RUBYLIB (yet another environment variable to manage).

There is a better way.

## A Pig UDF

Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc whose function 'exec()' essentially acts as the Wukong 'process()' method or the java hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a pig DataBag. Each Tuple in the DataBag will be a single token. Here's what it looks like as a whole:

    import java.io.IOException;
    import java.io.StringReader;
    import java.util.Iterator;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.BagFactory;

    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
    import org.apache.lucene.util.Version;
    import org.apache.lucene.analysis.Token;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.analysis.standard.StandardTokenizer;

    public class TokenizeText extends EvalFunc<DataBag> {

        private static TupleFactory tupleFactory = TupleFactory.getInstance();
        private static BagFactory bagFactory = BagFactory.getInstance();
        private static String NOFIELD = "";
        private static StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);

        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() < 1 || input.isNull(0))
                return null;

            // Output bag
            DataBag bagOfTokens = bagFactory.newDefaultBag();

            StringReader textInput = new StringReader(input.get(0).toString());
            TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);
            CharTermAttribute termAttribute = stream.getAttribute(CharTermAttribute.class);

            // One single-field tuple per token
            while (stream.incrementToken()) {
                Tuple termText = tupleFactory.newTuple(termAttribute.toString());
                bagOfTokens.add(termText);
                termAttribute.setEmpty();
            }
            return bagOfTokens;
        }
    }

There's absolutely nothing special going on here. Remember, the 'exec' function gets called on every Pig Tuple of input. bagOfTokens will be the Pig DataBag returned. First, the lucene library tokenizes the input string. Then all the tokens in the resulting stream are turned into Pig Tuples and added to the result DataBag. Finally the resulting DataBag is returned. A document is truly a bag of words.

## Example Pig Script

And here's an example script to use that UDF:

    documents = LOAD 'documents' AS (doc_id:chararray, text:chararray);
    tokenized = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);

And that's it. It's blazing fast text tokenization for Apache Pig.

Hurray.

## Saturday, April 23, 2011

### TF-IDF With Apache Pig

Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).

Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.

One of the cool things about tf-idf is how damn simple it is. Take a look at the wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time, and start reading a different blog. Like always, I'm going to assume you've got a hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.

## Get Data

What would an example be without some data? Here's a link to the canonical 20 newsgroups data set that I've restructured slightly for your analysis pleasure.

## Tokenization

Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there but I've provided one in Wukong for the hell of it that you can find here. We're going to use that script next in the pig script. If a java UDF is more to your liking feel free to write one and substitute in the relevant bits.
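
To give a feel for what a streaming tokenizer has to do, here's a minimal plain-Ruby sketch: it reads tab-separated (doc_id, text) lines and writes one (doc_id, token) line per token. This is not the Wukong script linked above. The names `tokenize_line` and `stream` are hypothetical, and the tokenization rule (downcase, then keep runs of letters, digits, and apostrophes) is an assumption chosen for brevity.

```ruby
require 'stringio'

# Hypothetical stand-in for a streaming tokenizer; not the Wukong script.
# Turns one "doc_id<TAB>text" line into [doc_id, token] pairs.
def tokenize_line(line)
  doc_id, text = line.chomp.split("\t", 2)
  return [] if doc_id.nil? || text.nil?
  text.downcase.scan(/[a-z0-9']+/).map { |token| [doc_id, token] }
end

# A streamed script gets its records on stdin and hands results back on
# stdout. The IO objects are parameters here so the method is easy to test.
def stream(input, output)
  input.each_line do |line|
    tokenize_line(line).each { |pair| output.puts pair.join("\t") }
  end
end

# In a real streaming script this would be: stream($stdin, $stdout)
stream(StringIO.new("doc1\tHello, Pig! Hello Hadoop.\n"), $stdout)
```

A real tokenizer would also want stop-word removal and sturdier handling of punctuation and non-ASCII text, which is exactly the tedious part.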

## Pig, Step By Step

The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:

    DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

This statement can be interpreted to mean:

"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."

Next up is to load and tokenize the data:

    raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
    tokenized     = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);

So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs. Then, we need to count the number of times each unique (doc_id, token) pair appears:

    doc_tokens       = GROUP tokenized BY (doc_id, token);
    doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;

And, since we're after the token frequencies and not just the counts, we need to attach the document sizes:

    doc_usage_bag    = GROUP doc_token_counts BY doc_id;
    doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
                         group                                                 AS doc_id,
                         FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
                         SUM(doc_token_counts.num_doc_tok_usages)              AS doc_size
                       ;

Then the term frequencies are just:

    term_freqs = FOREACH doc_usage_bag_fg GENERATE
                   doc_id AS doc_id,
                   token  AS token,
                   ((double)num_doc_tok_usages / (double)doc_size) AS term_freq
                 ;

For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term:

    term_usage_bag = GROUP term_freqs BY token;
    token_usages   = FOREACH term_usage_bag GENERATE
                       FLATTEN(term_freqs) AS (doc_id, token, term_freq),
                       COUNT(term_freqs)   AS num_docs_with_token
                     ;

Finally, we can compute the tf-idf weight:

    tfidf_all = FOREACH token_usages {
                  idf    = LOG((double)$NDOCS/(double)num_docs_with_token);
                  tf_idf = (double)term_freq*idf;
                  GENERATE
                    doc_id AS doc_id,
                    token  AS token,
                    tf_idf AS tf_idf
                  ;
                };

Where NDOCS is the total number of documents in the corpus.
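
If it helps to see the arithmetic on something tiny, here's an in-memory Ruby sketch that follows the same steps (term frequency, number of documents containing each token, then tf * log(NDOCS / num_docs_with_token)) on a two-document toy corpus. The corpus and the names `TOY_DOCS` and `tf_idf` are made up for illustration; like Pig's LOG, Ruby's `Math.log` is the natural logarithm.

```ruby
# Toy corpus: doc_id => tokens, standing in for the (doc_id, token) pairs.
TOY_DOCS = {
  'd1' => %w[pig hadoop pig],
  'd2' => %w[hadoop ruby]
}.freeze

# Returns { [doc_id, token] => tf_idf }, following the steps of the Pig script.
def tf_idf(docs)
  ndocs = docs.size

  # num_docs_with_token: in how many documents does each token occur?
  doc_freq = Hash.new(0)
  docs.each_value { |tokens| tokens.uniq.each { |t| doc_freq[t] += 1 } }

  weights = {}
  docs.each do |doc_id, tokens|
    # num_doc_tok_usages: how often each token occurs in this document
    usages = tokens.each_with_object(Hash.new(0)) { |t, counts| counts[t] += 1 }
    usages.each do |token, count|
      term_freq = count.to_f / tokens.size          # usages / doc_size
      idf = Math.log(ndocs.to_f / doc_freq[token])  # natural log
      weights[[doc_id, token]] = term_freq * idf
    end
  end
  weights
end

tf_idf(TOY_DOCS).each { |(doc_id, token), w| puts [doc_id, token, w].join("\t") }
```

Note that a token appearing in every document ('hadoop' here) gets an idf of log(1) = 0, so its weight vanishes no matter how often it's used.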

## Pig, All Together Now

And here's the final script, all put together:

## Simple Question

A natural question to ask of such an awesome graph is for the similarity between two characters based on what comic books they've appeared in together. This is called the structural similarity since we're only using the structure of the graph and no other meta data (weights, etc). Note that this could also be applied the other direction to find the similarity between two comic books based on what characters they share.

## Wee bit of math

The structural similarity is nothing more than the jaccard similarity applied to nodes in a network graph. Here's the definition of that from wikipedia:

$$J(A, B) = \frac{|A \cap B|}{|A \cup B|}$$

So basically all we've got to do is get a list of all the comic books that two characters, say character A and character B, have appeared in. These lists of comic books form two mathematical sets.

The numerator in that simple formula says to compute the intersection of A and B and then count how many elements are left. More plainly, that's just the number of comic books the two characters have in common.

The denominator tells us to compute the union of A and B and count how many elements are in the resulting set. That's just the number of unique comic books that A and B have ever been in, either at the same time or not.

## Pig

Here's how we're going to say it using pig:

    DEFINE jaccard_similarity `ruby jaccard_similarity.rb --map` SHIP('jaccard_similarity.rb');

    edges     = LOAD '/data/comics/marvel/labeled_edges.tsv' AS (character:chararray, comic:chararray);
    grouped   = GROUP edges BY character;
    with_sets = FOREACH grouped GENERATE group AS character, FLATTEN(edges.comic) AS comic, edges.comic AS set;

    SPLIT with_sets INTO with_sets_dup IF ( 1 > 0 ), not_used IF (1 < 0); -- hack hack hack, self join still doesn't work

    joined = JOIN with_sets BY comic, with_sets_dup BY comic;
    pairs  = FOREACH joined GENERATE
               with_sets::character     AS character_a,
               with_sets::set           AS character_a_set,
               with_sets_dup::character AS character_b,
               with_sets_dup::set       AS character_b_set
             ;

    similarity = STREAM pairs THROUGH jaccard_similarity AS (character_a:chararray, character_b:chararray, similarity:float);

    STORE similarity INTO '/data/comics/marvel/character_similarity.tsv';

Notice we're doing a bit of funny business here. Writing the actual algorithm for the jaccard similarity between two small sets doesn't make much sense in Pig. Instead we've written a wukong script to do it for us (you could also write a Pig udf if you're a masochist).

The first thing we do here is use the DEFINE operator to tell pig that there's an external command we want to call, the alias for it, how to call it, and to SHIP the script we need to all nodes in the cluster.

Next we use the GROUP operator and then the FOREACH..GENERATE projection operator to get, for every character, the list of comic books they've appeared in.

We also use the FLATTEN operator during the projection as well. The reason is so that we can use the JOIN operator to pull out (character,character) pairs that have at least one comic book in common. (Don't get scared about the gross looking SPLIT operator in there. Just ignore it. It's a hack to get around the fact that self-joins still don't quite work properly in pig. Pretend we're just joining 'with_sets' with itself.)
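
If the GROUP, FLATTEN, and JOIN dance is hard to picture, here's a rough in-memory Ruby sketch of what it produces. The toy edges and the names `COMIC_EDGES` and `character_pairs` are invented for illustration; the real work happens in the Pig script.

```ruby
require 'set'

# Toy (character, comic) edges; the names are made up for illustration.
COMIC_EDGES = [
  %w[A comic1], %w[A comic2],
  %w[B comic2], %w[B comic3],
  %w[C comic4]
].freeze

# Emulates GROUP + FLATTEN + self-JOIN: for every comic, pair up all the
# characters appearing in it, carrying each character's full comic set along.
def character_pairs(edges)
  sets     = Hash.new { |h, k| h[k] = Set.new } # character => set of comics
  by_comic = Hash.new { |h, k| h[k] = [] }      # comic => characters (join key)
  edges.each do |character, comic|
    sets[character] << comic
    by_comic[comic] << character
  end
  by_comic.values.flat_map do |chars|
    chars.product(chars).map { |a, b| [a, sets[a], b, sets[b]] }
  end
end

character_pairs(COMIC_EDGES).each do |a, set_a, b, set_b|
  puts [a, set_a.to_a.inspect, b, set_b.to_a.inspect].join("\t")
end
```

Like the JOIN, this emits a pair once per shared comic, so two characters with several comics in common show up several times, and every character is paired with itself.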

The last step is to STREAM our pairs through the simple wukong script. Here's what that looks like:

    #!/usr/bin/env ruby

    require 'rubygems'
    require 'wukong'
    require 'wukong/and_pig' # for special conversion methods
    require 'set'

    #
    # Takes two pig bags and computes their jaccard similarity
    #
    # eg.
    #
    # input:
    #
    # (a,{(1),(2),(3)}, b, {(2),(9),(5)})
    #
    # output:
    #
    # (a, b, 0.2)
    #
    class JaccardSim < Wukong::Streamer::RecordStreamer
      def process node_a, set_a, node_b, set_b
        yield [node_a, node_b, jaccard(set_a, set_b)]
      end

      def jaccard bag_a, bag_b
        common_elements = ((bag_a.from_pig_bag.to_set).intersection(bag_b.from_pig_bag.to_set)).size
        total_elements  = ((bag_a.from_pig_bag.to_set).union(bag_b.from_pig_bag.to_set)).size
        common_elements.to_f / total_elements.to_f
      end
    end

    Wukong::Script.new(JaccardSim, nil).run

Notice the nifty 'from_pig_bag' method that wukong has. All we're doing here is converting the two pig bags into ruby 'set' objects then doing the simple jaccard similarity calculation. (3 lines of code for the calculation itself, still want to do it in java?)

And that's it. Here's what it looks like after running:

    "HUMAN TORCH/JOHNNY S" "GALACTUS/GALAN"        0.0514112900
    "HUMAN TORCH/JOHNNY S" "ROGUE /"               0.0371308030
    "HUMAN TORCH/JOHNNY S" "UATU"                  0.0481557360
    "LIVING LIGHTNING/MIG" "USAGENT DOPPELGANGER"  0.0322580640
    "LIVING LIGHTNING/MIG" "STRONG GUY/GUIDO CAR"  0.1052631600
    "LIVING LIGHTNING/MIG" "STORM/ORORO MUNROE S"  0.0209059230
    "LIVING LIGHTNING/MIG" "USAGENT/CAPTAIN JOHN"  0.1941747500
    "LIVING LIGHTNING/MIG" "WASP/JANET VAN DYNE "  0.0398089180
    "LIVING LIGHTNING/MIG" "THING/BENJAMIN J. GR"  0.0125120310
    "LIVING LIGHTNING/MIG" "WOLFSBANE/RAHNE SINC"  0.0209059230
    "LIVING LIGHTNING/MIG" "THUNDERSTRIKE/ERIC K"  0.1153846160
    "LIVING LIGHTNING/MIG" "WILD CHILD/KYLE GIBN"  0.0434782600

Sweet.

## Wednesday, January 26, 2011

### Graph Processing With Apache Pig

So, you're probably sick of seeing this airport data set by now (flight edges) but it's so awesome that I have to re-use it. Let's use Pig to do the same calculation as this post in a much more succinct way. We'll really get a feel for what Pig is better at than Wukong.

## Degree Distribution

Since the point here is to illustrate the difference between the wukong way and the pig way we're not going to introduce anything clever here. Here's the code for calculating the degree by month (both passengers and flights) for every domestic airport since 1990:

    --
    -- Calculates the monthly degree distributions for domestic airports from 1990 to 2009.
    --

    -- Load data (boring part)
    flight_edges = LOAD '$FLIGHT_EDGES' AS (origin_code:chararray, destin_code:chararray, passengers:int, flights:int, month:int);

    -- For every (airport,month) pair get passengers and flights out
    edges_out = FOREACH flight_edges GENERATE
                  origin_code AS airport,
                  month       AS month,
                  passengers  AS passengers_out,
                  flights     AS flights_out
                ;

    -- For every (airport,month) pair get passengers and flights in
    edges_in = FOREACH flight_edges GENERATE
                 destin_code AS airport,
                 month       AS month,
                 passengers  AS passengers_in,
                 flights     AS flights_in
               ;

    -- group them together and sum
    grouped_edges = COGROUP edges_in BY (airport,month), edges_out BY (airport,month);
    degree_dist   = FOREACH grouped_edges {
                      passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);
                      flights_degree   = SUM(edges_in.flights_in)    + SUM(edges_out.flights_out);
                      GENERATE
                        FLATTEN(group)   AS (airport, month),
                        passenger_degree AS passenger_degree,
                        flights_degree   AS flights_degree
                      ;
                    };

    STORE degree_dist INTO '$DEG_DIST';

So, here's what's going on:

• FOREACH..GENERATE: this is called a 'projection' in pig. Here we're really just cutting out the fields we don't want and rearranging our records. This is exactly the same as what we do in the wukong script, where we yielded two different types of records for the same input data in the map phase, only a lot more clear.

• COGROUP: here we're simply joining our two data relations together (edges in and edges out) by a common key (airport code,month) and aggregating the values for that key. This is exactly the same as what we do in the 'accumulate' part of the wukong script.

• FOREACH..GENERATE (once more): here we run through our grouped records and sum the flights and passengers. This is exactly the same as the 'finalize' part of the wukong script.

So, basically, we've done in 4 lines (not counting the LOAD and STORE or the prettification) of very clear and concise code what took us ~70 lines of ruby. Win.
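
For a toy-sized picture of the calculation itself, here's an in-memory Ruby sketch that sums passengers and flights over edges in and edges out for every (airport, month) pair. It is just an illustration of the arithmetic, not either of the Hadoop scripts; the airport codes, numbers, and the names `TOY_FLIGHT_EDGES` and `degree_dist` are made up.

```ruby
# Toy flight edges: [origin, destination, passengers, flights, month].
# Airport codes and numbers are made up for illustration.
TOY_FLIGHT_EDGES = [
  ['AUS', 'DFW', 100, 2, 199001],
  ['DFW', 'AUS',  80, 1, 199001],
  ['AUS', 'SFO',  50, 1, 199002]
].freeze

# Sum passengers and flights over edges in and edges out per (airport, month).
def degree_dist(edges)
  degrees = Hash.new { |h, k| h[k] = { passengers: 0, flights: 0 } }
  edges.each do |origin, destin, passengers, flights, month|
    # Every edge counts once as "out" for its origin and once as "in"
    # for its destination.
    [origin, destin].each do |airport|
      degrees[[airport, month]][:passengers] += passengers
      degrees[[airport, month]][:flights]    += flights
    end
  end
  degrees
end

degree_dist(TOY_FLIGHT_EDGES).each do |(airport, month), d|
  puts [airport, month, d[:passengers], d[:flights]].join("\t")
end
```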

Here's the wukong one again for reference:

    #!/usr/bin/env ruby

    require 'rubygems'
    require 'wukong'

    class EdgeMapper < Wukong::Streamer::RecordStreamer
      #
      # Yield both ways so we can sum (passengers in + passengers out) and (flights
      # in + flights out) individually in the reduce phase.
      #
      def process origin_code, destin_code, passengers, flights, month
        yield [origin_code, month, "OUT", passengers, flights]
        yield [destin_code, month, "IN", passengers, flights]
      end
    end

    class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
      #
      # What are we going to use as a key internally?
      #
      def get_key airport, month, in_or_out, passengers, flights
        [airport, month]
      end

      def start! airport, month, in_or_out, passengers, flights
        @out_degree = {:passengers => 0, :flights => 0}
        @in_degree  = {:passengers => 0, :flights => 0}
      end

      def accumulate airport, month, in_or_out, passengers, flights
        case in_or_out
        when "IN" then
          @in_degree[:passengers] += passengers.to_i
          @in_degree[:flights]    += flights.to_i
        when "OUT" then
          @out_degree[:passengers] += passengers.to_i
          @out_degree[:flights]    += flights.to_i
        end
      end

      #
      # For every airport and month, calculate passenger and flight degrees
      #
      def finalize
        # Passenger degrees (out, in, and total)
        passengers_out   = @out_degree[:passengers]
        passengers_in    = @in_degree[:passengers]
        passengers_total = passengers_in + passengers_out

        # Flight degrees (out, in, and total)
        flights_out      = @out_degree[:flights]
        flights_in       = @in_degree[:flights]
        flights_total    = flights_in + flights_out

        yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
      end
    end

    #
    # Need to use 2 fields for partition so every record with the same airport and
    # month land on the same reducer
    #
    Wukong::Script.new(
      EdgeMapper,
      DegreeCalculator,
      :partition_fields  => 2 # use two fields to partition records
      ).run

## Plot Data

A very common workflow pattern with Hadoop is to use a tool like pig or wukong to process large scale data and generate some result data set. The last step in this process is (rather obviously) to summarize that data in some way. Here's a quick plot (using R and ggplot2) of the flights degree distribution after I further summarized by year:

That's funny...

Finally, here's the code for the plot:

    # include the ggplot2 library for nice plots
    library(ggplot2);

    # Read data in and take a subset
    degrees        <- read.table('yearly_degrees.tsv', header=FALSE, sep='\t', colClasses=c('character', 'character', 'numeric', 'numeric'));
    names(degrees) <- c('airport_code', 'year', 'passenger_degree', 'flights_degree');
    select_degrees <- subset(degrees, year=='2000' | year=='2001' | year=='2002' | year=='2009' | year=='1990');

    # Plotting with ggplot2
    pdf('passenger_degrees.pdf', 12, 6, pointsize=10);
    ggplot(select_degrees, aes(x=passenger_degree, fill=year)) + geom_density(colour='black', alpha=0.3) + scale_x_log10() + ylab('Probability') + xlab(expression(log[10] ('Passengers in + Passengers out'))) + opts(title='Passenger Degree Distribution')

Hurray.

## Sunday, January 23, 2011

### Bulk Indexing With ElasticSearch and Hadoop

At Infochimps we recently indexed over 2.5 billion documents for a total indexed size of 4TB. This would not have been possible without ElasticSearch and the Hadoop bulk loader we wrote, wonderdog. I'll go into the technical details in a later post but for now here's how you can get started with ElasticSearch and Hadoop:

## Getting Started with ElasticSearch

The first thing is to actually install elasticsearch:

    $: wget http://github.com/downloads/elasticsearch/elasticsearch/elasticsearch-0.14.2.zip
    $: unzip elasticsearch-0.14.2.zip
    $: sudo mv elasticsearch-0.14.2 /usr/local/share/
    $: sudo ln -s /usr/local/share/elasticsearch-0.14.2 /usr/local/share/elasticsearch

Next you'll want to make sure there is an 'elasticsearch' user and that there are suitable data, work, and log directories that 'elasticsearch' owns:

    $: sudo useradd elasticsearch
    $: sudo mkdir -p /var/log/elasticsearch /var/run/elasticsearch/{data,work}
    $: sudo chown -R elasticsearch /var/{log,run}/elasticsearch

Then get wonderdog (you'll have to git clone it for now) and go ahead and copy the example configuration in wonderdog/config:

    $: sudo mkdir -p /etc/elasticsearch
    $: sudo cp config/elasticsearch-example.yml /etc/elasticsearch/elasticsearch.yml
    $: sudo cp config/logging.yml /etc/elasticsearch/
    $: sudo cp config/elasticsearch.in.sh /etc/elasticsearch/

Make changes to 'elasticsearch.yml' such that it points to the correct data, work, and log directories. Also, you'll want to change the number of 'recovery_after_nodes' and 'expected_nodes' in elasticsearch.yml to however many nodes (machines) you actually expect to have in your cluster. You'll probably also want to do a quick once-over of elasticsearch.in.sh and make sure the jvm settings, etc are sane for your particular setup. Finally, to startup do:

sudo -u elasticsearch /usr/local/share/elasticsearch/bin/elasticsearch -Des.config=/etc/elasticsearch/elasticsearch.yml

You should now have a happily running (reasonably configured) elasticsearch data node.

## Index Some Data

Prerequisites:

• You have a working hadoop cluster

• Elasticsearch data nodes are installed and running on all your machines and they have discovered each other. See the elasticsearch documentation for details on making that actually work.

• You've installed the following rubygems: 'configliere' and 'json'

### Get Data

As an example let's index this UFO sightings data set from Infochimps here. (You should be familiar with this one by now...) It's mostly raw text and so it's a very reasonable thing to index. Once it's downloaded go ahead and throw it on the HDFS:

    $: hadoop fs -mkdir /data/domestic/ufo
    $: hadoop fs -put chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv /data/domestic/ufo/

### Index Data

This is the easy part:

    $: bin/wonderdog --rm --field_names=sighted_at,reported_at,location,shape,duration,description --id_field=-1 --index_name=ufo_sightings --object_type=ufo_sighting --es_config=/etc/elasticsearch/elasticsearch.yml /data/domestic/ufo/ufo_awesome.tsv /tmp/elasticsearch/aliens/out

Flags:

• '--rm' - Remove output on the hdfs if it exists

• '--field_names' - A comma separated list of the field names in the tsv, in order

• '--id_field' - The field to use as the record id, -1 if the record has no inherent id

• '--index_name' - The index name to bulk load into

• '--object_type' - The type of objects we're indexing

• '--es_config' - Points to the elasticsearch config*

*The elasticsearch config that the hadoop machines need must be on all the hadoop machines and have a 'hosts' entry listing the ips of all the elasticsearch data nodes (see wonderdog/config/elasticsearch-example.yml). This means we can run the hadoop job on a different cluster than the elasticsearch data nodes are running on.

The other two arguments are the input and output paths. The output path in this case only gets written to if one or more index requests fail. This way you can re-run the job on only those records that didn't make it the first time.

The indexing should go pretty quickly. Next is to refresh the index so we can actually query our newly indexed data. There's a tool in wonderdog's bin directory for that:

    $: bin/estool --host=hostname -i refresh_index

### Query Data

Once again, use estool:

    $: bin/estool --host=hostname -i --index_name=ufo_sightings --query_string="ufo" query

Hurray.

## Saturday, January 22, 2011

### JRuby and Hadoop, Notes From a Non-Java Programmer

So I spent a fair deal of time playing with JRuby this weekend. Here are my notes/conclusions so far.

Disclaimer: My experience with java overall is limited, so most of this might be obvious to a java ninja but maybe not to a ruby one...

Goal: The whole point here is that it sure would be nice to only write a few lines of ruby code into a file called something like 'rb_mapreduce.rb' and run it by saying: ./rb_mapreduce.rb. No compiling, no awkward syntax. Pure ruby, plain and simple.

Experiments/Notes: I created a 'WukongPlusMapper' that subclassed 'org.apache.hadoop.mapreduce.Mapper' and implemented the important methods, namely 'map'. Then I set up and launched a job from inside jruby using this jruby mapper ('WukongPlusMapper') as the map class. The job launched and ran just fine. But...

Problems and Lessons Learned:

• It is possible (in fact extremely easy) to set up and launch a Hadoop job with pure jruby

• It is not possible, that I can tell so far, to use an uncompiled jruby class as either the mapper or the reducer for a Hadoop job. It doesn't throw an error (so long as you've subclassed a proper java mapper) but actually just uses the superclass's definition instead. I believe the reason is that each map task must have access to the full class definition for its mapper (only sensible) and has no idea what to do with my transient 'WukongPlusMapper' class. Obviously the same would apply to the reducer

• It is possible to compile a jruby class ahead-of-time, stuff it into the job jar, and then launch the job with ordinary means. There are a couple somewhat obvious drawbacks with this method:

  • You've got to specify 'java_signatures' for each of your methods that are going to be called inside java

  • Messy logic+code for compiling thyself, stuffing thyself into a jar, shipping thyself with MR job. Might as well just write java at this point. radoop has some logic for doing that pretty well laid out.

• It is possible to define and create an object in jruby that subclasses a java class or implements a java interface. Then you can simply overwrite the methods you want to overwrite. It's possible to pass instances of this class to a java runtime that only knows about the superclass and the subclass's methods (at least the ones that have the signatures defined in the superclass) will work just fine. Unfortunately, (and plainly obvious in hindsight) this does NOT work with Hadoop since these instances all show up in java as 'proxy classes' and are only accessible to the launching jvm

• On another note there is the option of using the scripting engine which, as far as I can tell, is what both jruby-on-hadoop and radoop are using. Something of concern though is that neither of these two projects seems to have much traction. However, it may be that the scripting engine is the only way to reasonably make this work, at least 2 people vote yes ...

So, implementation complexity aside, it looks like all one would have to do is come up with some way of making JRuby's in-memory class definitions available to all of the spawned mappers and reducers. Probably not something I want to delve into at the moment. Script engine it is.

## Friday, January 21, 2011

### Pig, Bringing Simplicity to Hadoop

In strong contrast to the seat-of-your-pants style of Wukong there is another high level language for Hadoop called Pig. See Apache Pig.

## Overview

At the top level, here's what Pig gets you:

• No java required. That is, use as little (zero) or as much (reams) of java code as you want.

• No boilerplate code

• Intuitive and easy to understand language (similar to SQL) with clean uniform syntax

• Separation of high level algorithm and low level map-reduce jobs

• Build your analysis as a set of operations acting on data

• Most algorithms are less than 5, human readable, lines of Pig

## Get Pig

Go here and download pig (version 0.8) from somewhere close to you. Unpack it and put it wherever you like. Then, type 'pig' at the command line and see what happens. It's very likely that it doesn't pick up your existing Hadoop configuration. To fix that set HADOOP_HOME to point to your hadoop installation and PIG_CLASSPATH to point to your hadoop configuration (probably /etc/hadoop/conf). Here's all that again:

    $: wget http://mirrors.axint.net/apache//pig/pig-0.8.0/pig-0.8.0.tar.gz
    $: tar -zxf pig-0.8.0.tar.gz
    $: sudo mv pig-0.8.0 /usr/local/share/
    $: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig
    $: sudo ln -s /usr/local/share/pig/bin/pig /usr/local/bin/pig
    $: hash -r
    $: export HADOOP_HOME=/usr/lib/hadoop
    $: export PIG_CLASSPATH=/etc/hadoop/conf
    $: pig
    2011-01-21 09:56:32,486 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/jacob/pig_1295625392480.log
    2011-01-21 09:56:32,879 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020
    2011-01-21 09:56:33,402 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: master:54311
    grunt>

See how easy that was. Get it over with now.

## Firsties

People post the most interesting data on Infochimps. Let's get the first billion digits of pi here. Notice that it's arranged in groups of 10 digits with 10 groups per line. We're going to use pig to see how the digits are distributed.

This will allow us to visually (once we make a plot) spot any obvious deviation from a random series of numbers. That is, for a random series of numbers we'd expect each digit (0-9) to appear equally often. If this isn't true then we know we're dealing with a more complicated beast.

### Pre-Process

After catting the data file you'll notice that the end of each line has crufty details attached that make this data more or less impossible to load into Pig as a table. Pig is terrible at data munging/parsing/cleaning. Thankfully we have wukong. Let's write a dead simple wukong script to fix the last field and create one digit per line:

    #!/usr/bin/env ruby
    require 'rubygems'
    require 'wukong'

    class PiCleaner < Wukong::Streamer::LineStreamer
      def process line
        fields               = line.strip.split(' ', 10)
        hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""
        hundred_digit_string.each_char{|digit| yield digit}
      end
    end

    Wukong::Script.new(PiCleaner, nil).run
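To see what `process` does to a single line without launching Hadoop, here is a minimal plain-Ruby sketch of the same logic. The sample line is an assumption: ten groups of ten digits followed by a made-up ` : 100` suffix standing in for the trailing cruft. `clean_line` is a hypothetical helper name, not part of wukong.

```ruby
# Plain-Ruby sketch of the cleaning step (no wukong required).
# ASSUMPTION: the ' : 100' suffix is a made-up stand-in for the trailing cruft.
SAMPLE_LINE = "1415926535 8979323846 2643383279 5028841971 6939937510 " \
              "5820974944 5923078164 0628620899 8628034825 3421170679 : 100"

# Hypothetical helper mirroring PiCleaner#process: returns the digits of one line.
def clean_line(line)
  # Split into at most 10 fields; the 10th keeps the last group plus the cruft.
  fields = line.strip.split(' ', 10)
  return [] if fields.size < 10   # malformed line: emit nothing
  # Keep groups 1-9 whole and only the first 10 characters of the 10th field.
  (fields[0..8] + [fields[9][0..9]]).join.chars
end

digits = clean_line(SAMPLE_LINE)
puts digits.size            # number of digits recovered from the line
puts digits.first(5).join   # first few digits, one per output record in the real job
```

The limit of 10 passed to `split` is what keeps the cruft glued to the last field, so slicing its first ten characters is enough to drop it.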

Let's go ahead and run that right away (this will take a few minutes depending on your rig, it'll be a billion lines of output...):

    $: hdp-mkdir /data/math/pi/
    $: hdp-put pi-010.txt /data/math/pi/
    $: ./pi_clean.rb --run /data/math/pi/pi-010.txt /data/math/pi/first_billion_digits.tsv
    I, [2011-01-22T11:01:57.363704 #12489] INFO -- : Launching hadoop!
    I, [2011-01-22T11:01:57.363964 #12489] INFO -- : Running

    /usr/local/share/hadoop/bin/hadoop \
      jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
      -D mapred.reduce.tasks=0 \
      -D mapred.job.name='pi_clean.rb---/data/math/pi/pi-010.txt---/data/math/pi/first_billion_digits.tsv' \
      -mapper '/usr/bin/ruby1.8 pi_clean.rb --map ' \
      -reducer '' \
      -input '/data/math/pi/pi-010.txt' \
      -output '/data/math/pi/first_billion_digits.tsv' \
      -file '/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb' \
      -cmdenv 'RUBYLIB=~/.rubylib'

    11/01/22 11:01:59 INFO mapred.FileInputFormat: Total input paths to process : 1
    11/01/22 11:01:59 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
    11/01/22 11:01:59 INFO streaming.StreamJob: Running job: job_201012031305_0251
    11/01/22 11:01:59 INFO streaming.StreamJob: To kill this job, run:
    11/01/22 11:01:59 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0251
    11/01/22 11:01:59 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0251
    11/01/22 11:02:00 INFO streaming.StreamJob: map 0% reduce 0%
    11/01/22 11:05:11 INFO streaming.StreamJob: map 100% reduce 100%
    11/01/22 11:05:11 INFO streaming.StreamJob: Job complete: job_201012031305_0251
    11/01/22 11:05:11 INFO streaming.StreamJob: Output: /data/math/pi/first_billion_digits.tsv
    packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar4401930660028806042/] [] /tmp/streamjob3153669001520547.jar tmpDir=null

Great. Now let's write some Pig:

### Analysis

This is what Pig is awesome at. Remember that accumulating reducer in wukong? We're about to do the identical thing in two lines of no-nonsense pig:

    -- load
    digits = LOAD '$PI_DIGITS' AS (digit:int);
    groups = GROUP digits BY digit;
    counts = FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits;
    -- store
    STORE counts INTO '$OUT';

All we're doing here is reading in the data (one digit per line), accumulating all the digits with the same 'digit', and counting them up. Save it into a file called 'pi.pig' and run with the following:

    $: pig -p PI_DIGITS=/data/math/pi/first_billion_digits.tsv -p OUT=/data/math/pi/digit_counts.tsv pi.pig
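If the GROUP and COUNT steps feel abstract, here is a rough plain-Ruby sketch of the same idea on a tiny in-memory list. The sample digits and the `digit_counts` helper are made up for illustration; the real job runs over the full file on Hadoop.

```ruby
# Plain-Ruby sketch of GROUP ... BY digit followed by COUNT.
# ASSUMPTION: `digits` is a tiny made-up sample, not the real data set.
digits = %w[3 1 4 1 5 9 2 6 5 3].map(&:to_i)

# Hypothetical helper: returns { digit => number of occurrences }, sorted by digit.
def digit_counts(digits)
  counts = Hash.new(0)                 # missing digits start at zero
  digits.each { |d| counts[d] += 1 }   # "group" and "count" in one pass
  counts.sort.to_h
end

digit_counts(digits).each do |digit, num_digits|
  puts [digit, num_digits].join("\t")  # same two columns the Pig script stores
end
```

The difference, of course, is that Pig turns those two statements into a map-reduce job, so nothing has to fit in memory on one machine.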

I'll skip the output since it's rather verbose. Now we can make a simple plot by hdp-catting our output data into a local file (it's super tiny by now) and plotting it with R:

    $: hdp-catd /data/math/pi/digit_counts.tsv > digit_counts.tsv
    $: R
    > library(ggplot2)
    > digit_counts <- read.table('digit_counts.tsv', header=FALSE, sep="\t")
    > names(digit_counts) <- c('digit', 'count')
    > p <- ggplot(digit_counts, aes(x=digit)) + geom_histogram(aes(y = ..density.., weight = count, binwidth=1), colour='black', fill='grey', alpha=0.7)
    > p + scale_y_continuous(limits=c(0,0.5))

Will result in the following:

I'll leave it to you to make your own assumptions.

## Thursday, January 20, 2011

### Graph Processing With Wukong and Hadoop

As a last (for now) tutorial oriented post on Wukong, let's process a network graph.

## Get Data

This airport data (airport edges) from Infochimps is one such network graph with over 35 million edges. It represents the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

## Explore Data

We've got to actually look at the data before we can make any decisions about how to process it and what questions we'd like answered:

    $: head data/flights_with_colnames.tsv | wu-lign
    origin_airport destin_airport passengers flights month
    MHK            AMW            21         1       200810
    EUG            RDM            41         22      199011
    EUG            RDM            88         19      199012
    EUG            RDM            11         4       199010
    MFR            RDM            0          1       199002
    MFR            RDM            11         1       199003
    MFR            RDM            2          4       199001
    MFR            RDM            7          1       199009
    MFR            RDM            7          2       199011

So it's exactly what you'd expect; An adjacency list with (origin node,destination node,weight_1,weight_2,timestamp). There are thousands of data sets with similar characteristics...

## Ask A Question

A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph) the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.
2. Flights Degree: For a given airport the number of flights in + the number of flights out.

Let's write the question wukong style:

    #!/usr/bin/env ruby
    require 'rubygems'
    require 'wukong'

    class EdgeMapper < Wukong::Streamer::RecordStreamer
      #
      # Yield both ways so we can sum (passengers in + passengers out) and (flights
      # in + flights out) individually in the reduce phase.
      #
      def process origin_code, destin_code, passengers, flights, month
        yield [origin_code, month, "OUT", passengers, flights]
        yield [destin_code, month, "IN", passengers, flights]
      end
    end

    class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
      #
      # What are we going to use as a key internally?
      #
      def get_key airport, month, in_or_out, passengers, flights
        [airport, month]
      end

      def start! airport, month, in_or_out, passengers, flights
        @out_degree = {:passengers => 0, :flights => 0}
        @in_degree  = {:passengers => 0, :flights => 0}
      end

      def accumulate airport, month, in_or_out, passengers, flights
        case in_or_out
        when "IN" then
          @in_degree[:passengers] += passengers.to_i
          @in_degree[:flights]    += flights.to_i
        when "OUT" then
          @out_degree[:passengers] += passengers.to_i
          @out_degree[:flights]    += flights.to_i
        end
      end

      #
      # For every airport and month, calculate passenger and flight degrees
      #
      def finalize
        # Passenger degrees (out, in, and total)
        passengers_out   = @out_degree[:passengers]
        passengers_in    = @in_degree[:passengers]
        passengers_total = passengers_in + passengers_out

        # Flight degrees (out, in, and total)
        flights_out   = @out_degree[:flights]
        flights_in    = @in_degree[:flights]
        flights_total = flights_in + flights_out

        yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
      end
    end

    #
    # Need to use 2 fields for partition so every record with the same airport and
    # month land on the same reducer
    #
    Wukong::Script.new(
      EdgeMapper,
      DegreeCalculator,
      :partition_fields => 2 # use two fields to partition records
      ).run

Don't panic. There's a lot going on in this script so here's the breakdown (real gentle like):

### Mapper

Here we're using wukong's RecordStreamer class which reads lines from $stdin and splits on tabs for us already. That's how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.
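Here is a minimal plain-Ruby sketch of that "yield both ways" trick, run on one row from the sample data shown earlier. `emit_both_ways` is a hypothetical helper name; in the real script this logic lives inside `EdgeMapper#process`.

```ruby
# Plain-Ruby sketch of the map step: one edge in, two records out.
# Hypothetical helper; the real script does this inside EdgeMapper#process.
def emit_both_ways(origin_code, destin_code, passengers, flights, month)
  [
    [origin_code, month, "OUT", passengers, flights],  # traffic leaving the origin
    [destin_code, month, "IN",  passengers, flights]   # traffic arriving at the destination
  ]
end

# One row from the sample data, split on tabs the way RecordStreamer would.
row = "EUG\tRDM\t41\t22\t199011".split("\t")
emit_both_ways(*row).each { |record| puts record.join("\t") }
```

Each edge shows up once under its origin and once under its destination, which is what lets the reducer see all of an airport's traffic in one place.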

Finally, we've carefully rearranged our records such that (airport,month) is always the first two fields. We'll partition on this as the key. (We have to say that explicitly at the bottom of the script)
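To get a feel for why the partition key matters, here is a simplified sketch. It assumes a reducer is picked by hashing the first N fields modulo the number of reducers; the CRC32 hash and the `reducer_for` helper are stand-ins for illustration and are not Hadoop's actual partitioner.

```ruby
require 'zlib'

# Simplified sketch of partitioning on the first N fields.
# ASSUMPTION: CRC32 modulo the reducer count is a stand-in for illustration;
# it is not Hadoop's actual partitioner implementation.
def reducer_for(record, partition_fields, num_reducers)
  key = record.first(partition_fields).join("\t")
  Zlib.crc32(key) % num_reducers
end

records = [
  ["RDM", "199011", "IN", "41", "22"],
  ["RDM", "199011", "IN", "7",  "2"],
  ["RDM", "199012", "IN", "88", "19"]
]

records.each do |record|
  puts "#{record.first(2).join(',')} -> reducer #{reducer_for(record, 2, 4)}"
end
```

Whatever the real hash is, the point is the same: records that agree on (airport, month) always get the same reducer, while records that only agree on the airport may not.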

### Reducer

We've seen all these methods before except for one. The reducer needs to know what fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.

* start! - Here we initialize the internal state of the reducer with two ruby hashes. One, the @out_degree will count up all the passengers and flights out. The @in_degree will do the same but for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in java...)

* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".

* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the 'key' is just (airport,month).
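The three steps above can be sketched in plain Ruby over an in-memory array. This is not how wukong drives the reducer (it calls start!, accumulate, and finalize itself as sorted records stream in); `degrees` is a hypothetical helper and the sample records are partly made up.

```ruby
# Plain-Ruby sketch of the reduce step for records already emitted by the mapper.
# Hypothetical helper: folds start!/accumulate/finalize into one method.
def degrees(records)
  records.group_by { |airport, month, *| [airport, month] }.map do |key, group|
    # start!: fresh counters for each (airport, month) key
    in_degree  = { :passengers => 0, :flights => 0 }
    out_degree = { :passengers => 0, :flights => 0 }

    # accumulate: route each record to the matching counters
    group.each do |_airport, _month, in_or_out, passengers, flights|
      target = case in_or_out
               when "IN"  then in_degree
               when "OUT" then out_degree
               end
      next unless target   # ignore anything that is neither IN nor OUT
      target[:passengers] += passengers.to_i
      target[:flights]    += flights.to_i
    end

    # finalize: key, then (in, out, total) for passengers and for flights
    key + [in_degree[:passengers], out_degree[:passengers],
           in_degree[:passengers] + out_degree[:passengers],
           in_degree[:flights], out_degree[:flights],
           in_degree[:flights] + out_degree[:flights]]
  end
end

# ASSUMPTION: illustrative records for a single key; the OUT record is made up.
sample = [
  ["RDM", "199011", "IN",  "41", "22"],
  ["RDM", "199011", "IN",  "7",  "2"],
  ["RDM", "199011", "OUT", "10", "3"]
]
degrees(sample).each { |record| puts record.join("\t") }
```

The output columns follow the same order as the record yielded in `finalize`: airport, month, passengers in/out/total, flights in/out/total.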

    $: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
    1B1 200906 1   1   2 1  1  2
    ABE 200705 0  83  83 0  3  3
    ABE 199206 0  31  31 0  1  1
    ABE 200708 0 904 904 0 20 20
    ABE 200307 0  91  91 0  2  2
    ABE 200703 0  36  36 0  1  1
    ABE 199902 0  84  84 0  1  1
    ABE 200611 0 753 753 0 18 18
    ABE 199209 0  99  99 0  1  1
    ABE 200702 0  54  54 0  1  1
    ABE 200407 0  98  98 0  1  1
    ABE 200705 0 647 647 0 15 15
    ABE 200306 0  27  27 0  1  1
    ABE 200703 0 473 473 0 11 11
    ABE 200309 0 150 150 0  1  1
    ABE 200702 0 313 313 0  8  8
    ABE 200103 0   0   0 0  1  1
    ABE 199807 0 105 105 0  1  1
    ABE 199907 0  91  91 0  1  1
    ABE 199501 0  50  50 0  1  1