Nearest Neighbors With Apache Pig and JRuby

Since it's been such a long time since I last posted I thought I'd make this one a bit longer. It really is a condensing of a lot of things I've been working with and thinking about over the past few months.

Nearest Neighbors

The nearest neighbors problem (also known as the post-office problem) is this: Given a point X in some metric space M, assign to it the nearest neighboring point S. In other words, given a residence, assign to it the nearest post office. The K-nearest neighbors problem, which this post addresses, is just a slight generalization of that problem. Instead of just one neighbor we are looking for K neighbors.
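To make the definition concrete, here is a minimal brute-force sketch in Java. It assumes points in the plane and plain Euclidean distance (the rest of this post works with geographic coordinates instead), and the class and method names are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration: brute-force K-nearest neighbors in the plane.
class BruteForceKnn {

    // Euclidean distance between two (x, y) points.
    static double distance(double[] a, double[] b) {
        return Math.hypot(a[0] - b[0], a[1] - b[1]);
    }

    // Returns the indices of the k points closest to points[query],
    // excluding the query point itself, nearest first.
    static List<Integer> nearest(double[][] points, int query, int k) {
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < points.length; i++) {
            if (i != query) candidates.add(i);
        }
        candidates.sort(Comparator.comparingDouble(
            (Integer i) -> distance(points[query], points[i])));
        return new ArrayList<>(candidates.subList(0, Math.min(k, candidates.size())));
    }

    public static void main(String[] args) {
        double[][] points = { {0, 0}, {1, 0}, {0, 2}, {5, 5}, {-1, -1} };
        System.out.println(nearest(points, 0, 2)); // prints [1, 4]
    }
}
```

This compares the query point against every other point, which is exactly what stops being practical once the data no longer fits on one machine.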


So, we're going to use the geonames data. This is a set of nearly 8 million geo points with names, coordinates, and a bunch of other good stuff, from around the world. We would like to find, for a given point in the geonames set, the 5 points (also in geonames) that are nearest to it. Should be pretty simple, yeah?

Get data

The geonames data set '' can be downloaded like so:

$: wget

Prepare data

Since the geonames data set comes as a nice tab-separated-values (.tsv) file already it's just a matter of unzipping the package and placing it on your hdfs (you do have one of those don't you?). Do:

$: unzip
$: hadoop fs -put allCountries.txt .

to unzip the package and place the tsv file into your home directory on the hadoop distributed file system.


Oh, and by the way, before we forget, the data from geonames has this pig schema:

geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray

The Algorithm

Now that we have the data we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc) we see that there are really only three fields of interest in the data: (geonameid, longitude, and latitude). All the other fields are just nice metadata which we can attach later.

Now, since we're going to be using Apache Pig to solve this problem we need to think a little bit about parallelism. One constraint is that at no time is any one point going to have access to the locations of all the other points. In other words, we will not be storing the full set of points in memory. Besides, it's 8 million points, that's kind of a lot for my poor little machine to handle.

So it's clear (right?) that we're going to have to partition the space in some way. Then, within a partition of the space, we'll need to apply a local version of the nearest neighbors algorithm. That's it really. Map and reduce. Wait, but there's one problem. What happens if we don't find all 5 neighbors for a point in a single partition? Hmmm. Well, the answer is iteration. We'll choose a small partition size to begin with and gradually increase the partition size until either the partition size is too large or all the neighbors have been found. Got it?


  • (1) Partition the space

  • (2) Search for nearest neighbors in a single partition

  • (3) If all neighbors have been found, terminate; else increase partition size and repeat (1) and (2)
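The three steps can be sketched in memory with plain Java. This is only an illustration under simplifying assumptions: points live in the plane, partitions are square grid cells whose side doubles on every pass, and a point counts as done once its cell holds at least k other points. None of the names below come from the Pig implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of steps (1)-(3); not the Pig implementation.
class IterativePartitionKnn {

    // Step (1): the partition id of a point is the grid cell it falls in.
    static String cellOf(double[] p, double cellSize) {
        return (long) Math.floor(p[0] / cellSize) + ":" + (long) Math.floor(p[1] / cellSize);
    }

    // Returns, for each point that got k neighbors before the cell size
    // grew beyond maxCellSize, the indices of those neighbors.
    static Map<Integer, List<Integer>> solve(double[][] pts, int k,
                                             double cellSize, double maxCellSize) {
        Map<Integer, List<Integer>> done = new HashMap<>();
        for (double size = cellSize; size <= maxCellSize && done.size() < pts.length; size *= 2) {
            // Step (1): group point indices by cell.
            Map<String, List<Integer>> cells = new HashMap<>();
            for (int i = 0; i < pts.length; i++) {
                cells.computeIfAbsent(cellOf(pts[i], size), c -> new ArrayList<>()).add(i);
            }
            // Step (2): local search inside each cell for points that are not done yet.
            for (List<Integer> cell : cells.values()) {
                if (cell.size() <= k) continue; // not enough candidates in this cell
                for (int i : cell) {
                    if (done.containsKey(i)) continue;
                    final double[] a = pts[i];
                    List<Integer> others = new ArrayList<>(cell);
                    others.remove(Integer.valueOf(i));
                    others.sort(Comparator.comparingDouble(
                        (Integer j) -> Math.hypot(a[0] - pts[j][0], a[1] - pts[j][1])));
                    done.put(i, new ArrayList<>(others.subList(0, k)));
                }
            }
            // Step (3): the loop condition stops once every point is done
            // or the cell size has grown past the limit.
        }
        return done;
    }
}
```

As with any purely partition-local search, a closer point sitting just across a cell boundary is not seen; that is the price of never holding all the points in one place.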


For partitioning the space we're going to use quadkeys (the tile-naming scheme from the Bing Maps tile system), since they're super easy to implement and they partition the space nicely. This will be a Java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a string quadkey (the partition id).

Here's the actual java code for that. Let's call it "GetQuadkey":

package sounder.pig.geo;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
 * A Pig UDF to compute the quadkey string for a given
 * (longitude, latitude, resolution) tuple.
 */
public class GetQuadkey extends EvalFunc<String> {
    private static final int TILE_SIZE = 256;

    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))
            return null;

        Double longitude = (Double)input.get(0);
        Double latitude = (Double)input.get(1);
        Integer resolution = (Integer)input.get(2);

        String quadKey = quadKey(longitude, latitude, resolution);
        return quadKey;
    }

    private static String quadKey(double longitude, double latitude, int resolution) {
        int[] pixels = pointToPixels(longitude, latitude, resolution);
        int[] tiles = pixelsToTiles(pixels[0], pixels[1]);
        return tilesToQuadKey(tiles[0], tiles[1], resolution);
    }

    /** Return the pixel X and Y coordinates for the given lat, lng, and resolution. */
    private static int[] pointToPixels(double longitude, double latitude, int resolution) {
        double x = (longitude + 180) / 360;
        double sinLatitude = Math.sin(latitude * Math.PI / 180);
        double y = 0.5 - Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);

        int mapSize = mapSize(resolution);
        int[] pixels = {(int) trim(x * mapSize + 0.5, 0, mapSize - 1), (int) trim(y * mapSize + 0.5, 0, mapSize - 1)};
        return pixels;
    }

    /** Convert from pixel coordinates to tile coordinates. */
    private static int[] pixelsToTiles(int pixelX, int pixelY) {
        int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};
        return tiles;
    }

    /** Finally, given tile coordinates and a resolution, returns the appropriate quadkey. */
    private static String tilesToQuadKey(int tileX, int tileY, int resolution) {
        StringBuilder quadKey = new StringBuilder();
        for (int i = resolution; i > 0; i--) {
            char digit = '0';
            int mask = 1 << (i - 1);
            if ((tileX & mask) != 0) {
                digit++;      // x bit contributes 1
            }
            if ((tileY & mask) != 0) {
                digit++;      // y bit contributes 2
                digit++;
            }
            quadKey.append(digit);
        }
        return quadKey.toString();
    }

    /** Ensure input value is within minVal and maxVal. */
    private static double trim(double n, double minVal, double maxVal) {
        return Math.min(Math.max(n, minVal), maxVal);
    }

    /** Width of the map, in pixels, at the given resolution. */
    public static int mapSize(int resolution) {
        return TILE_SIZE << resolution;
    }
}

Next we need to have another java udf that operates on all the points in a partition. Let's call it "NearestNeighbors". Here's a naive implementation of that:

package sounder.pig.geo.nearestneighbors;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Comparator;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class NearestNeighbors extends EvalFunc<DataBag> {
    private static TupleFactory tupleFactory = TupleFactory.getInstance();
    private static BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
            return null;

        Long k = (Long)input.get(0);
        DataBag points = (DataBag)input.get(1); // {(id,lng,lat,{(n1,n1_dist)...})}
        DataBag result = bagFactory.newDefaultBag();

        for (Tuple pointA : points) {
            DataBag neighborsBag = (DataBag)pointA.get(3);
            if (neighborsBag.size() < k) {
                // pointA still needs neighbors; search the rest of the partition
                PriorityQueue<Tuple> neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);
                Double x1 = Math.toRadians((Double)pointA.get(1));
                Double y1 = Math.toRadians((Double)pointA.get(2));

                for (Tuple pointB : points) {
                    if (pointA != pointB) {
                        Double x2 = Math.toRadians((Double)pointB.get(1));
                        Double y2 = Math.toRadians((Double)pointB.get(2));
                        Double distance = haversineDistance(x1, y1, x2, y2);

                        // Add this point as a neighbor if pointA has no neighbors
                        if (neighbors.size() == 0) {
                            Tuple newNeighbor = tupleFactory.newTuple(2);
                            newNeighbor.set(0, pointB.get(0));
                            newNeighbor.set(1, distance);
                            neighbors.add(newNeighbor);
                        } else {
                            // Otherwise only accept it if it beats the farthest neighbor so far
                            Tuple furthestNeighbor = neighbors.peek();
                            Double neighborDist = (Double)furthestNeighbor.get(1);
                            if (distance < neighborDist) {
                                Tuple newNeighbor = tupleFactory.newTuple(2);
                                newNeighbor.set(0, pointB.get(0));
                                newNeighbor.set(1, distance);

                                if (neighbors.size() < k) {
                                    neighbors.add(newNeighbor);
                                } else {
                                    neighbors.poll(); // remove farthest
                                    neighbors.add(newNeighbor);
                                }
                            }
                        }
                    }
                }
                // Should now have a priorityqueue containing a sorted list of neighbors
                // create new result tuple and add to result bag
                Tuple newPointA = tupleFactory.newTuple(4);
                newPointA.set(0, pointA.get(0));
                newPointA.set(1, pointA.get(1));
                newPointA.set(2, pointA.get(2));
                newPointA.set(3, fromQueue(neighbors));
                result.add(newPointA);
            } else {
                // pointA already has all k neighbors; pass it through unchanged
                result.add(pointA);
            }
        }
        return result;
    }

    // Ensure sorted by descending distance, so the farthest neighbor is at the head
    private PriorityQueue<Tuple> toDistanceSortedQueue(int k, DataBag bag) {
        PriorityQueue<Tuple> q = new PriorityQueue<Tuple>(k,
            new Comparator<Tuple>() {
                public int compare(Tuple t1, Tuple t2) {
                    try {
                        Double dist1 = (Double)t1.get(1);
                        Double dist2 = (Double)t2.get(1);
                        return dist2.compareTo(dist1);
                    } catch (ExecException e) {
                        throw new RuntimeException("Error comparing tuples", e);
                    }
                }
            });
        for (Tuple tuple : bag) q.add(tuple);
        return q;
    }

    private DataBag fromQueue(PriorityQueue<Tuple> q) {
        DataBag bag = bagFactory.newDefaultBag();
        for (Tuple tuple : q) bag.add(tuple);
        return bag;
    }

    // Haversine formula; returns the central angle in radians rather than a
    // distance in kilometers. As called above, x is longitude and y is latitude,
    // so the cosine term is applied to the longitudes.
    private Double haversineDistance(Double x1, Double y1, Double x2, Double y2) {
        double a = Math.pow(Math.sin((x2-x1)/2), 2)
            + Math.cos(x1) * Math.cos(x2) * Math.pow(Math.sin((y2-y1)/2), 2);

        return (2 * Math.asin(Math.min(1, Math.sqrt(a))));
    }
}

The details of the NearestNeighbors UDF aren't super important and it's mostly pretty clear what's going on. Just know that it operates on a bag of points as input and returns a bag of points as output that has the same schema. This is really important since we're going to be iterating.
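If the priority queue part is the bit that looks odd, the idea is a bounded queue ordered farthest-first: the head is always the worst neighbor kept so far, so a closer candidate simply evicts it. Here is a minimal stand-alone sketch of that idea on bare distances; the class name is hypothetical, and unlike the UDF it works on doubles rather than (id, distance) tuples.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-alone illustration of keeping the k smallest distances
// with a max-heap.
class KSmallest {

    static List<Double> kSmallest(double[] distances, int k) {
        // Reverse ordering puts the largest kept distance at the head.
        PriorityQueue<Double> heap =
            new PriorityQueue<>(Math.max(1, k), Collections.reverseOrder());
        for (double d : distances) {
            if (heap.size() < k) {
                heap.add(d);                 // still room: keep everything
            } else if (!heap.isEmpty() && d < heap.peek()) {
                heap.poll();                 // drop the current farthest
                heap.add(d);
            }
        }
        List<Double> result = new ArrayList<>(heap);
        Collections.sort(result);            // nearest first
        return result;
    }
}
```

Note that this sketch fills the queue up to k before it starts comparing candidates against the farthest entry.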

Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and Pig's "PigServer" Java class to do all the work. Here's what the JRuby runner looks like (it's kind of a lot so don't get scared):

#!/usr/bin/env jruby

require 'java'

# You might consider changing this to point to where you have
# pig installed...
jar = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"
conf = "/etc/hadoop/conf"

$CLASSPATH << conf
require jar

import org.apache.pig.ExecType
import org.apache.pig.PigServer
import org.apache.pig.FuncSpec

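class NearestNeighbors

  attr_accessor :points, :k, :min_zl, :runmode

  # Create a new nearest neighbors instance
  # for the given points, k neighbors to find,
  # an optional minimum zl (1-21) and optional
  # hadoop run mode (local or mapreduce)
  def initialize points, k, min_zl=20, runmode='mapreduce'
    @points  = points
    @k       = k
    @min_zl  = min_zl
    @runmode = runmode
  end

  # Run the nearest neighbors algorithm
  def run
    start_pig_server
    register_jars_and_functions
    run_algorithm
    stop_pig_server
  end

  # Actually runs all the pig queries for
  # the algorithm. Stops if all neighbors
  # have been found or if min_zl is reached
  def run_algorithm
    start_nearest_neighbors(points, k, 22)
    if run_nearest_neighbors(k, 22)
      21.downto(min_zl) do |zl|
        iterate_nearest_neighbors(k, zl)
        break unless run_nearest_neighbors(k,zl)
      end
    end
  end

  # Registers algorithm initialization queries
  # (same order as the generated Pig code shown later)
  def start_nearest_neighbors(input, k, zl)
    @pig.register_query(PigQueries.load_points(input))
    @pig.register_query(PigQueries.generate_initial_quadkeys(zl))
    @pig.register_query(PigQueries.group_by_quadkey(zl))
  end

  # Registers algorithm iteration queries
  # (same order as the generated Pig code shown later)
  def iterate_nearest_neighbors k, zl
    @pig.register_query(PigQueries.load_prior_done(zl))
    @pig.register_query(PigQueries.load_prior_not_done(zl))
    @pig.register_query(PigQueries.union_priors(zl))
    @pig.register_query(PigQueries.trim_quadkey(zl))
    @pig.register_query(PigQueries.group_by_quadkey(zl))
  end

  # Runs one iteration of the algorithm. Returns true while
  # there are still points without all of their neighbors.
  def run_nearest_neighbors(k, zl)
    @pig.register_query(PigQueries.nearest_neighbors(k, zl))
    @pig.register_query(PigQueries.split_results(k, zl))

    if !@pig.exists_file("done#{zl}")
      @pig.store("done#{zl}", "done#{zl}")
      not_done = @pig.store("not_done#{zl}", "not_done#{zl}")
      # Assumption: the original check is missing from this listing; here the
      # iteration counts as unfinished if not_done holds any tuples.
      return not_done.get_results.has_next
    end
    true # Assumption: output from an earlier run exists, so keep iterating
  end

  # Start a new pig server with the specified run mode
  def start_pig_server
    # Assumption: the constructor arguments are missing from this listing
    @pig = PigServer.new(runmode == 'local' ? ExecType::LOCAL : ExecType::MAPREDUCE)
  end

  # Stop the running pig server
  def stop_pig_server
    @pig.shutdown
  end

  # Register the jar that contains the nearest neighbors
  # and quadkeys udfs and define functions for them.
  def register_jars_and_functions
    # Assumption: the jar path is missing from this listing; point this
    # placeholder at the jar you built containing the two UDFs.
    @pig.register_jar('path/to/udfs.jar')
    @pig.register_function('GetQuadkey', FuncSpec.new('sounder.pig.geo.GetQuadkey'))
    @pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors'))
  end
end
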
# A simple class to contain the pig queries
class PigQueries

  # Load the geonames points. Obviously,
  # this should be modified to accept a
  # variable schema.
  def self.load_points geonames
    "points = LOAD '#{geonames}' AS (
       geonameid: int,
       name: chararray,
       asciiname: chararray,
       alternatenames: chararray,
       latitude: double,
       longitude: double,
       feature_class: chararray,
       feature_code: chararray,
       country_code: chararray,
       cc2: chararray,
       admin1_code: chararray,
       admin2_code: chararray,
       admin3_code: chararray,
       admin4_code: chararray,
       population: long,
       elevation: int,
       gtopo30: int,
       timezone: chararray,
       modification_date: chararray
     );"
  end

  # Query to generate quadkeys at the specified zoom level
  def self.generate_initial_quadkeys(zl)
    "projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"
  end

  # Load previous iteration's done points
  def self.load_prior_done(zl)
    "prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (
       quadkey: chararray,
       geonameid: int,
       longitude: double,
       latitude: double,
       neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
     );"
  end

  # Load previous iteration's not done points
  def self.load_prior_not_done(zl)
    "prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (
       quadkey: chararray,
       geonameid: int,
       longitude: double,
       latitude: double,
       neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
     );"
  end

  # Union the previous iterations points that are done
  # with the points that are not done
  def self.union_priors zl
    "prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"
  end

  # Chop off one character of precision from the existing
  # quadkey to go one zl down.
  def self.trim_quadkey zl
    "projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE
       SUBSTRING(quadkey, 0, #{zl}) AS quadkey,
       geonameid AS geonameid,
       longitude AS longitude,
       latitude AS latitude,
       neighbors AS neighbors;"
  end

  # Group the points by quadkey
  def self.group_by_quadkey zl
    "grouped#{zl} = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"
  end

  # Run the nearest neighbors udf on all the points for
  # a given quadkey
  def self.nearest_neighbors(k, zl)
    "nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (
       geonameid: int,
       longitude: double,
       latitude: double,
       neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
     );"
  end

  # Split the results into done and not_done relations
  # The algorithm is done when 'not_done' contains
  # no more tuples.
  def self.split_results(k, zl)
    "SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;"
  end

end

# Assumption: the optional third argument is read as the minimum zoom level
# so that the three-argument invocation shown in this post works; the
# surviving fragment of this line only shows ARGV[0] and ARGV[1].
NearestNeighbors.new(ARGV[0], ARGV[1], (ARGV[2] || 20).to_i).run

Call this file "nearest_neighbors.rb". The idea here is that we register some basic pig queries to do the initialization of the algorithm and the iterations. These queries are run over and over until either the "not_done" relation contains no more elements or the minimum zoom level has been reached. Note that a small zoom level means a big partition of space.
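The SUBSTRING(quadkey, 0, zl) step works because a quadkey carries one base-4 digit per zoom level, so dropping the last digit gives the key of the enclosing tile one level out. Here is a small Java sketch of that property; the class and method names are made up for illustration, and the digit interleaving is the same as in tilesToQuadKey above.

```java
// Hypothetical illustration of why trimming a quadkey moves one zoom level out.
class QuadkeyPrefix {

    // Tile-to-quadkey interleaving: one base-4 digit per zoom level,
    // 1 for the x bit plus 2 for the y bit.
    static String tileToQuadkey(int tileX, int tileY, int zoom) {
        StringBuilder key = new StringBuilder();
        for (int i = zoom; i > 0; i--) {
            int mask = 1 << (i - 1);
            int digit = 0;
            if ((tileX & mask) != 0) digit += 1;
            if ((tileY & mask) != 0) digit += 2;
            key.append(digit);
        }
        return key.toString();
    }

    // The parent partition is the same key with the last digit removed.
    static String parent(String quadkey) {
        return quadkey.substring(0, quadkey.length() - 1);
    }

    public static void main(String[] args) {
        String child = tileToQuadkey(3, 5, 3);       // tile (3, 5) at zoom 3
        String parentTile = tileToQuadkey(1, 2, 2);  // tile (3/2, 5/2) at zoom 2
        // prints 213 -> 21 == 21
        System.out.println(child + " -> " + parent(child) + " == " + parentTile);
    }
}
```

So each iteration merges four neighboring partitions into one without ever recomputing a quadkey from the coordinates.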

Run it!

I think we're finally ready to run it. Let K=5 and the min zoom level (zl) be 10. Then just run:

$: ./nearest_neighbors.rb allCountries.txt 5 10

To kick it off. The output will live in 'done10' (in your home directory on the hdfs) and all the ones that couldn't find their neighbors (poor guys) are left in 'not_done10'. Let's take a look:

$: hadoop fs -cat done10/part* | head
22321231 5874132 -164.73333 67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}
133223322 2631726 -17.98263 66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}
133223322 2631999 -18.01884 66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}
200103201 5874186 -165.39611 64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}
200103201 5878614 -165.3 64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}
200103203 5875461 -165.55111 64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}
200103300 5861248 -164.27639 64.69278 {(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}
200103300 5876626 -164.27111 64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}
200233011 5870290 -170.3 57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}
200233123 5873595 -169.48056 56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}


Pig Code

Let's just take a look at the pig code that actually got executed.

points = LOAD 'allCountries.txt' AS (
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
);
projected22 = FOREACH points GENERATE GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};
grouped22 = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) >= 5l, not_done22 IF COUNT(neighbors) < 5l;
prior_done22 = LOAD 'done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done22 = LOAD 'not_done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors22 = UNION prior_done22, prior_not_done22;
projected21 = FOREACH prior_neighbors22 GENERATE
SUBSTRING(quadkey, 0, 21) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped21 = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) >= 5l, not_done21 IF COUNT(neighbors) < 5l;
prior_done21 = LOAD 'done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done21 = LOAD 'not_done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors21 = UNION prior_done21, prior_not_done21;
projected20 = FOREACH prior_neighbors21 GENERATE
SUBSTRING(quadkey, 0, 20) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped20 = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) >= 5l, not_done20 IF COUNT(neighbors) < 5l;
prior_done20 = LOAD 'done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done20 = LOAD 'not_done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors20 = UNION prior_done20, prior_not_done20;
projected19 = FOREACH prior_neighbors20 GENERATE
SUBSTRING(quadkey, 0, 19) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped19 = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) >= 5l, not_done19 IF COUNT(neighbors) < 5l;
prior_done19 = LOAD 'done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done19 = LOAD 'not_done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors19 = UNION prior_done19, prior_not_done19;
projected18 = FOREACH prior_neighbors19 GENERATE
SUBSTRING(quadkey, 0, 18) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped18 = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) >= 5l, not_done18 IF COUNT(neighbors) < 5l;
prior_done18 = LOAD 'done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done18 = LOAD 'not_done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors18 = UNION prior_done18, prior_not_done18;
projected17 = FOREACH prior_neighbors18 GENERATE
SUBSTRING(quadkey, 0, 17) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped17 = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) >= 5l, not_done17 IF COUNT(neighbors) < 5l;
prior_done17 = LOAD 'done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done17 = LOAD 'not_done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors17 = UNION prior_done17, prior_not_done17;
projected16 = FOREACH prior_neighbors17 GENERATE
SUBSTRING(quadkey, 0, 16) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped16 = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) >= 5l, not_done16 IF COUNT(neighbors) < 5l;
prior_done16 = LOAD 'done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done16 = LOAD 'not_done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors16 = UNION prior_done16, prior_not_done16;
projected15 = FOREACH prior_neighbors16 GENERATE
SUBSTRING(quadkey, 0, 15) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped15 = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) >= 5l, not_done15 IF COUNT(neighbors) < 5l;
prior_done15 = LOAD 'done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done15 = LOAD 'not_done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors15 = UNION prior_done15, prior_not_done15;
projected14 = FOREACH prior_neighbors15 GENERATE
SUBSTRING(quadkey, 0, 14) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped14 = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) >= 5l, not_done14 IF COUNT(neighbors) < 5l;
prior_done14 = LOAD 'done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done14 = LOAD 'not_done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors14 = UNION prior_done14, prior_not_done14;
projected13 = FOREACH prior_neighbors14 GENERATE
SUBSTRING(quadkey, 0, 13) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped13 = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) >= 5l, not_done13 IF COUNT(neighbors) < 5l;
prior_done13 = LOAD 'done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done13 = LOAD 'not_done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors13 = UNION prior_done13, prior_not_done13;
projected12 = FOREACH prior_neighbors13 GENERATE
SUBSTRING(quadkey, 0, 12) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped12 = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) >= 5l, not_done12 IF COUNT(neighbors) < 5l;
prior_done12 = LOAD 'done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done12 = LOAD 'not_done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors12 = UNION prior_done12, prior_not_done12;
projected11 = FOREACH prior_neighbors12 GENERATE
SUBSTRING(quadkey, 0, 11) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped11 = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) >= 5l, not_done11 IF COUNT(neighbors) < 5l;
prior_done11 = LOAD 'done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done11 = LOAD 'not_done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors11 = UNION prior_done11, prior_not_done11;
projected10 = FOREACH prior_neighbors11 GENERATE
SUBSTRING(quadkey, 0, 10) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped10 = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) >= 5l, not_done10 IF COUNT(neighbors) < 5l;

So you can see now why, with iteration, it's a good idea to generate as much code as possible.

And we're done :)

All the code for this is on github in the Sounder repo (along with tons of other Apache Pig examples).

Structural Similarity With Apache Pig

A while ago I posted this about computing the jaccard similarity (otherwise known as the structural similarity) of nodes in a network graph. Looking back on it over the past few days I realize there are some areas for serious improvement. Actually, it's just completely wrong. The approach is broken. So, I'm going to revisit the algorithm again, in more detail, and write a vastly improved Pig script for computing the structural similarity of vertices in a network graph.

The graph

Of course, before you can do a whole lot of anything, you've got to have a graph to work with. To keep things dead simple (and breaking from what I normally do) I'm going to draw an arbitrary graph (and by draw I mean draw). Here it is:

This can be represented as a tab-separated-values set of adjacency pairs in a file called 'graph.tsv':

$: cat graph.tsv

I don't think it gets any simpler.

The Measure

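Now, the idea here is to compute the similarity between nodes, e.g. similarity(A,B). There's already a well-defined measure for this called the jaccard similarity: the size of the intersection of two sets divided by the size of their union. The key takeaway is to notice that the union can be written in terms of the individual set sizes and the intersection:

jaccard(A,B) = |A ∩ B| / |A ∪ B| = |A ∩ B| / (|A| + |B| - |A ∩ B|)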
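As a quick sanity check of that identity, here is a minimal Java sketch that computes the similarity of two sets as intersection / (|A| + |B| - intersection). The class name and the example sets are hypothetical; they are not the graph from this post.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the jaccard similarity of two sets.
class Jaccard {

    static <T> double similarity(Set<T> a, Set<T> b) {
        Set<T> common = new HashSet<>(a);
        common.retainAll(b);                             // A /\ B
        int intersection = common.size();
        int union = a.size() + b.size() - intersection;  // |A| + |B| - |A /\ B|
        return union == 0 ? 0.0 : (double) intersection / union;
    }

    public static void main(String[] args) {
        // Made-up out-link sets for two nodes.
        Set<String> a = new HashSet<>(Arrays.asList("C", "D", "E"));
        Set<String> b = new HashSet<>(Arrays.asList("D", "E", "F"));
        System.out.println(similarity(a, b)); // prints 0.5
    }
}
```

The point of writing the union this way is that the union itself never has to be materialized, only two counts and the size of the intersection.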

The Pig

This can be broken into a set of Pig operations quite easily actually:

load data

Remember, I saved the data into a file called 'graph.tsv'. So:

edges = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);
edges_dup = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);

It is necessary, but still a hack, to load the data twice. This is because self-intersections (which I'll be doing in a moment) don't work in Pig 0.8.

augment with sizes

Now I need the 'size' of each of the sets. In this case the 'sets' are the list of nodes each node links to. So, all I really have to do is calculate the number of outgoing links:

grouped_edges = GROUP edges BY v1;
aug_edges = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

grouped_dups = GROUP edges_dup BY v1;
aug_dups = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;

Again, if self-intersections worked, the operation of counting the outgoing links would only have to happen once. Now I've got a handle on the |A| and |B| parts of the equation above.


Next I'm going to compute the intersection using a Pig join:

edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
intersection = FOREACH edges_joined {
  -- results in:
  -- (X, Y, |X| + |Y|)
  added_size = aug_edges::v1_out + aug_dups::v1_out;
  GENERATE
    aug_edges::v1 AS v1,
    aug_dups::v1 AS v2,
    added_size AS added_size
  ;
};

Notice I'm adding the individual set sizes. This is to come up with the |A| + |B| portion of the denominator in the jaccard index.

intersection sizes

In order to compute the size of the intersection I've got to use a Pig GROUP and collect all the elements that matched on the JOIN:

intersect_grp = GROUP intersection BY (v1, v2);
intersect_sizes = FOREACH intersect_grp {
  -- results in:
  -- (X, Y, |X /\ Y|, |X| + |Y|)
  intersection_size = (double)COUNT(intersection);
  GENERATE
    FLATTEN(group) AS (v1, v2),
    intersection_size AS intersection_size,
    MAX(intersection.added_size) AS added_size -- hack, we only need this one time
  ;
};

There's a hack in there. The reason is that 'intersection.added_size' is a Pig data bag containing some number of identical tuples (equal to the intersection size). Each of these tuples is the 'added_size' from the previous step. Using MAX is just a convenient way of pulling out only one of them.
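
If the hack seems odd, this plain Ruby sketch shows the same thing on a few made-up rows. The method name `intersect_sizes` just mirrors the Pig alias. Each row stands for one matched element from the join, so every row for a given (v1, v2) pair carries the same added size.

```ruby
# rows: [v1, v2, added_size], one row per element the two nodes share.
# Returns [v1, v2, intersection_size, added_size] per pair.
def intersect_sizes(rows)
  grouped = rows.group_by { |v1, v2, _added| [v1, v2] }
  grouped.map do |(v1, v2), members|
    added_sizes = members.map { |_, _, added| added }
    # Every value in this "bag" is identical, so max just picks "the" value.
    [v1, v2, members.size.to_f, added_sizes.max]
  end
end

# Made-up join output: A and B share two elements, A and C share one.
intersection = [
  ['A', 'B', 7],
  ['A', 'B', 7],
  ['A', 'C', 5]
]

p intersect_sizes(intersection)
```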


And finally, I have all the pieces in place to compute the similarities:

similarities = FOREACH intersect_sizes {
  -- results in:
  -- (X, Y, |X /\ Y|/|X U Y|)
  similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
  GENERATE
    v1 AS v1,
    v2 AS v2,
    similarity AS similarity
  ;
};

That'll do it. Here's the full script for completeness:

edges = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);
edges_dup = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);

-- Augment the edges with the sizes of their outgoing adjacency lists. Note that
-- if a self join was possible we would only have to do this once.
grouped_edges = GROUP edges BY v1;
aug_edges = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

grouped_dups = GROUP edges_dup BY v1;
aug_dups = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;

-- Compute the sizes of the intersections of outgoing adjacency lists
edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
intersection = FOREACH edges_joined {
  -- results in:
  -- (X, Y, |X| + |Y|)
  added_size = aug_edges::v1_out + aug_dups::v1_out;
  GENERATE
    aug_edges::v1 AS v1,
    aug_dups::v1 AS v2,
    added_size AS added_size
  ;
};

intersect_grp = GROUP intersection BY (v1, v2);
intersect_sizes = FOREACH intersect_grp {
  -- results in:
  -- (X, Y, |X /\ Y|, |X| + |Y|)
  intersection_size = (double)COUNT(intersection);
  GENERATE
    FLATTEN(group) AS (v1, v2),
    intersection_size AS intersection_size,
    MAX(intersection.added_size) AS added_size -- hack, we only need this one time
  ;
};

similarities = FOREACH intersect_sizes {
  -- results in:
  -- (X, Y, |X /\ Y|/|X U Y|)
  similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
  GENERATE
    v1 AS v1,
    v2 AS v2,
    similarity AS similarity
  ;
};

DUMP similarities;


Run it:

$: pig -x local -p GRAPH=graph.tsv jaccard.pig

And it's done! Hurray. Now, go make a recommender system or something.
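
For checking the script's output on a tiny graph, a brute-force version in plain Ruby can be handy. This is only a sketch under a couple of assumptions: the edge list below is made up (it is not the graph drawn above), and there are no duplicate edges, so set sizes match what COUNT sees in Pig.

```ruby
require 'set'

# Returns { [v1, v2] => similarity } for every ordered pair of nodes whose
# outgoing adjacency lists share at least one element. Like the JOIN on v2,
# this also pairs each node with itself.
def structural_similarities(edges)
  out = Hash.new { |hash, node| hash[node] = Set.new }
  edges.each { |v1, v2| out[v1] << v2 }

  sims = {}
  out.each do |x, xs|
    out.each do |y, ys|
      common = (xs & ys).size
      next if common.zero?
      sims[[x, y]] = common.to_f / (xs.size + ys.size - common)
    end
  end
  sims
end

# Made-up edge list standing in for graph.tsv (v1 links to v2).
edges = [
  %w[A C], %w[A D],
  %w[B C], %w[B D], %w[B E]
]

structural_similarities(edges).each do |(v1, v2), similarity|
  puts [v1, v2, similarity].join("\t")
end
```

It's quadratic in the number of nodes, so it's only good for eyeballing small inputs.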

A Lucene Text Tokenization UDF for Apache Pig

As much as I loathe to admit it, sometimes java is called for. One of those times is tokenizing raw text. You'll notice in the post about tfidf how I used a Wukong script, written in ruby, to accomplish the task of tokenizing a large text corpus with Hadoop and Pig. There are a couple of problems with this:

1. Ruby is slow at this.

2. All the gem dependencies (wukong itself, extlib, etc) must exist on all the machines in the cluster and be available in the RUBYLIB (yet another environment variable to manage).

There is a better way.


Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc whose function 'exec()' essentially acts as the Wukong 'process()' method or the java hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a pig DataBag. Each Tuple in the DataBag will be a single token. Here's what it looks like as a whole:

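import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.util.Version;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.standard.StandardTokenizer;

public class TokenizeText extends EvalFunc<DataBag> {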

private static TupleFactory tupleFactory = TupleFactory.getInstance();
private static BagFactory bagFactory = BagFactory.getInstance();
private static String NOFIELD = "";
private static StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);

public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 1 || input.isNull(0))
return null;

// Output bag
DataBag bagOfTokens = bagFactory.newDefaultBag();

StringReader textInput = new StringReader(input.get(0).toString());
TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);
CharTermAttribute termAttribute = stream.getAttribute(CharTermAttribute.class);

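while (stream.incrementToken()) {
// Wrap each token in a single-field Tuple and add it to the output bag
Tuple termText = tupleFactory.newTuple(termAttribute.toString());
bagOfTokens.add(termText);
}
return bagOfTokens;
}
}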

There's absolutely nothing special going on here. Remember, the 'exec' function gets called on every Pig Tuple of input. bagOfTokens will be the Pig DataBag returned. First, the lucene library tokenizes the input string. Then all the tokens in the resulting stream are turned into Pig Tuples and added to the result DataBag. Finally the resulting DataBag is returned. A document is truly a bag of words.
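
If you don't have Lucene handy, the shape of what 'exec' returns can be pictured with a few lines of plain Ruby. This is only a crude stand-in: the regex and the lowercasing are assumptions of this sketch and don't reproduce StandardAnalyzer's actual rules (which also drop stop words, among other things).

```ruby
# Crude stand-in for the UDF: text in, "bag" (array) of one-field "tuples" out.
# Returns nil for nil input, like the null check in exec().
def tokenize_text(text)
  return nil if text.nil?
  text.downcase.scan(/[a-z0-9]+/).map { |token| [token] }
end

p tokenize_text("A document is truly a bag of words.")
```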

Example Pig Script

And here's an example script to use that UDF:

documents = LOAD 'documents' AS (doc_id:chararray, text:chararray);
tokenized = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);

And that's it. It's blazing fast text tokenization for Apache Pig.


TF-IDF With Apache Pig

Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).

Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.

One of the cool things about tf-idf is how damn simple it is. Take a look at the wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time, and start reading a different blog. Like always, I'm going to assume you've got a hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.

Get Data

What would an example be without some data? Here's a link to the canonical 20 newsgroups data set that I've restructured slightly for your analysis pleasure.


Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there but I've provided one in Wukong for the hell of it that you can find here. We're going to use that script next in the pig script. If a java UDF is more to your liking feel free to write one and substitute in the relevant bits.

Pig, Step By Step

The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:

DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

This statement can be interpreted to mean:

"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."

Next up is to load and tokenize the data:

raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);

So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs.

Then, we need to count the number of times each unique (doc_id, token) pair appears:

doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;

And, since we're after the token frequencies and not just the counts, we need to attach the document sizes:

doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size;

Then the term frequencies are just:

term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq;
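
To see the numbers on something small, here's the same calculation as a plain Ruby sketch. The (doc_id, token) pairs are made up, and the method name `term_freqs` simply mirrors the Pig alias.

```ruby
# tokenized: array of [doc_id, token] pairs.
# Returns { [doc_id, token] => term_freq }.
def term_freqs(tokenized)
  counts    = Hash.new(0) # (doc_id, token) -> num_doc_tok_usages
  doc_sizes = Hash.new(0) # doc_id -> doc_size
  tokenized.each do |doc_id, token|
    counts[[doc_id, token]] += 1
    doc_sizes[doc_id] += 1
  end
  counts.map { |(doc_id, token), n| [[doc_id, token], n.to_f / doc_sizes[doc_id]] }.to_h
end

# Made-up tokenizer output.
tokenized = [
  ['doc1', 'pig'], ['doc1', 'pig'], ['doc1', 'hadoop'], ['doc1', 'ruby'],
  ['doc2', 'pig'], ['doc2', 'lucene']
]

term_freqs(tokenized).each { |(doc_id, token), tf| puts [doc_id, token, tf].join("\t") }
```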

For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term:

term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token;

Finally, we can compute the tf-idf weight:

tfidf_all = FOREACH token_usages {
  idf = LOG((double)$NDOCS/(double)num_docs_with_token);
  tf_idf = (double)term_freq*idf;
  GENERATE
    doc_id AS doc_id,
    token AS token,
    tf_idf AS tf_idf
  ;
};

Where NDOCS is the total number of documents in the corpus.
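
The weight itself is a one-liner. Here it is as a small Ruby sketch (natural log, matching Pig's LOG). The sample numbers below are made up, apart from 18828, which is the NDOCS value used for this corpus.

```ruby
# tf-idf weight for a single (doc_id, token) pair.
def tf_idf(term_freq, num_docs_with_token, ndocs)
  idf = Math.log(ndocs.to_f / num_docs_with_token)
  term_freq * idf
end

# A token that shows up in every document carries no weight at all ...
puts tf_idf(0.05, 18828, 18828)
# ... while a rarer one with the same term frequency is weighted more heavily.
puts tf_idf(0.05, 12, 18828)
```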

Pig, All Together Now

And here's the final script, all put together:

DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);

doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;

doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size;

term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq;

term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token;

tfidf_all = FOREACH token_usages {
  idf = LOG((double)$NDOCS/(double)num_docs_with_token);
  tf_idf = (double)term_freq*idf;
  GENERATE
    doc_id AS doc_id,
    token AS token,
    tf_idf AS tf_idf
  ;
};

STORE tfidf_all INTO '$OUT';

Run It

Assuming your data is on the hdfs at "/data/corpus/20_newsgroups/data", and you've named the pig script "tfidf.pig", then you can run with the following:

pig -p DOCS=/data/corpus/20_newsgroups/data -p NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig

Here's what the output of that looks like:

sci.crypt.15405 student 0.0187808247467920464
sci.crypt.16005 student 0.0541184292045718135 student 0.0839387881540297476 student 0.0027903667703849783 student 0.0377339506380500803
sci.crypt.15178 student 0.0010815147566519744
soc.religion.christian.21414 student 0.0587571517078208302 student 0.0685500103257909721
soc.religion.christian.21485 student 0.0232372916358613464
soc.religion.christian.21556 student 0.0790961657605280533


Next time let's look at how to take that output and recover the topics using clustering!

Indexing Text Corpora With Pig and ElasticSearch

Routinely working with raw data is strenuous, ulcer inducing, and overall hazardous to your health. Unstructured text even more so. Safe data mining requires proper mining gear. Imagine if you could step into a power-suit, fire up your hydraulic force pincers, and make that data your bitch?
Turns out, a specialty of mine is in building useful and interesting exoskeletons (think of Sigourney Weaver in Aliens) for developers of all shapes and sizes.

On that note I've written a pig STORE function for elasticsearch. Now you can use simple Pig syntax to transform arbitrary input data and index the output records with elasticsearch. Here's an example:

%default INDEX 'ufo_sightings'
%default OBJ 'ufo_sighting'

ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
STORE ufo_sightings INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchIndex('-1', '1000');

Where '-1' means the records have no inherent id and '1000' is the number of records to batch up before indexing. Here's the link to the github page (wonderdog).
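
The batching parameter can be pictured with a few lines of Ruby. This is a sketch only: `index_in_batches` is a hypothetical name, and instead of talking to elasticsearch it just records how big each batch would be.

```ruby
# Hypothetical stand-in for bulk indexing: walks the records in groups of
# batch_size and returns the size of every batch it would have sent.
def index_in_batches(records, batch_size)
  batch_sizes = []
  records.each_slice(batch_size) do |batch|
    # a real implementation would send `batch` off as one bulk request here
    batch_sizes << batch.size
  end
  batch_sizes
end

p index_in_batches((1..2500).to_a, 1000) # prints [1000, 1000, 500]
```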

It doesn't get any simpler. You've just been endowed with magic super text indexing powers. Now go. Index some raw text.


Brute Force Graph Crunching With Pig and Wukong

Just discovered this amazing data set matching all Marvel Universe comic book characters with the comic books they've appeared in (Social Characteristics of the Marvel Universe). I've made the data set available on Infochimps here in a sane and easy to use format.

Here's what that looks like:

$: head labeled_edges.tsv | wu-lign
"M'SHULLA" "AA2 35"
"OLD SKULL" "AA2 35"
"G'RATH" "AA2 35"

Simple Question

A natural question to ask of such an awesome graph is for the similarity between two characters based on what comic books they've appeared in together. This is called the structural similarity since we're only using the structure of the graph and no other meta data (weights, etc). Note that this could also be applied the other direction to find the similarity between two comic books based on what characters they share.

Wee bit of math

The structural similarity is nothing more than the jaccard similarity applied to nodes in a network graph. Here's the definition of that from wikipedia, for two sets A and B:

J(A, B) = |A /\ B| / |A U B|

So basically all we've got to do is get a list of all the comic books that two characters, say character A and character B, have appeared in. These lists of comic books form two mathematical sets.

The numerator in that simple formula says to compute the intersection of A and B and then count how many elements are left. More plainly, that's just the number of comic books the two characters have in common.

The denominator tells us to compute the union of A and B and count how many elements are in the resulting set. That's just the number of unique comic books that A and B have ever been in, either at the same time or not.
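
In Ruby terms, with a couple of made-up sets of comic book labels (an illustration only, not rows from the data set), the numerator and denominator look like this:

```ruby
require 'set'

# Intersection size over union size.
def jaccard(comics_a, comics_b)
  common = (comics_a & comics_b).size # comic books both characters appear in
  total  = (comics_a | comics_b).size # unique comic books either appears in
  common.to_f / total
end

# Made-up comic book sets for two characters.
a = Set['AA2 35', 'AVF 4', 'COC 1']
b = Set['AVF 4', 'COC 1', 'H2 251', 'M/PRM 35']

puts jaccard(a, b) # 2 in common out of 5 unique, prints 0.4
```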


Here's how we're going to say it using pig:

DEFINE jaccard_similarity `ruby jaccard_similarity.rb --map` SHIP('jaccard_similarity.rb');
edges = LOAD '/data/comics/marvel/labeled_edges.tsv' AS (character:chararray, comic:chararray);
grouped = GROUP edges BY character;
with_sets = FOREACH grouped GENERATE group AS character, FLATTEN(edges.comic) AS comic, edges.comic AS set;
SPLIT with_sets INTO with_sets_dup IF ( 1 > 0 ), not_used if (1 < 0); -- hack hack hack, self join still doesn't work
joined = JOIN with_sets BY comic, with_sets_dup BY comic;
pairs = FOREACH joined GENERATE
with_sets::character AS character_a,
with_sets::set AS character_a_set,
with_sets_dup::character AS character_b,
with_sets_dup::set AS character_b_set;
similarity = STREAM pairs THROUGH jaccard_similarity AS (character_a:chararray, character_b:chararray, similarity:float);
STORE similarity INTO '/data/comics/marvel/character_similarity.tsv';

Notice we're doing a bit of funny business here. Writing the actual algorithm for the jaccard similarity between two small sets doesn't make much sense in Pig. Instead we've written a wukong script to do it for us (you could also write a Pig udf if you're a masochist).

The first thing we do here is use the DEFINE operator to tell pig that there's an external command we want to call, the alias for it, how to call it, and to SHIP the script we need to all nodes in the cluster.

Next we use the GROUP operator and then the FOREACH..GENERATE projection operator to get, for every character, the list of comic books they've appeared in.

We also use the FLATTEN operator during the projection as well. The reason is so that we can use the JOIN operator to pull out (character,character) pairs that have at least one comic book in common. (Don't get scared about the gross looking SPLIT operator in there. Just ignore it. It's a hack to get around the fact that self-joins still don't quite work properly in pig. Pretend we're just joining 'with_sets' with itself.)
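
What the FLATTEN plus JOIN buys us can be mimicked in a few lines of Ruby. This sketch uses made-up edges and a hypothetical method name, `character_pairs`. Like the JOIN, it pairs every character with itself, and a pair that shares n comic books shows up n times.

```ruby
# edges: array of [character, comic] rows.
# Returns every ordered (character, character) pair sharing a comic book.
def character_pairs(edges)
  by_comic = edges.group_by { |_character, comic| comic }
  by_comic.values.flat_map do |rows|
    characters = rows.map(&:first)
    characters.product(characters) # all ordered pairs within one comic book
  end
end

# Made-up (character, comic) edges.
edges = [
  ['HULK', 'AA2 35'], ['THOR', 'AA2 35'],
  ['THOR', 'AVF 4'],  ['LOKI', 'AVF 4']
]

character_pairs(edges).each { |a, b| puts "#{a}\t#{b}" }
```

HULK and LOKI never appear in the output since they have no comic book in common, which is exactly the filtering the JOIN does for us.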

The last step is to STREAM our pairs through the simple wukong script. Here's what that looks like:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'
require 'wukong/and_pig' # for special conversion methods
require 'set'

# Takes two pig bags and computes their jaccard similarity
# eg.
# input:
# (a,{(1),(2),(3)}, b, {(2),(9),(5)})
# output:
# (a, b, 0.2)
class JaccardSim < Wukong::Streamer::RecordStreamer
  def process node_a, set_a, node_b, set_b
    yield [node_a, node_b, jaccard(set_a, set_b)]
  end

  def jaccard bag_a, bag_b
    common_elements = ((bag_a.from_pig_bag.to_set).intersection(bag_b.from_pig_bag.to_set)).size
    total_elements = ((bag_a.from_pig_bag.to_set).union(bag_b.from_pig_bag.to_set)).size
    common_elements.to_f / total_elements.to_f
  end
end

Wukong::Script.new(JaccardSim, nil).run

Notice the nifty 'from_pig_bag' method that wukong has. All we're doing here is converting the two pig bags into ruby 'set' objects then doing the simple jaccard similarity calculation. (3 lines of code for the calculation itself, still want to do it in java?)
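
If you want to try the calculation without wukong installed, something like the following works on the text form of a bag. Note that `parse_pig_bag` is a hypothetical helper written for this sketch, not wukong's 'from_pig_bag'. It assumes single-field tuples whose values contain no parentheses.

```ruby
require 'set'

# Hypothetical helper: turns "{(1),(2),(3)}" into ["1", "2", "3"].
def parse_pig_bag(bag)
  bag.scan(/\(([^()]*)\)/).flatten
end

def jaccard(bag_a, bag_b)
  a = parse_pig_bag(bag_a).to_set
  b = parse_pig_bag(bag_b).to_set
  (a & b).size.to_f / (a | b).size
end

# Same example as in the script's comment.
puts jaccard("{(1),(2),(3)}", "{(2),(9),(5)}") # prints 0.2
```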

And that's it. Here's what it looks like after running:

"HUMAN TORCH/JOHNNY S" "ROGUE /" 0.0371308030
"HUMAN TORCH/JOHNNY S" "UATU" 0.0481557360


Graph Processing With Apache Pig

So, you're probably sick of seeing this airport data set by now (flight edges) but it's so awesome that I have to re-use it. Let's use Pig to do the same calculation as this post in a much more succinct way. We'll really get a feel for what Pig is better at than Wukong.

Degree Distribution

Since the point here is to illustrate the difference between the wukong way and the pig way we're not going to introduce anything clever here. Here's the code for calculating the degree by month (both passengers and flights) for every domestic airport since 1990:

-- Calculates the monthly degree distributions for domestic airports from 1990 to 2009.
-- Load data (boring part)
flight_edges = LOAD '$FLIGHT_EDGES' AS (origin_code:chararray, destin_code:chararray, passengers:int, flights:int, month:int);

-- For every (airport,month) pair get passengers and flights out
edges_out = FOREACH flight_edges GENERATE
origin_code AS airport,
month AS month,
passengers AS passengers_out,
flights AS flights_out;

-- For every (airport,month) pair get passengers and flights in
edges_in = FOREACH flight_edges GENERATE
destin_code AS airport,
month AS month,
passengers AS passengers_in,
flights AS flights_in;

-- group them together and sum
grouped_edges = COGROUP edges_in BY (airport,month), edges_out BY (airport,month);
degree_dist = FOREACH grouped_edges {
  passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);
  flights_degree = SUM(edges_in.flights_in) + SUM(edges_out.flights_out);
  GENERATE
    FLATTEN(group) AS (airport, month),
    passenger_degree AS passenger_degree,
    flights_degree AS flights_degree
  ;
};

STORE degree_dist INTO '$DEG_DIST';

So, here's what's going on:

  • FOREACH..GENERATE: this is called a 'projection' in pig. Here we're really just cutting out the fields we don't want and rearranging our records. This is exactly the same as what we do in the wukong script, where we yielded two different types of records for the same input data in the map phase, only a lot more clear.

  • COGROUP: here we're simply joining our two data relations together (edges in and edges out) by a common key (airport code,month) and aggregating the values for that key. This is exactly the same as what we do in the 'accumulate' part of the wukong script.

  • FOREACH..GENERATE (once more): here we run through our grouped records and sum the flights and passengers. This is exactly the same as the 'finalize' part of the wukong script.

So, basically, we've done in 4 lines (not counting the LOAD and STORE or the prettification) of very clear and concise code what took us ~70 lines of ruby. Win.
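
For a feel of what comes out, here are the same steps on a handful of made-up records in plain Ruby, no Hadoop involved. It's a sketch only: the method name `degree_distribution` is hypothetical and the month encoding in the sample rows is an assumption.

```ruby
# flight_edges: rows of [origin, destination, passengers, flights, month].
# Returns { [airport, month] => [passenger_degree, flights_degree] }.
def degree_distribution(flight_edges)
  degrees = Hash.new { |hash, key| hash[key] = [0, 0] }
  flight_edges.each do |origin, destin, passengers, flights, month|
    # Every edge counts once for its origin (out) and once for its destination (in).
    [origin, destin].each do |airport|
      degrees[[airport, month]][0] += passengers
      degrees[[airport, month]][1] += flights
    end
  end
  degrees
end

# Made-up flight edges.
flight_edges = [
  ['AUS', 'DFW', 100, 2, 199001],
  ['DFW', 'AUS',  80, 1, 199001],
  ['AUS', 'IAH',  50, 1, 199001]
]

degree_distribution(flight_edges).each do |(airport, month), (passenger_degree, flights_degree)|
  puts [airport, month, passenger_degree, flights_degree].join("\t")
end
```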

Here's the wukong one again for reference:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN", passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  # What are we going to use as a key internally?
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights] += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights] += flights.to_i
    end
  end

  # For every airport and month, calculate passenger and flight degrees
  def finalize

    # Passenger degrees (out, in, and total)
    passengers_out = @out_degree[:passengers]
    passengers_in = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out = @out_degree[:flights]
    flights_in = @in_degree[:flights]
    flights_total = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

# Need to use 2 fields for partition so every record with the same airport and
# month land on the same reducer
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run

Plot Data

A very common workflow pattern with Hadoop is to use a tool like pig or wukong to process large scale data and generate some result data set. The last step in this process is (rather obviously) to summarize that data in some way. Here's a quick plot (using R and ggplot2) of the flights degree distribution after I further summarized by year:

That's funny...

Finally, here's the code for the plot:

# include the ggplot2 library for nice plots
library(ggplot2);

# Read data in and take a subset
degrees <- read.table('yearly_degrees.tsv', header=FALSE, sep='\t', colClasses=c('character', 'character', 'numeric', 'numeric'));
names(degrees) <- c('airport_code', 'year', 'passenger_degree', 'flights_degree');
select_degrees <- subset(degrees, year=='2000' | year=='2001' | year=='2002' | year=='2009' | year=='1990');

# Plotting with ggplot2
pdf('passenger_degrees.pdf', 12, 6, pointsize=10);
ggplot(select_degrees, aes(x=passenger_degree, fill=year)) + geom_density(colour='black', alpha=0.3) + scale_x_log10() + ylab('Probability') + xlab(expression(log[10] ('Passengers in + Passengers out'))) + opts(title='Passenger Degree Distribution')
# close the pdf device so the file actually gets written out
dev.off();


Bulk Indexing With ElasticSearch and Hadoop

At Infochimps we recently indexed over 2.5 billion documents for a total indexed size of 4TB. This would not have been possible without ElasticSearch and the Hadoop bulk loader we wrote, wonderdog. I'll go into the technical details in a later post but for now here's how you can get started with ElasticSearch and Hadoop:

Getting Started with ElasticSearch

The first thing is to actually install elasticsearch:

$: wget
$: sudo mv elasticsearch-0.14.2 /usr/local/share/
$: sudo ln -s /usr/local/share/elasticsearch-0.14.2 /usr/local/share/elasticsearch

Next you'll want to make sure there is an 'elasticsearch' user and that there are suitable data, work, and log directories that 'elasticsearch' owns:

$: sudo useradd elasticsearch
$: sudo mkdir -p /var/log/elasticsearch /var/run/elasticsearch/{data,work}
$: sudo chown -R elasticsearch /var/{log,run}/elasticsearch

Then get wonderdog (you'll have to git clone it for now) and go ahead and copy the example configuration in wonderdog/config:

$: sudo mkdir -p /etc/elasticsearch
$: sudo cp config/elasticsearch-example.yml /etc/elasticsearch/elasticsearch.yml
$: sudo cp config/logging.yml /etc/elasticsearch/
$: sudo cp config/ /etc/elasticsearch/

Make changes to 'elasticsearch.yml' such that it points to the correct data, work, and log directories. Also, you'll want to change the number of 'recovery_after_nodes' and 'expected_nodes' in elasticsearch.yml to however many nodes (machines) you actually expect to have in your cluster. You'll probably also want to do a quick once-over of and make sure the jvm settings, etc are sane for your particular setup. Finally, to startup do:

sudo -u elasticsearch /usr/local/share/elasticsearch/bin/elasticsearch -Des.config=/etc/elasticsearch/elasticsearch.yml

You should now have a happily running (reasonably configured) elasticsearch data node.

Index Some Data


  • You have a working hadoop cluster

  • Elasticsearch data nodes are installed and running on all your machines and they have discovered each other. See the elasticsearch documentation for details on making that actually work.

  • You've installed the following rubygems: 'configliere' and 'json'

Get Data

As an example let's index this UFO sightings data set from Infochimps here. (You should be familiar with this one by now...) It's mostly raw text and so it's a very reasonable thing to index. Once it's downloaded go ahead and throw it on the HDFS:

$: hadoop fs -mkdir /data/domestic/aliens
$: hadoop fs -put chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv /data/domestic/aliens/

Index Data

This is the easy part:

$: bin/wonderdog --rm --field_names=sighted_at,reported_at,location,shape,duration,description --id_field=-1 --index_name=ufo_sightings --object_type=ufo_sighting --es_config=/etc/elasticsearch/elasticsearch.yml /data/domestic/aliens/ufo_awesome.tsv /tmp/elasticsearch/aliens/out


'--rm' - Remove output on the hdfs if it exists
'--field_names' - A comma separated list of the field names in the tsv, in order
'--id_field' - The field to use as the record id, -1 if the record has no inherent id
'--index_name' - The index name to bulk load into
'--object_type' - The type of objects we're indexing
'--es_config' - Points to the elasticsearch config*

*The elasticsearch config that the hadoop machines need must be on all the hadoop machines and have a 'hosts' entry listing the ips of all the elasticsearch data nodes (see wonderdog/config/elasticsearch-example.yml). This means we can run the hadoop job on a different cluster than the elasticsearch data nodes are running on.

The other two arguments are the input and output paths. The output path in this case only gets written to if one or more index requests fail. This way you can re-run the job on only those records that didn't make it the first time.

The indexing should go pretty quickly.
Next is to refresh the index so we can actually query our newly indexed data. There's a tool in wonderdog's bin directory for that:

$: bin/estool --host=`hostname -i` refresh_index

Query Data

Once again, use estool:

$: bin/estool --host=`hostname -i` --index_name=ufo_sightings --query_string="ufo" query


JRuby and Hadoop, Notes From a Non-Java Programmer

So I spent a fair deal of time playing with JRuby this weekend. Here are my notes and conclusions so far. Disclaimer: My experience with java overall is limited, so most of this might be obvious to a java ninja but maybe not to a ruby one...


The whole point here is that it sure would be nice to only write a few lines of ruby code into a file called something like 'rb_mapreduce.rb' and run it by saying: ./rb_mapreduce.rb. No compiling, no awkward syntax. Pure ruby, plain and simple.


I created a 'WukongPlusMapper' that subclassed 'org.apache.hadoop.mapreduce.Mapper' and implemented the important methods, namely 'map'. Then I setup and launched a job from inside jruby using this jruby mapper ('WukongPlusMapper') as the map class.

The job launched and ran just fine. But...

Problems and Lessons Learned:

  • It is possible (in fact extremely easy) to setup and launch a Hadoop job with pure jruby

  • It is not possible, as far as I can tell, to use an uncompiled jruby class as either the mapper or the reducer for a Hadoop job. It doesn't throw an error (so long as you've subclassed a proper java mapper) but actually just uses the superclass's definition instead. I believe the reason is that each map task must have access to the full class definition for its mapper (only sensible) and has no idea what to do with my transient 'WukongPlusMapper' class. Obviously the same would apply to the reducer.

  • It is possible to compile a jruby class ahead-of-time, stuff it into the job jar, and then launch the job with ordinary means. There are a couple somewhat obvious drawbacks with this method:

    • You've got to specify 'java_signatures' for each of your methods that are going to be called inside java

    • Messy logic+code for compiling thyself, stuffing thyself into a jar, shipping thyself with MR job. Might as well just write java at this point. radoop has some logic for doing that pretty well laid out.

  • It is possible to define and create an object in jruby that subclasses a java class or implements a java interface. Then you can simply override the methods you want to override. It's possible to pass instances of this class to a java runtime that only knows about the superclass, and the subclass's methods (at least the ones that have the signatures defined in the superclass) will work just fine. Unfortunately (and plainly obvious in hindsight), this does NOT work with Hadoop since these instances all show up in java as 'proxy classes' and are only accessible to the launching jvm.

  • On another note, there is the option of using the scripting engine which, as far as I can tell, is what both jruby-on-hadoop and radoop are using. Something of concern though is that neither of these two projects seems to have much traction. However, it may be that the scripting engine is the only way to reasonably make this work, at least 2 people vote yes ...

So, implementation complexity aside, it looks like all one would have to do is come up with some way of making JRuby's in-memory class definitions available to all of the spawned mappers and reducers. Probably not something I want to delve into at the moment.

Script engine it is.

Pig, Bringing Simplicity to Hadoop

In strong contrast to the seat-of-your-pants style of Wukong there is another high level language for Hadoop called Pig. See Apache Pig.


At the top level, here's what Pig gets you:

  • No java required. That is, use as little (zero) or as much (reams) of java code as you want.

  • No boilerplate code

  • Intuitive and easy to understand language (similar to SQL) with clean uniform syntax

  • Separation of high level algorithm and low level map-reduce jobs

  • Build your analysis as a set of operations acting on data

  • Most algorithms are fewer than 5 human-readable lines of Pig

Get Pig

Go here and download pig (version 0.8) from somewhere close to you. Unpack it and put it wherever you like. Then, type 'pig' at the command line and see what happens. It's very likely that it doesn't pick up your existing Hadoop configuration. To fix that set HADOOP_HOME to point to your hadoop installation and PIG_CLASSPATH to point to your hadoop configuration (probably /etc/hadoop/conf). Here's all that again:

$: wget
$: tar -zxf pig-0.8.0.tar.gz
$: sudo mv pig-0.8.0 /usr/local/share/
$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig
$: sudo ln -s /usr/local/share/pig/bin/pig /usr/local/bin/pig
$: hash -r
$: export HADOOP_HOME=/usr/lib/hadoop
$: export PIG_CLASSPATH=/etc/hadoop/conf
$: pig
2011-01-21 09:56:32,486 [main] INFO org.apache.pig.Main - Logging error messages to: /home/jacob/pig_1295625392480.log
2011-01-21 09:56:32,879 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020
2011-01-21 09:56:33,402 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: master:54311

See how easy that was. Get it over with now.


People post the most interesting data on Infochimps. Let's get the first billion digits of pi here. Notice that it's arranged in groups of 10 digits with 10 groups per line. We're going to use pig to see how the digits are distributed.

This will allow us to visually (once we make a plot) spot any obvious deviation from a random series of numbers. That is, for a random series of numbers we'd expect each digit (0-9) to appear equally often. If this isn't true then we know we're dealing with a more complicated beast.
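If you'd rather have a number than a picture, one common way to measure "how far from equally often" is a chi-square statistic against a uniform expectation. This is my own aside and not part of the pipeline in this post; the method name and the sample counts are made up for illustration. Smaller values mean the counts sit closer to uniform.

```ruby
# Chi-square statistic of observed digit counts against a uniform expectation.
# counts: Hash of digit => number of times that digit was seen.
def chi_square_uniform(counts)
  total    = counts.values.sum.to_f
  expected = total / counts.size   # uniform: every digit equally likely
  counts.values.sum { |observed| (observed - expected)**2 / expected }
end

# Illustrative counts for a run of 100 digits (not output from the job below).
sample_counts = { 0 => 8, 1 => 8, 2 => 12, 3 => 11, 4 => 10,
                  5 => 8, 6 => 9,  7 => 8,  8 => 12, 9 => 14 }

puts chi_square_uniform(sample_counts)
```

The same function would work on the ten (digit, count) pairs that the Pig script produces, once they're pulled down to the local file system.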


After catting the data file you'll notice that the end of each line has some crufty details attached, which makes this data more or less impossible to load into Pig as a table. Pig is terrible at data munging/parsing/cleaning. Thankfully we have wukong. Let's write a dead simple wukong script to fix the last field and create one digit per line:

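#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class PiCleaner < Wukong::Streamer::LineStreamer
  def process line
    # Split into at most 10 fields; the 10th keeps the trailing cruft
    fields = line.strip.split(' ', 10)
    # Keep only the first 10 characters of the last field; skip malformed lines
    hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""
    # Emit one digit per output line
    hundred_digit_string.each_char{|digit| yield digit}
  end
end

# Map-only job: PiCleaner is the mapper, there is no reducer
Wukong::Script.new(PiCleaner, nil).run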
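To see what the process method does to a single line without touching Hadoop, here's a minimal plain-Ruby sketch of the same cleaning step. The method name clean_pi_line and the exact shape of the sample line (ten groups of ten digits followed by a ':100' style suffix) are my assumptions for illustration; check them against your own copy of the file.

```ruby
# Plain-Ruby version of the cleaning step; no wukong or Hadoop required.
def clean_pi_line(line)
  fields = line.strip.split(' ', 10)
  # Short or blank lines produce no digits (mirrors the `rescue ""` fallback).
  return [] if fields.length < 10
  # The 10th field carries trailing cruft, so keep only its first 10 characters.
  (fields[0..8] + [fields[9][0..9]]).join.chars
end

# Assumed input format: ten groups of ten digits plus trailing cruft.
sample = "1415926535 8979323846 2643383279 5028841971 6939937510 " \
         "5820974944 5923078164 0628620899 8628034825 3421170679 :100"

digits = clean_pi_line(sample)
puts digits.length          # 100
puts digits.first(5).join   # 14159
```

Each element of digits corresponds to one line of output from the wukong script.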

Let's go ahead and run that right away (this will take a few minutes depending on your rig, it'll be a billion lines of output...):

$: hdp-mkdir /data/math/pi/
$: hdp-put pi-010.txt /data/math/pi/
$: ./pi_clean.rb --run /data/math/pi/pi-010.txt /data/math/pi/first_billion_digits.tsv
I, [2011-01-22T11:01:57.363704 #12489] INFO -- : Launching hadoop!
I, [2011-01-22T11:01:57.363964 #12489] INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
-D mapred.reduce.tasks=0 \
-D'pi_clean.rb---/data/math/pi/pi-010.txt---/data/math/pi/first_billion_digits.tsv' \
-mapper '/usr/bin/ruby1.8 pi_clean.rb --map ' \
-reducer '' \
-input '/data/math/pi/pi-010.txt' \
-output '/data/math/pi/first_billion_digits.tsv' \
-file '/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb' \
-cmdenv 'RUBYLIB=~/.rubylib'

11/01/22 11:01:59 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/22 11:01:59 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/22 11:01:59 INFO streaming.StreamJob: Running job: job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: To kill this job, run:
11/01/22 11:01:59 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0251
11/01/22 11:02:00 INFO streaming.StreamJob: map 0% reduce 0%
11/01/22 11:05:11 INFO streaming.StreamJob: map 100% reduce 100%
11/01/22 11:05:11 INFO streaming.StreamJob: Job complete: job_201012031305_0251
11/01/22 11:05:11 INFO streaming.StreamJob: Output: /data/math/pi/first_billion_digits.tsv
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar4401930660028806042/] [] /tmp/streamjob3153669001520547.jar tmpDir=null

Great. Now let's write some Pig:


This is what Pig is awesome at. Remember that accumulating reducer in wukong? We're about to do the identical thing in two lines of no-nonsense pig:

-- load
digits = LOAD '$PI_DIGITS' AS (digit:int);

groups = GROUP digits BY digit;
counts = FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits;

-- store
STORE counts INTO '$OUT';

All we're doing here is reading in the data (one digit per line), accumulating all the digits with the same 'digit', and counting them up. Save it into a file called 'pi.pig' and run with the following:

$: pig -p PI_DIGITS=/data/math/pi/first_billion_digits.tsv -p OUT=/data/math/pi/digit_counts.tsv pi.pig
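If the GROUP and COUNT steps feel opaque, here's roughly the same computation as a small in-memory Ruby sketch. It's only meant for intuition: the ten sample digits are made up, and unlike Pig this obviously won't scale to a billion records.

```ruby
# In-memory analogue of: GROUP digits BY digit; then COUNT per group.
digits = %w[3 1 4 1 5 9 2 6 5 3].map(&:to_i)   # made-up sample input

# GROUP digits BY digit  ->  { digit => [all records with that digit] }
groups = digits.group_by { |d| d }

# FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits
counts = groups.map { |digit, members| [digit, members.length] }.sort

counts.each { |digit, num_digits| puts "#{digit}\t#{num_digits}" }
```

Pig does the same thing, except the grouping happens in the shuffle between map and reduce instead of in a hash in memory.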

I'll skip the output since it's rather verbose. Now we can make a simple plot by hdp-catting our output data into a local file (it's super tiny by now) and plotting it with R:

$: hdp-catd /data/math/pi/digit_counts.tsv > digit_counts.tsv
$: R
> library(ggplot2)
> digit_counts <- read.table('digit_counts.tsv', header=FALSE, sep="\t")
> names(digit_counts) <- c('digit', 'count')
> p <- ggplot(digit_counts, aes(x=digit)) + geom_histogram(aes(y = ..density.., weight = count), binwidth=1, colour='black', fill='grey', alpha=0.7)
> p + scale_y_continuous(limits=c(0,0.5))

Will result in the following:

I'll leave it to you to make your own assumptions.

Graph Processing With Wukong and Hadoop

As a last (for now) tutorial oriented post on Wukong, let's process a network graph.

Get Data

This airport data (airport edges) from Infochimps is one such network graph with over 35 million edges. It represents the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

Explore Data

We've got to actually look at the data before we can make any decisions about how to process it and what questions we'd like answered:

$: head data/flights_with_colnames.tsv | wu-lign
origin_airport destin_airport passengers flights month
MHK AMW 21 1 200810
EUG RDM 41 22 199011
EUG RDM 88 19 199012
EUG RDM 11 4 199010
MFR RDM 0 1 199002
MFR RDM 11 1 199003
MFR RDM 2 4 199001
MFR RDM 7 1 199009
MFR RDM 7 2 199011

So it's exactly what you'd expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics...

Ask A Question

A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph) the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.

2. Flights Degree: For a given airport the number of flights in + the number of flights out.
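Before scaling this up, here's a tiny in-memory Ruby sketch of both degree definitions, ignoring the month for now. The first two edges are taken from the sample rows above; the third (RDM to EUG) is made up so that RDM has both inbound and outbound traffic.

```ruby
# Edges as [origin, destination, passengers, flights].
edges = [
  ["EUG", "RDM", 41, 22],
  ["MFR", "RDM", 11, 1],
  ["RDM", "EUG", 30, 20],   # hypothetical edge, for illustration only
]

# Per-airport running totals, created on first access.
degree = Hash.new do |hash, airport|
  hash[airport] = { passengers_in: 0, passengers_out: 0,
                    flights_in: 0,    flights_out: 0 }
end

edges.each do |origin, destin, passengers, flights|
  degree[origin][:passengers_out] += passengers   # out degree of the origin
  degree[origin][:flights_out]    += flights
  degree[destin][:passengers_in]  += passengers   # in degree of the destination
  degree[destin][:flights_in]     += flights
end

rdm = degree["RDM"]
puts "RDM passenger degree: #{rdm[:passengers_in] + rdm[:passengers_out]}"   # 82
puts "RDM flights degree:   #{rdm[:flights_in] + rdm[:flights_out]}"         # 43
```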

Let's write the question wukong style:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN", passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  # What are we going to use as a key internally?
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  # Reset the counters at the start of every new (airport, month) group
  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights] += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights] += flights.to_i
    end
  end

  # For every airport and month, calculate passenger and flight degrees
  def finalize

    # Passenger degrees (out, in, and total)
    passengers_out = @out_degree[:passengers]
    passengers_in = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out = @out_degree[:flights]
    flights_in = @in_degree[:flights]
    flights_total = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

# Need to use 2 fields for partition so every record with the same airport and
# month land on the same reducer
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run

Don't panic. There's a lot going on in this script so here's the breakdown (real gentle like):


First, the mapper. Here we're using wukong's RecordStreamer class, which reads lines from $stdin and splits them on tabs for us. That's how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.

Finally, we've carefully rearranged our records such that (airport,month) is always the first two fields. We'll partition on this as the key. (We have to say that explicitly at the bottom of the script)


Next, the reducer. We've seen all of these methods before except for one: get_key. The reducer needs to know which fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.

* start! - Here we initialize the internal state of the reducer with two ruby hashes. One, @out_degree, will count up all the passengers and flights out. The other, @in_degree, will do the same but for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in java...)

* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".

* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the 'key' is just (airport,month).
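To make the whole flow concrete, here's a plain-Ruby simulation of map, sort, and reduce on three input lines, with no wukong or Hadoop involved. It's a sketch of the idea rather than how wukong actually wires things up: map_edge and reduce_group are names I made up, the sort plus group_by stands in for Hadoop's shuffle, and the third input line (RDM to EUG) is hypothetical.

```ruby
# Map: emit each edge twice, once for each endpoint, tagged "OUT" or "IN".
def map_edge(origin, destin, passengers, flights, month)
  [[origin, month, "OUT", passengers, flights],
   [destin, month, "IN",  passengers, flights]]
end

# Reduce: sum up everything that shares one (airport, month) key.
def reduce_group(key, records)
  out_degree = { passengers: 0, flights: 0 }
  in_degree  = { passengers: 0, flights: 0 }
  records.each do |_airport, _month, in_or_out, passengers, flights|
    target = in_or_out == "IN" ? in_degree : out_degree
    target[:passengers] += passengers.to_i
    target[:flights]    += flights.to_i
  end
  [*key,
   in_degree[:passengers], out_degree[:passengers],
   in_degree[:passengers] + out_degree[:passengers],
   in_degree[:flights], out_degree[:flights],
   in_degree[:flights] + out_degree[:flights]]
end

lines = [
  "EUG\tRDM\t41\t22\t199011",
  "MFR\tRDM\t7\t2\t199011",
  "RDM\tEUG\t30\t20\t199011",   # hypothetical line, for illustration only
]

mapped  = lines.flat_map { |line| map_edge(*line.split("\t")) }
grouped = mapped.sort.group_by { |record| record[0, 2] }   # stand-in for the shuffle
results = grouped.map { |key, records| reduce_group(key, records) }

results.each { |record| puts record.join("\t") }
```

The output columns are in the same order as the real job: airport, month, passengers in, passengers out, passengers total, flights in, flights out, flights total.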

Get An Answer

We know how to put the data on the hdfs and run the script by now so we'll skip that part. Here's what the output looks like:

$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
1B1 200906 1 1 2 1 1 2
ABE 200705 0 83 83 0 3 3
ABE 199206 0 31 31 0 1 1
ABE 200708 0 904 904 0 20 20
ABE 200307 0 91 91 0 2 2
ABE 200703 0 36 36 0 1 1
ABE 199902 0 84 84 0 1 1
ABE 200611 0 753 753 0 18 18
ABE 199209 0 99 99 0 1 1
ABE 200702 0 54 54 0 1 1
ABE 200407 0 98 98 0 1 1
ABE 200705 0 647 647 0 15 15
ABE 200306 0 27 27 0 1 1
ABE 200703 0 473 473 0 11 11
ABE 200309 0 150 150 0 1 1
ABE 200702 0 313 313 0 8 8
ABE 200103 0 0 0 0 1 1
ABE 199807 0 105 105 0 1 1
ABE 199907 0 91 91 0 1 1
ABE 199501 0 50 50 0 1 1

This is the point where you might bring the output back down to your local file system, crack open a program like R, make some plots, etc.

And we're done for now. Hurray.