Thursday, November 14, 2013

Linear Regression With Pig

It's a bit silly trying to motivate a discussion of linear regression. It's everywhere. Linear regression is typically the first pass step for understanding a dataset. Is there a linear relationship between my variables? Maybe. Let's try linear regression! In other, and less precise, words we just fit a damn line to it. Suffice it to say linear regression is one of those tools for data analysis that's not optional. You must know it.

In fact, I'm going to assume just that. That you've used linear regression in other contexts before and understand its utility. The issue at hand is how to parallelize linear regression. Why? Well, suppose you have billions of feature vectors in your data set, each with thousands of features (columns), and you want to use all of them because, why not? Suppose it doesn't fit on one machine. Now, there exists a project to address this specifically, vowpal wabbit, which you should most certainly check out, but that I'm not going to talk about. Instead, the idea is to use Apache Pig. The reason for implementing it with Pig, rather than using an existing tool, is mostly for illustration. Linear regression with pig brings up several design and implementation details that I believe you'll face when doing almost any reasonably useful machine learning at scale. In other words, how do I wire all this shit together?

Specifically, I'll address pig macros, python drivers, and using a whole relation as a scalar. Fun stuff.

linear regression

It's important that I do at least explain a bit of terminology so we're all together in this. So, rather than jump for the most general explanation immediately (why do people do that?) let's talk about something real. Suppose you've measured the current running through a circuit while slowly decreasing the resistance. You would expect the current to increase linearly as you decrease the resistance (Ohm's law). In other words,
\begin{eqnarray*}I=\frac{V}{R}\end{eqnarray*}

To verify Ohm's Law (not a bad idea, I mean, maybe you're living in a dream world where physics is different and you want to know for certain...) you'd record the current \(I\) and the resistance \(R\) at every measurement while holding the voltage \(V\) constant. You'd then fit a line to the data or, more specifically, to \(\frac{1}{R}\), and, if all went well, find that the slope of said line was equal to the voltage.

In the machine learning nomenclature the current would be called the response or target variable. All the responses together form a vector \(y\) called the response vector. The resistance would be called the feature or observation. And, if you recorded more than just the resistance, say, the temperature, then for every response you'd have a collection of features or a feature vector. All the feature vectors together form a matrix \(X\). The goal of linear regression is to find the best set of weights \(w\) that, when used to form a linear combination of the features, creates a vector that is as close as possible to the response vector. So the problem can be phrased as an optimization problem. Here the function we'll minimize is the mean squared error (the square of the distance between the weighted features and the response). Mathematically the squared error, for one feature vector \(x_{i}\) of length \(M\), can be written as:
\begin{eqnarray*}error^2=(y_{i}-\sum_{j=1}^Mw_{j}x_{i,j})^2\end{eqnarray*}
where \(x_{i,1}=1\) by definition (this gives us an intercept term).
So the mean squared error (mse), when we've got \(N\) measurements (feature vectors), is then:
\begin{eqnarray*}mse(w)=\frac{1}{N}\sum_{i=1}^N(y_{i}-\sum_{j=1}^Mw_{j}x_{i,j})^2\end{eqnarray*}
So now the question is, exactly how are we going to minimize the mse by varying the weights? Well, it turns out there's a method for exactly this called gradient descent. The idea is that the mse decreases fastest if we start with a given set of weights and travel in the direction of the negative gradient of the mse at those weights. In other words:
\begin{eqnarray*}w_{new}=w-\alpha\nabla{mse(w)}\end{eqnarray*}
Where \(\alpha\) is the magnitude of the step size. What this gives us is a way to update the weights until \(w_{new}\) doesn't really change much. Once the weights converge we're done.
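
To make the update rule concrete, here's a tiny numpy sketch of the mse, its gradient, and a single descent step. This is just the math above written down, not the Pig implementation (and the constant factor of 2 that falls out of the derivative is often simply folded into \(\alpha\)):

import numpy as np

def mse(w, X, y):
    # mean squared error: X is the N x M feature matrix, y the response vector
    errors = y - X.dot(w)
    return np.mean(errors ** 2)

def descent_step(w, X, y, alpha):
    # gradient of the mse with respect to w is -2/N * X^T (y - Xw)
    N = len(y)
    grad = -2.0 / N * X.T.dot(y - X.dot(w))
    return w - alpha * grad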

algorithm

Alright, now that we've got a rule for updating weights, we can write down the algorithm.

1. Initialize the weights, one per feature, randomly
2. Update the weights by subtracting the gradient of the mse
3. Repeat 2 until converged

implementation

Ok great. Let's get the pieces together.

pig

Pig is going to do most of the real work. There are two main steps involved. The first step, and one that varies strongly from domain to domain and problem to problem, is loading your data and transforming it into something that a generic algorithm can handle. The second, and more interesting, is the implementation of gradient descent of the mse itself.

Here's the implementation of the gradient descent portion. I'll go over each relation in detail.

Go ahead and save that in a directory called 'macros'. Since gradient descent of the mean squared error for the purposes of creating a linear model is a generic problem, it makes sense to implement it as a pig macro.

In lines 19-25 we're attaching the weights to every feature vector. The Zip UDF, which can be found on github, receives the weights as a tuple and the feature vector as a tuple. The output is a bag of new tuples, each of which contains (weight, feature, dimension). Think of Zip like a zipper: it matches the weights to their corresponding features. Importantly, the dimension (the index in the input tuples) is returned as well.

Something to notice about this first bit is the scalar cast. Zip receives the entire weights relation as a single object. Since the weights relation is only a single tuple anyway, this is exactly what we want. It prevents us from doing something silly like a replicated join on a constant (which works, but clutters the logic) or, worse, a cross.

Next, in lines 32-39, we're computing a portion of the gradient of the mse. The reason for the Zip udf in the first step was so that the nested projection and sum, which compute the dot product of the weights with the features, work out cleanly.

Then, on line 42, the full gradient of the mse is computed by multiplying each feature by the error. It might not seem obvious when written like that, but the partial derivatives of the mse with respect to each weight make it work out like this. How nice.

Lines 47-54 are where the action happens. By action I mean we'll actually trigger a reduce job since everything up to this point has been map only. This is where the partial derivative bits come together. That is, we're grouping by dimension and weight (which, it turns out, is precisely the thing we're differentiating with respect to) and summing each feature vector's contribution to the partial derivative along that dimension. The factor bit, on line 48, is the multiplier for the gradient. It includes the normalization term (we normalize by the number of feature vectors since we're differentiating the mean squared error) and the step size. The result of this step is a new weight for each dimension.

An important thing to note here is that the number of partitions is equal to the number of features (dimensions), which is small. What this means is that each reduce task could get a very large amount of data. Fortunately for us COUNT and SUM are both algebraic and so Pig will use combiners, hurray!, drastically reducing the amount of data sent to each reduce task.

Finally, on lines 60-65, we reconstruct a new tuple with the new weights and return it. The schema of this tuple should be the same as the input weight tuple.
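
Since the macro is easier to follow with a concrete picture of the dataflow, here's a rough Python sketch of what a single iteration computes: zip the weights onto each feature vector, take the dot product, compute the error, group the contributions by dimension, and emit one new weight per dimension. It's an illustration of the shape of the computation, not the macro itself, and the factor of 2 from the derivative is folded into the step size:

from collections import defaultdict

def one_iteration(weights, records, alpha):
    # records is a list of (response, features) pairs; features[0] is the constant 1.0
    N = len(records)
    factor = alpha / N              # step size times the normalization term
    partials = defaultdict(float)   # dimension -> summed contribution to the partial derivative

    for response, features in records:
        # the 'Zip' step: pair each weight with its feature and dimension
        zipped = [(w, x, dim) for dim, (w, x) in enumerate(zip(weights, features))]
        # dot product of weights and features, then this record's error
        error = response - sum(w * x for w, x, dim in zipped)
        # each feature's contribution to the partial derivative along its dimension
        for w, x, dim in zipped:
            partials[dim] += error * x

    # the 'group by dimension' step: one new weight per dimension
    return [weights[dim] + factor * partials[dim] for dim in range(len(weights))]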

gory details

So now that we have a way to do it, we should do it. Right, I mean if we can we should. Isn't that how technology works...

I'm going to go ahead and do a completely contrived example. The reason is so that I can visualize the results.

I've created some data, called data.tsv, which satisfies the following:
\begin{eqnarray*}y=0.3r(x) + 2x\end{eqnarray*}
where \(r(x)\) is a noise term. And here's the plot:


So we have two feature columns, (1.0, \(x\)), that we're trying to find the weights for. Since we've cooked this example (we already know the relationship between \(x\) and \(y\)) we expect the weights for those columns to be (0.0,2.0) if all goes well.

Now that we've got some data to work with, we'll need to write a bit more pig to load the data up and run it through our gradient descent macro.


There's really not much exciting going on here. We're loading the features and weights and rearranging them to satisfy the schema that the gradient descent macro expects. The other details are related to the driver script.

The driver script is next. Since our algorithm is iterative and pig itself has no support for iteration, we're going to embed the pig script into a python (jython) program. Here's what that looks like:

There's a lot going on here (it looks cluttered because Pig doesn't allow class or function definitions in driver scripts), but most of it isn't that interesting and is pretty easy to understand, so I'll just go over the high points:
  • We initialize the weights randomly and write a .pig_schema file as well. The reason for writing the schema file is so that it's unnecessary to write the redundant schema for weights in the pig script itself.
  • We want the driver to be agnostic to whether we're running in local mode or mapreduce mode. Thus we copy the weights to the filesystem using the copyFromLocal fs command. In local mode this just puts the initial weights on the local fs whereas in mapreduce mode this'll place them on the hdfs.
  • Then we iterate until the convergence criterion is met. Each iteration handles details like copying the schema and pulling down the new weights to compute distances.
  • A moving average is maintained over the past 25 weights. Iteration stops when the new weight is less than EPS away from that average (roughly sketched below).
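
Here's roughly what that convergence check looks like in plain Python. EPS, WINDOW, and the helper below are illustrative names; the actual loop in the driver does its iteration by binding and running the compiled pig script.

import math

EPS    = 1e-6   # convergence threshold (illustrative value)
WINDOW = 25     # how many past weight vectors to average over

def converged(history, new_weights):
    # history is a list of recent weight vectors, newest last
    if len(history) < WINDOW:
        return False
    window = history[-WINDOW:]
    average = [sum(ws) / float(WINDOW) for ws in zip(*window)]
    distance = math.sqrt(sum((a - w) ** 2 for a, w in zip(average, new_weights)))
    return distance < EPS
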
Aside from that there's an interesting -bug?- that comes up as a result of this. Notice on line 70 how the new variables are bound each iteration? Well the underlying PigContext object just keeps adding these new variables instead of overwriting them. What this means is that, after a couple thousand iterations, depending on your PIG_HEAPSIZE env variable, the driver script will crash from an out of memory error. Yikes.

run it

Finally we get to run it. That part's easy:

$: export PIG_HEAPSIZE=4000
$: pig driver.py fit_line.pig data data.tsv 2
  
That is, we use the pig command to launch the driver program. The arguments to the driver script itself follow. We're running the fit_line.pig script where the data dir (where intermediate weights will go) exists under 'data' and the input data, data.tsv, should exist there as well. The '2' indicates we've got two weights (w0, and w1). The pig heapsize is to deal with the bug mentioned in the previous section.

On my laptop, in local mode, the convergence criteria was met after 1527 iterations.

results


After 1527 iterations the weights ended up as (0.0040666759502969215, 2.0068029003414014), which is essentially what we'd expect. In other words:
\begin{eqnarray*}y=0.0041 + 2.0068x\end{eqnarray*}
which is the 'best fit' line to our original:
\begin{eqnarray*}y=0.3r(x) + 2x\end{eqnarray*}

And here's an illustration of what happened.


Looks pretty reasonable to me. Hurray. Now go fit some real data.

Monday, September 9, 2013

Get Pig LogicalPlan

Recently I've been wanting to get ahold of the logical plan (a graph representation) for a pig script without running it. The largest reason is that the logical plan is a fairly language and platform agnostic representation of a dataflow. Once you have the logical plan I can think of several fun things you could do with it:


  • Serialize it as JSON and send it to any number of arbitrary tools
  • Visualize it in a web browser
  • Edit it with a web app
  • Compile it into an execution (physical) plan for arbitrary (non-hadoop map-reduce) backend frameworks that make sense (storm, s4, spark) 
Ok, so maybe those are the fun things I actually plan on doing with it, but what's the difference?

Problem

Pig doesn't make it easy to get this. After spending several hours digging through the way pig parses and runs a pig script I've come away somewhat shaken up. The parsing logic is deeply coupled with the execution logic. Yes, yes, this is supposed to change as we go forward, eg PIG-3419, but what about in the mean time?

Hack/Solution

So, I've written this little jruby script to return the LogicalPlan for a pig script. Right now all it does is exactly the same as putting an 'EXPLAIN' operator in your script. However, since it exposes the LogicalPlan, you could easily extend this to do whatever you like with it.


Monday, August 12, 2013

Using Hadoop to Explore Chaos

Hadoop. Hadoop has managed to insinuate itself into practically every company with an engineering team and some data. If your company isn't using it, you know a company that is. Hell, it's why you're reading this to begin with. That being said, what you're probably doing with Hadoop is boring and uninspired. It's not your fault of course. Pretty much every example out there pigeonholes Hadoop into default business use cases like etl and data cleaning, basic statistics, machine learning, and GIS.

You know what though? Sometimes it's good to explore things that don't have an obvious business use case. Things that are weird. Things that are pretty. Things that are ridiculous. Things like dynamical systems and chaos. And, if you happen to find there are applicable tidbits along the way (*hint, skip to the problem outline section*), great, otherwise just enjoy the diversion.

motivation


So what is a dynamical system? Dryly, a dynamical system is a fixed rule to describe how a point moves through geometric space over time. Pretty much everything that is interesting can be modeled as a dynamical system. Population, traffic flows, fireflies, and neurons can all be described this way.

In most cases, you'll have a system of ordinary differential equations like this:

\begin{eqnarray*} \dot{x_{1}} & = & f_{1}(x_{1},\ldots,x_{n})\\ \vdots\\ \dot{x_{n}} & = & f_{n}(x_{1},\ldots,x_{n}) \end{eqnarray*}

For example, the Fitzhugh-Nagumo model (which models a biological neuron being zapped by an external current):

\begin{eqnarray*} \dot{v} & = & v-\frac{v^{3}}{3}-w+I_{{\rm ext}}\\ \dot{w} & = & 0.08(v+0.7-0.8w) \end{eqnarray*}

In this case \(v\) represents the potential difference between the inside of the neuron and the outside (membrane potential), and \(w\) corresponds to how the neuron recovers after it fires. There's also an external current \(I_{{\rm ext}}\) which can model other neurons zapping the one we're looking at but could just as easily be any other source of current like a car battery. The numerical constants in the system are experimentally derived from looking at how giant squid axons behave. Basically, these guys in the 60's were zapping giant squid brains for science. Understand a bit more why I think your business use case is boring?

One of the simple ways you can study a dynamical system is to see how it behaves for a wide variety of parameter values. In the Fitzhugh-Nagumo case the only real parameter is the external current \(I_{{\rm ext}}\). For example, for what values of \(I_{{\rm ext}}\) does the system behave normally? For what values does it fire like crazy? Can I zap it so much that it stops firing altogether?

In order to do that you'd just decide on some reasonable range of currents, say \((0,1)\), break that range into some number of points, and simulate the system while changing the value of \(I_{{\rm ext}}\) each time.
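
To make that concrete, here's a sketch of such a sweep using a crude forward-Euler integrator. The step size, duration, and initial conditions here are made up for illustration:

import numpy as np

def simulate_fitzhugh_nagumo(i_ext, dt=0.1, steps=5000, v0=-1.0, w0=1.0):
    # forward-Euler integration of the Fitzhugh-Nagumo equations above
    v, w = v0, w0
    trajectory = []
    for _ in range(steps):
        dv = v - v**3 / 3.0 - w + i_ext
        dw = 0.08 * (v + 0.7 - 0.8 * w)
        v, w = v + dt * dv, w + dt * dw
        trajectory.append((v, w))
    return trajectory

# sweep the external current over (0,1) and summarize each trajectory however you like
for i_ext in np.linspace(0.0, 1.0, 11):
    trajectory = simulate_fitzhugh_nagumo(i_ext)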

chaos


There are a lot of great ways to summarize the behavior of a dynamical system if you can simulate its trajectories. Simulated trajectories are, after all, just data sets. The one I'm going to focus on is calculation of the largest lyapunov exponent. Basically, all the lyapunov exponent says is: if I take two identical systems and start them going at slightly different places, how similarly do they behave?

For example, if I hook a car battery to two identical squid neurons at the same time, but one has a little bit of extra charge on it, does their firing stay in sync forever or do they start to diverge in time? The lyapunov exponent would measure the rate at which they diverge. If the two neurons fire close in time but don't totally sync up then the lyapunov exponent would be zero. If they eventually start firing at the same time then the lyapunov exponent is negative (they're not diverging, they're coming together). Finally, if they continually diverge from one another then the lyapunov exponent is positive.

As it turns out, a positive lyapunov exponent usually means the system is chaotic. No matter how close two points start out, they will diverge exponentially. What this means in practice is that, while I might have a predictive model (as a dynamical system) of something really cool like a hurricane, I simply can't measure it precisely enough to make a good prediction of where it's going to go. A really small measurement error, between where the hurricane actually is and where I measure it to be, will diverge exponentially. So my model will predict the hurricane heading into Texas when it actually heads into Louisiana. Yep. Chaos indeed.

problem outline


So I'm going to compute the lyapunov exponent of a dynamical system for some range of parameter values. The system I'm going to use is the Henon Map:
\begin{eqnarray*}x_{n+1} & = & y_{n}+1-ax_{n}^{2}\\y_{n+1} & = & bx_{n}\end{eqnarray*}
I choose the Henon map for a few reasons despite the fact that it isn't modeling a physical system. One, it's super simple and doesn't involve time at all. Two, it's two dimensional so it's easy to plot it and take a look at it. Finally, it's only got two parameters meaning the range of parameter values will make up a plane (and not some n-dimensional hyperspace) so I can make a pretty picture.
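
Before wiring any of this into Hadoop, it helps to see what computing the largest lyapunov exponent for a map like this actually involves. The standard trick is to iterate the map, push a tangent vector through the Jacobian at each step, renormalize it, and average the log of how much it stretched. Here's a small Python sketch of that idea (an illustration, not the java EvalFunc used later); for parameter values where the orbit escapes to infinity this will overflow, which has to be handled one way or another:

import math

def henon_lyapunov(a, b, n=10000, burn_in=1000):
    x, y = 0.0, 0.0
    tx, ty = 1.0, 0.0   # tangent vector
    total = 0.0
    for i in range(n + burn_in):
        # jacobian of the henon map at (x, y) applied to the tangent vector
        ntx = -2.0 * a * x * tx + ty
        nty = b * tx
        # advance the orbit
        x, y = y + 1.0 - a * x * x, b * x
        # renormalize the tangent vector and accumulate its stretching
        norm = math.hypot(ntx, nty)
        tx, ty = ntx / norm, nty / norm
        if i >= burn_in:
            total += math.log(norm)
    return total / n

print(henon_lyapunov(1.4, 0.3))   # roughly 0.42 for the classic attractor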

What does Hadoop have to do with all this anyway? Well, I've got to break the parameter plane (ab-plane) into a set of coordinates and run one simulation per coordinate. Say I let \(a=[a_{min},a_{max}]\) and \(b=[b_{min},b_{max}]\) and I want to look \(N\) unique \(a\) values and \(M\) unique \(b\) values. That means I have to run \(N \times M\) individual simulations!

Clearly, the situation gets even worse if I have more parameters (a.k.a a realistic system). However, since each simulation is independent of all the other simulations, I can benefit dramatically from simple parallelization. And that, my friends, is what Hadoop does best. It makes parallelization trivially simple. It handles all those nasty details (which distract from the actual problem at hand) like what machine gets what tasks, what to do about failed tasks, reporting, logging, and the whole bit.

So here's the rough idea:


  1. Use Hadoop to split the n-dimensional (2D for this trivial example) space into several tiles that will be processed in parallel
  2. Each split of the space is just a set of parameter values. Use these parameter values to run a simulation.
  3. Calculate the lyapunov exponent resulting from each.
  4. Slice the results, visualize, and analyze further (perhaps at higher resolution on a smaller region of parameter space), to understand under what conditions the system is chaotic. In the simple Henon map case I'll make a 2D image to look at.
The important silly detail is this. The input data here is minuscule in comparison to most data sets handled with Hadoop. This is NOT big data. Instead, the input data is a small file with n lines and can be thought of as a "spatial specification". It is the input format that explodes the spatial specification into the many individual tiles needed. In other words, Hadoop is not just for big data, it can be used for massively parallel scientific computing.
  

implementation


Hadoop has been around for a while now. So when I implement something with Hadoop you can be sure I'm not going to sit down and write a java map-reduce program. Instead, I'll use Pig and custom functions for pig to hijack the Hadoop input format functionality. Expanding the rough idea in the outline above:

  1. Pig will load a spatial specification file that defines the extent of the space to explore and with what granularity to explore it.
  2. A custom Pig LoadFunc will use the specification to create individual input splits for each tile of the space to explore. For less parallelism than one input split per tile it's possible to specify the number of total splits. In this case the tiles will be split mostly evenly among the input splits.
  3. The LoadFunc overrides Hadoop classes. Specifically: InputFormat (which does the work of expanding the space), InputSplit (which represents the set of one or more spatial tiles), and RecordReader (for deserializing the splits into useful tiles).
  4. A custom EvalFunc will take the tuple representing a tile from the LoadFunc and use its values as parameters in simulating the system and computing the lyapunov exponent. The lyapunov exponent is the result.
And here is the pig script:

define LyapunovForHenon sounder.pig.chaos.LyapunovForHenon();

points = load 'data/space_spec' using sounder.pig.points.RectangularSpaceLoader();

exponents = foreach points generate $0 as a, $1 as b, LyapunovForHenon($0, $1);         
 
store exponents into 'data/result';

You can take a look at the detailed implementations of each component on github. See: LyapunovForHenon, RectangularSpaceLoader

running


I want to explore the Henon map over a range where it's likely to be bounded (unbounded solutions aren't that interesting) and chaotic. Here's my input file:

$: cat data/space_spec
0.6,1.6,800
-1.0,1.0,800

Remember the system?
\begin{eqnarray*}x_{n+1} & = & y_{n}+1-ax_{n}^{2}\\y_{n+1} & = & bx_{n}\end{eqnarray*} Well, the spatial specification says (if I let the first line represent \(a\) and the second be \(b\)) that I'm looking at an \(800 \times 800\) (or 640000 independent simulations) grid in the ab-plane where \(a=[0.6,1.6]\) and \(b=[-1.0,1.0]\)

Now, these bounds aren't arbitrary. The Henon attractor that most are familiar with (if you're familiar with chaos and strange attractors in the least bit) occurs when \(a=1.4\) and \(b=0.3\). I want to ensure I'm at least going over that case.

result


With that, I just need to run it and visualize the results.


$: cat data/result/part-m* | head
0.6	-1.0	9.132244649409043E-5
0.6	-0.9974968710888611	-0.0012539625419929572
0.6	-0.9949937421777222	-0.0025074937591903013
0.6	-0.9924906132665833	-0.0037665150764570965
0.6	-0.9899874843554444	-0.005032402237514987
0.6	-0.9874843554443055	-0.006299127065420516
0.6	-0.9849812265331666	-0.007566751054452304
0.6	-0.9824780976220276	-0.008838119048229768
0.6	-0.9799749687108887	-0.010113503950504331
0.6	-0.9774718397997498	-0.011392710785045064
$: cat data/result/part-m* > data/henon-lyapunov-ab-plane.tsv

To visualize I used this simple python script to get:


The big swaths of flat white are regions where the system becomes unbounded. It's interesting that the bottom right portion has some structure to it that's possibly fractal. The top right portion, between \(b=0.0\) and \(b=0.5\) and \(a=1.0\) to \(a=1.6\), is really the only region in this image that's chaotic (where the exponent is greater than zero). There's a lot more structure here to look at but I'll leave that to you. As a followup it'd be cool to zoom in on the bottom right corner and run this again.
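
For reference, a sketch of the sort of plotting script that produces an image like that, assuming numpy and matplotlib (the column handling below is illustrative):

import numpy as np
import matplotlib.pyplot as plt

# columns: a, b, largest lyapunov exponent
data = np.loadtxt('data/henon-lyapunov-ab-plane.tsv')
a, b, lyapunov = data[:, 0], data[:, 1], data[:, 2]

plt.scatter(a, b, c=lyapunov, s=1)
plt.colorbar(label='largest lyapunov exponent')
plt.xlabel('a')
plt.ylabel('b')
plt.savefig('henon-lyapunov-ab-plane.png')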

conclusion


So yes, it's possible to use Hadoop to do massively parallel scientific computing and avoid the question of big data entirely. Best of all it's easy. 

The notion of exploding a space and doing something with each tile in parallel is actually pretty general and, as I've shown, super easy to do with Hadoop. I'll leave it to you to come up with your own way of applying it.

Monday, October 8, 2012

K-D Tree Generation With Apache Pig

An idea that's been rolling around in my head for a while now has been how to create a k-d tree with pig. I've been hesitant to post about it because it seemed like a ridiculous idea. That is, a k-d tree is typically generated recursively. So how can you parallelize its creation without reverting to some complicated iterative solution? Turns out it's an important topic in graphics rendering, particularly with respect to ray tracing, and some smart folks in that field have a pretty good parallel solution. Since we're here to talk about pig, and not graphics rendering, I'll go ahead and skip to the juicy bit. The idea is to first break the space your points lie within into non-overlapping partitions. A non-parallel k-d tree algorithm is then applied to each of the partitions in parallel. The resulting trees, one per partition, are then merged to create the final tree. That's it. Seems obvious in hindsight.

motivation

First off, why bother pigify-ing the creation of a k-d tree? Pig is pretty much limited to problems where high latency isn't a concern and so unless the set of things we're planning on indexing is huge it doesn't make much sense to talk about it other than as a theoretical exercise. Well. Let's see...

  • Suppose you wanted to create your own index of star brightness observations (of which this meager database has over 20 million). Given a set of galactic coordinates and time, what observations lie nearby in space and time? Maybe a k-d tree would help!
  • Pssh, star brightness observations, that's basically only 4 dimensions, what about indexing document vectors for all the web pages in the Common Crawl data set? You could try a k-d tree.
  • What about satellite data? There's an interesting (and old) project called the International Satellite Cloud Climatology Project (ISCCP). Want to know cloud properties for a region over time? Well, one of their data sets has broken the world into a grid of 30km x 30km 'pixels' and reports all sorts of interesting cloud properties for each of those pixels for over 30 years! It's about 5GB of data per month... Yikes. I'm going to reach for a k-d tree to search through that beast.

problem outline

So now that we're sufficiently motivated (and come on, if stars, web pages, and clouds don't get you going then what the hell are you doing here?), let's work through a much smaller example that we can actually look at. I find with problems involving points it's most intuitive to use geo data as a starting point. Geo data is easy to think about and you can plot your results on a map for verification.

The example geo data we'll use is from the Geonames database and is simply a list of all the cities in the world with population greater than 1000. We're going to use pig to create a k-d tree containing these cities. Some ways of storing and querying this k-d tree will be discussed.

get data

Go ahead and download and unzip the data so you too can play along. I like wget.

$: wget http://download.geonames.org/export/dump/cities1000.zip
$: unzip cities1000.zip

This will result in a tab-separated-values file called 'cities1000.txt'. In the following analysis we're only using the coordinates and the id of each point. You could write your pig script to load all the fields and project out the ones you don't want or, if you don't like writing gory pig schemas like me, you can cut them out ahead of time on the command line.

$: cut -f1,5,6 cities1000.txt > cities1000_cut.tsv

Where fields 1, 5, and 6 (in the cut command above) correspond to the geonameid, latitude, and longitude respectively.

algorithm

Before we can write a pig script to actually deal with this we'd better have our algorithm solidly in hand. Here it is to the best of my ability to write it...
  1. Break the space into non-overlapping partitions.
  2. Run a non-parallel k-d tree generation algorithm on the points that fall within each partition.
  3. Merge the k-d trees from (2) into the result k-d tree

Non-overlapping partitions wtf?

Basically we need to break the space our points fall inside into smaller pieces that don't overlap. This is important for merging the trees in the final step since, with non-overlapping pieces, a point can only belong to one partition and, consequently, exactly one tree. If a point was inside more than one partition then we'd have a heck of a time merging the trees from those partitions since there'd be collisions. No fun.

Now, which algorithm do we actually use to partition? It can get complicated since it really depends on the performance you're looking for and the distribution of your data. You don't want to send too many points to a single partition. On the other hand, processing empty or mostly empty partitions really sucks too. In the Shevtsov et al. paper I referenced at the beginning of this post they run an initial clustering algorithm over the points and choose how to partition the space dynamically based on that clustering. However, since this is a blog post and I am but one man, we'll use an ok way of partitioning based on the quadkey tiling scheme described here. This basically just chops the space up into an even grid.

Ok, so now that I'm using tiles, how do I know what size tile to use? The quadkey system allows you to use a zoom level that ranges from 1 to 23. To pick the best one for this example I sampled the cities and generated a distribution of the number of points per tile at various resolutions:

Eyeballing it, it looks like 7 is a good zoom level at which to partition the space. Mostly it's because the distribution for 7 has an alright spread and I don't see hot spots where one partition gets waaay too many points like at levels 5 and 6. But, please, by all means, experiment with this.

Since we're dealing with geodata and I'm in a visualization mood, here's what Texas looks like partitioned at zoom level 7. The points are the actual cities pulled from our example data set:

As you can see, what with West Texas being a wasteland and all, the partitioning makes it so that some partitions get too much data and some get very little. Marfa's got dust and Dallas has all the suburbs. You just can't win with a grid. But, we're going to march forward anyhow.

non-parallel k-d tree

Since we're going to be using pig the k-d tree generation code will need to be written as a user-defined-function (UDF). Pig is going to group the points by their quadkey which will result in a bag, one per quadkey (partition). Hence, the UDF will need to take a bag of points as input.

Here's also where it's important to think about how we're going to represent the k-d tree during the pig processing. A nested tree representation doesn't work because we need to operate on individual nodes from the pig script itself (filtering, etc). So, the UDF will yield a bag of points, exactly the same size as the input, only with the left and right children (from the k-d tree) attached as fields. Let's call the UDF "KDTree". Rather than paste a couple hundred lines of java, an implementation can be found in the sounder repo on github here. Here's the important bits:

udf signature: KDTree(points) where points is a bag with the following schema:

  points:bag{
    point:tuple(
      id:     chararray, 
      coords: tuple(
        lng:double, lat:double
      )
    )
  }

output schema: The output is a bag, with the same size as the input, with the following schema:

  nodes:bag{
    node:tuple(
      id:          chararray, 
      is_root:     int,
      axis:        int,
      above_child: chararray, 
      below_child: chararray, 
      coords:      tuple(
        lng:double, lat:double
      )
    )
  }

A few things. In the output, above_child and below_child correspond to the right and left children of the point. They're labeled as such since the k-d tree udf can operate on k-dimensional points, not just 2-dimensional points. is_root is an integer, 0 or 1, indicating whether the node is the root of the k-d tree or not. This is because we're returning the nodes as a completely flat structure and we need to be able to reconstruct our tree. Also, in the next step of the algorithm, we're going to need to separate the roots of each partition from the branches. axis is the axis that this node splits (either 0 or 1). This is so that, once we reconstruct our tree, we know how to search it.

merging trees

This part seems complicated but it's actually pretty simple. Remember, it's only possible because we broke our original space into non-overlapping pieces. In the previous step we've generated a k-d tree for each of those pieces. As a result of this each partition has its own root node. To merge the trees we first need to nominate one of those roots (say, the median point along the longitude axis) as the top level root. Then, taking the top level root's tree to start we simply insert the other roots into this tree. Importantly, this step cannot be done (as far as I can tell) in parallel. Instead, it relies on how we ultimately choose to store (and query) the tree. I'll describe a simple way using a hashmap in a bit which can easily be extended to work with a distributed key-value store.

Implementation

Implementing this in pig is actually pretty straightforward. I'll go over each part in turn.

First, as a convenience and to make the pig code more readable, we'll use the define keyword to alias our udfs. GetQuadkey is the udf that generates the quadkeys and KDTree is the udf that generates a k-d tree from a bag of points.

define GetQuadkey sounder.pig.geo.GetQuadkey();
define KDTree     sounder.pig.points.KDTree();

Next, we simply load the data using pig's default PigStorage class with tabs as the field delimiter.


data = load '$POINTS' as (id:chararray, lat:double, lng:double);
Then we generate a quadkey for every point at resolution 7. We're also using pig's built in TOTUPLE operator since the k-d tree udf needs points to be specified as tuples of coordinates. This is so it can handle k-dimensional points and not just 2-dimensional points.

with_key = foreach data generate id, TOTUPLE(lng, lat) as point, GetQuadkey(lng, lat, 7) as quadkey;
And here's the meat of it where we group by quadkey (effectively using hadoop to partition our space) which collects all the points for a given partition into a single bag. This is fed to the k-d tree udf which does the work of creating the k-d trees.

trees = foreach (group with_key by quadkey) {
          tree = KDTree(with_key.(id, point));
          generate
            group as quadkey,
            FLATTEN(tree) as (id:chararray, is_root:int, axis:int, above_child:chararray, below_child:chararray, point:tuple(lng:double, lat:double));
        };

Finally, the trees are split into roots (remember, each partition will have its own root) and branches. These are flattened to pull the coordinates out of their tuple containers and stored separately. The roots and branches need to be dealt with differently when creating the final k-d tree so they're stored separately to reflect that.



flat_trees = foreach trees generate id..below_child, flatten(point) as (lng, lat);
split flat_trees into roots if is_root==1, branches if is_root==0;

store roots    into '$ROOTS';
store branches into '$BRANCHES';

Running

So let's go ahead and run that on our cities data. I saved the pig code above into a file called 'generate_kdtree.pig'. To run:

$: pig -p POINTS=cities1000_cut.tsv -p ROOTS=cities1000/roots -p BRANCHES=cities1000/branches generate_kdtree.pig

This should generate only one map-reduce job and result in a directory on the hdfs which contains the roots and branches of the k-d trees.

As a sanity check, here's an image of what Texas looks like, now with the roots:


Since the first splitting axis in the k-d tree algorithm is the x-axis (longitude) we expect the root of each k-d tree to be the median point along that axis in its respective partition. By eyeballing every partition (I sure do a lot of eyeballing...) this appears to be true. So yay, looks good.

Also, notice I'm getting ahead of myself a bit and showing the root I've nominated as the top level node.

Final Tree

This part takes some care. First, we need to decide how we're actually going to store and query the k-d tree. Querying it directly from the hdfs in its current form is definitely not going to work. For illustrative purposes I'm going to use a ruby hash to store and query the tree. This, of course, only works because the data is small enough to fit into memory. However, most distributed key-value stores and other, more complex nosql stores like cassandra and hbase, would work as well. For the purposes of this example you can think of them as giant ruby hashmaps too. The same basic query interface would work, just with a different data store backend.

As steps:

  1. Insert all the branches into the key-value store using the point id as the key and the rest of the point metadata as the value. Importantly the coordinates and the above_child and below_child are stored. How you do this is up to you.
  2. Nominate one of the roots as the top level root. Ideally this will be the median point along either the x or y axis. The idea here is that the number of partitions, thus the number of roots, will be small enough that finding the median point and nominating it as the root should be a simple single process problem.
  3. Insert the remaining roots into the key-value store using the standard k-d tree insertion procedure outlined in the wikipedia article
Of course you'll have to implement some basic k-d tree searching and insertion code, but that's a well understood problem. There are plenty of examples, eg here and here. My implementation can be found in the sounder repo on github here.
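
If you'd rather see the shape of it without clicking through, here's a rough Python sketch (not the ruby implementation linked above) of inserting the remaining roots and doing a nearest-neighbor search over a flat id -> node store. The field names mirror the output schema from earlier:

# store: id -> {'coords': (lng, lat), 'axis': 0 or 1, 'above_child': id or None, 'below_child': id or None}

def insert(store, root_id, node_id):
    # standard k-d tree insertion: walk down from the root, comparing along each
    # node's splitting axis, until an empty child slot is found
    node, current = store[node_id], store[root_id]
    while True:
        axis = current['axis']
        side = 'above_child' if node['coords'][axis] >= current['coords'][axis] else 'below_child'
        if current[side] is None:
            current[side] = node_id
            return
        current = store[current[side]]

def nearest(store, node_id, query, best=None):
    # recursive nearest-neighbor search; only descend the far side of the
    # splitting plane if it could still contain something closer
    if node_id is None:
        return best
    node = store[node_id]
    dist = sum((q - c) ** 2 for q, c in zip(query, node['coords']))
    if best is None or dist < best[0]:
        best = (dist, node_id)
    axis = node['axis']
    diff = query[axis] - node['coords'][axis]
    near, far = ('above_child', 'below_child') if diff >= 0 else ('below_child', 'above_child')
    best = nearest(store, node[near], query, best)
    if diff ** 2 < best[0]:
        best = nearest(store, node[far], query, best)
    return best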

To use my code simply bring the cities1000/roots and cities1000/branches down to your local filesystem and run:

$: bin/test_built_tree.rb roots branches                                    
0.00194469216500055
#<struct KDTree::Node id="4671654", axis=1, left="4698809", right="4723734", coords=[-97.74306, 30.26715]>

$: grep -i "4671654" cities1000.txt
4671654 Austin Austin AUS,Austin,Austino,Austinopolis,Aŭstino,Gorad Oscin,Ostin,Ostina,Ostinas,ao si ting,astin,astyn, tgzas,awstn, tksas,oseutin,ostina,osutin,xxstin,Ώστιν,Горад Осцін,Остин,Остін,Օսթին,אוסטין,آستین، تگزاس,آسٹن,آسٹن، ٹیکساس,أوستن، تكساس,ऑस्टिन,ஆஸ்டின்,ഓസ്റ്റിൻ,ออสติน,ოსტინი,オースティン,奧斯汀,오스틴 30.26715 -97.74306 P PPLA US  TX 453   790390 149 165 America/Chicago 2011-05-14

and should return Austin, the nearest city to my house. Yes.

One test. Yes, surely that's all you need to be convinced, right?

So it works. Hurray! k-d trees with pig. 

Wednesday, March 21, 2012

Topic Discovery With Apache Pig and Mallet

A common desire when working with natural language is topic discovery. That is, given a set of documents (eg. tweets, blog posts, emails) you would like to discover the topics inherent in those documents. Often this method is used to summarize a large corpus of text so it can be quickly understood what that text is 'about'. You can go further and use topic discovery as a way to classify new documents or to group and organize the documents you've done topic discovery on.

Latent Dirichlet Allocation



One popular method for topic discovery in a corpus is Latent Dirichlet Allocation (LDA). I won't pretend to be an expert on LDA but the main assumption is as follows. Each document is assumed to be a 'mixture' of topics. Going further, each topic is then assumed to be a distribution over terms. For example, say there is a topic in my corpus labeled 'apache hadoop'. It could be represented as a multinomial probability distribution with high probability of generating terms such as 'hadoop', 'data', 'apache', and 'map-reduce'. See the wikipedia article on LDA

Problem


I'm going to use Apache Pig and Mallet, a java based machine learning and natural language processing library to discover topics in the 20 newsgroups data set. This corpus is nice since each document already belongs to a newsgroup (a topic) and so it gives us a way of checking how well our topic discovery is doing.

The Data


The 20 newsgroups data set can be found on Infochimps here. Once you've got the data go ahead and place it somewhere on your hdfs. I put mine in my home directory under '20newsgroups/data'.

Here's what a head of that data looks like:


alt.atheism.53536 From: kmr4@po.CWRU.edu (Keith M. Ryan) Subject: Re: Smullyanism for the day..... In article ... *snip*
alt.atheism.51164 From: mccullou@snake2.cs.wisc.edu (Mark McCullough) Subject: Re: Idle questions for fellow atheists In article ... *snip*
alt.atheism.53448 From: sandvik@newton.apple.com (Kent Sandvik) Subject: Age of Reason Was: ... *snip*
alt.atheism.53753 Subject: Re: The Inimitable Rushdie From: kmagnacca@eagle.wesleyan.edu In article ... *snip*
alt.atheism.53290 From: Andrew Newell Subject: Re: Christian Morality is In article ... *snip*


It's a little messy but it should get the point across.

So it's tab separated where the first field is the document id (a concatenation of the newsgroup the document is coming from and an integer id). The second field is the document text itself. Here's the pig schema for that:


(doc_id:chararray, text:chararray)


Algorithm


LDA operates on a set of documents. Trivially we could just skip the pig part and write a simple java program that operates on the entire document set and be done with it. But that's not the point. Typically, your input documents have metadata attached to them. For example, the region or user they're coming from, or even just the date they were generated. So we'll just use pig's GROUP BY statement to group the documents by this metadata and cluster the documents within each group independently. If the documents don't have this kind of metadata we're stuck doing a GROUP ALL and dealing with all the documents at once. There are clever ways of parallelizing LDA in this case that I'm not going to go into. See here and here.

Here's a sketch of the algorithm:


  • (1) Load documents

  • (2) Group the documents by appropriate metadata (or by all)

  • (3) Run LDA on each group of documents

  • (4) Profit!!!



Implementation


So it's clear that we're going to need a java udf to do the actual topic clustering. Right? This udf will operate on a DataBag of documents and return a DataBag containing the discovered topics. Each topic will be represented by a Tuple with the following schema:


(topic:tuple(topic_num:int, terms:bag{t:tuple(term:chararray, weight:double)}))


Each topic has a DataBag of top terms associated with it as a way of characterizing the topic discovered.

Here's the simplest implementation of such a udf I could come up with using Mallet (it's a mouthful):


package varaha.topic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.regex.Pattern;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.backend.executionengine.ExecException;

import cc.mallet.pipe.Pipe;
import cc.mallet.pipe.CharSequenceLowercase;
import cc.mallet.pipe.CharSequence2TokenSequence;
import cc.mallet.pipe.CharSequence2CharNGrams;
import cc.mallet.pipe.TokenSequenceNGrams;
import cc.mallet.pipe.TokenSequenceRemoveStopwords;
import cc.mallet.pipe.TokenSequence2FeatureSequence;
import cc.mallet.pipe.TokenSequence2FeatureSequenceWithBigrams;

import cc.mallet.types.TokenSequence;
import cc.mallet.types.Token;

import cc.mallet.pipe.SerialPipes;
import cc.mallet.types.InstanceList;
import cc.mallet.types.Instance;
import cc.mallet.types.Alphabet;
import cc.mallet.types.IDSorter;
import cc.mallet.types.LabelSequence;
import cc.mallet.topics.TopicAssignment;
import cc.mallet.topics.ParallelTopicModel;

public class LDATopics extends EvalFunc<DataBag> {

    private Pipe pipe;
    private static Long numKeywords = 50l; // Maximum number of keywords to use to describe a topic

    public LDATopics() {
        pipe = buildPipe();
    }

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
            return null;

        Integer numTopics = (Integer)input.get(0); // Number of topics to discover
        DataBag documents = (DataBag)input.get(1); // Documents, {(doc_id, text)}
        DataBag result = BagFactory.getInstance().newDefaultBag();

        InstanceList instances = new InstanceList(pipe);

        // Add the input databag as source data and run it through the pipe built
        // by the constructor.
        instances.addThruPipe(new DataBagSourceIterator(documents));

        // Create a model with numTopics, alpha_t = 0.01, beta_w = 0.01
        // Note that the first parameter is passed as the sum over topics, while
        // the second is the parameter for a single dimension of the Dirichlet prior.
        ParallelTopicModel model = new ParallelTopicModel(numTopics, 1.0, 0.01);
        model.addInstances(instances);
        model.setNumThreads(1); // Important, since this is being run in the reduce, just use one thread
        model.setTopicDisplay(0,0);
        model.setNumIterations(2000);
        model.estimate();

        // Get the results
        Alphabet dataAlphabet = instances.getDataAlphabet();
        ArrayList<TopicAssignment> assignments = model.getData();

        // Convert the results into comprehensible topics
        for (int topicNum = 0; topicNum < model.getNumTopics(); topicNum++) {
            TreeSet<IDSorter> sortedWords = model.getSortedWords().get(topicNum);
            Iterator<IDSorter> iterator = sortedWords.iterator();

            DataBag topic = BagFactory.getInstance().newDefaultBag();

            // Get the set of keywords with weights for this topic and add them as tuples
            // to the databag used to represent this topic
            while (iterator.hasNext() && topic.size() < numKeywords) {
                IDSorter info = iterator.next();
                Tuple weightedWord = TupleFactory.getInstance().newTuple(2);
                String wordToken = model.alphabet.lookupObject(info.getID()).toString(); // get the actual term text
                weightedWord.set(0, wordToken);
                weightedWord.set(1, info.getWeight()); // the raw weight of the term
                topic.add(weightedWord);
            }
            Tuple topicTuple = TupleFactory.getInstance().newTuple(2);
            topicTuple.set(0, topicNum);
            topicTuple.set(1, topic);
            result.add(topicTuple);
        }

        return result;
    }

    // Instantiates a new pipe object for ingesting pig tuples
    private Pipe buildPipe() {
        Pattern tokenPattern = Pattern.compile("\\S[\\S]+\\S");
        int[] sizes = {1,2};
        ArrayList pipeList = new ArrayList();

        pipeList.add(new CharSequence2TokenSequence(tokenPattern));
        pipeList.add(new TokenSequenceRemoveStopwords(false, false)); // we should use a real stop word list
        pipeList.add(new TokenSequenceNGramsDelim(sizes, " "));
        pipeList.add(new TokenSequence2FeatureSequence());
        return new SerialPipes(pipeList);
    }

    /**
       A few minor updates to TokenSequenceNGrams:

       (1) use delimiter that's passed in to delineate ngrams
     */
    private class TokenSequenceNGramsDelim extends TokenSequenceNGrams {
        int [] gramSizes = null;
        String delim = null;

        public TokenSequenceNGramsDelim(int [] sizes, String delim) {
            super(sizes);
            this.gramSizes = sizes;
            this.delim = delim;
        }

        @Override
        public Instance pipe (Instance carrier) {
            String newTerm = null;
            TokenSequence tmpTS = new TokenSequence();
            TokenSequence ts = (TokenSequence) carrier.getData();

            for (int i = 0; i < ts.size(); i++) {
                Token t = ts.get(i);
                for(int j = 0; j < gramSizes.length; j++) {
                    int len = gramSizes[j];
                    if (len <= 0 || len > (i+1)) continue;
                    if (len == 1) { tmpTS.add(t); continue; }
                    newTerm = new String(t.getText());
                    for(int k = 1; k < len; k++)
                        newTerm = ts.get(i-k).getText() + delim + newTerm;
                    tmpTS.add(newTerm);
                }
            }
            carrier.setData(tmpTS);
            return carrier;
        }
    }

    /**
       Allow for a databag to be source data for mallet's clustering
     */
    private class DataBagSourceIterator implements Iterator<Instance> {

        private Iterator<Tuple> tupleItr;
        private String currentId;
        private String currentText;

        public DataBagSourceIterator(DataBag bag) {
            tupleItr = bag.iterator();
        }

        public boolean hasNext() {
            if (tupleItr.hasNext()) {
                Tuple t = tupleItr.next();
                try {
                    if (!t.isNull(0) && !t.isNull(1)) {
                        currentId = t.get(0).toString();
                        currentText = t.get(1).toString();
                        if (currentId.isEmpty() || currentText.isEmpty()) {
                            return false;
                        } else {
                            return true;
                        }
                    }
                } catch (ExecException e) {
                    throw new RuntimeException(e);
                }
            }
            return false;
        }

        public Instance next() {
            // Get the next tuple and pull out its fields
            Instance i = new Instance(currentText, "X", currentId, null);
            return i;
        }

        public void remove() {
            tupleItr.remove();
        }
    }
}


There's a few key things going on here. First, the udf operates on a bag that contains tuples with exactly two fields, doc_id and text. Mallet has the notion of pipes where your input data flows through a set of 'pipes' as a way of preparing the data. The class DataBagSourceIterator is simply a convenient way of plugging a DataBag object into this flow.

One of the pipes our documents flow through actually tokenizes the text. TokenSequenceNGramsDelim does this work. Mallet has a built-in TokenSequenceNGrams that works nicely; unfortunately, when tokenizing n-grams where n > 1, it uses an '_' by default to separate the terms in the ngram. TokenSequenceNGramsDelim allows us to use our own delimiter, namely a ' ', instead.

Ultimately, all this udf does is read the input documents, prepare them for clustering, run Mallet's built-in LDA methods, and construct the output DataBag in the way we'd like it.

See varaha for the code itself plus a pom.xml to compile it.

Pig



Now that we have our udf, let's write a pig script to use it. Since the documents I've chosen to use don't have any obvious (or at least easy to get at) additional metadata we're going to use a GROUP ALL. Here's the pig script:


define TokenizeText varaha.text.TokenizeText();
define LDATopics varaha.topic.LDATopics();
define RangeConcat org.pygmalion.udf.RangeBasedStringConcat('0', ' ');

--
-- Load the docs
--
raw_documents = load '$DOCS' as (doc_id:chararray, text:chararray);

--
-- Tokenize text to remove stopwords
--
tokenized = foreach raw_documents generate doc_id AS doc_id, flatten(TokenizeText(text)) as (token:chararray);

--
-- Concat the text for a given doc with spaces
--
documents = foreach (group tokenized by doc_id) generate group as doc_id, RangeConcat(tokenized.token) as text;

--
-- Ensure all our documents are sane
--
for_lda = filter documents by SIZE(doc_id) > 0 and SIZE(text) > 0;

--
-- Group the docs by all and find topics
--
-- WARNING: This is, in general, not appropriate in a production environment.
-- Instead it is best to group by some piece of metadata which partitions
-- the documents into smaller groups.
--
topics = foreach (group for_lda all) generate
           FLATTEN(LDATopics(20, for_lda)) as (
             topic_num:int,
             keywords:bag {t:tuple(keyword:chararray, weight:int)}
           );


store topics into '$OUT';


There's a few things worth pointing out here. First, we load our text as normal. There's a step there to tokenize text which seems like it might be spurious. It uses the lucene tokenization udf from here as a way to remove stopwords. You could skip this step if that wasn't important for you. Next, the tokenized text is grouped back together by document id and concatenated back together to form cleaned documents. I'm using the nice udf from pygmalion to do the concatenation. Finally, the documents are grouped together and topics are discovered.

Run it!


At this point we're ready to run our script. I named this script 'discover_topics_example.pig'. And here's how I ran it:


pig -p DOCS=20newsgroups/data -p OUT=20newsgroups/topics discover_topics_example.pig


And here's what the output looks like:


0 {(max,4523.0),(max max,3266.0),(g9v,1161.0),(b8f,1109.0),(a86,913.0),(g9v g9v,834.0),(145,740.0),(1d9,656.0),(a86 a86,643.0),(b8f b8f,599.0),(34u,512.0),(145 145,449.0),(75u,446.0),(bhj,445.0),(giz,430.0),(2di,414.0),(1d9 1d9,322.0),(2tm,300.0),(7ey,292.0),(2di 2di,247.0),(bxn,240.0),(6ei,215.0),(6um,189.0),(34u 34u,168.0),(75u 75u,162.0),(bhj bhj,150.0),(giz giz,142.0),(air,129.0),(qax,127.0),(b4q,120.0),(okz,116.0),(6um 6um,112.0),(nrhj,112.0),(b8e,109.0),(7kn,104.0),(1eq,102.0),(bxn bxn,99.0),(c8v,99.0),(rlk,99.0),(fyn,97.0),(2tm 2tm,96.0),(b9r,96.0),(3dy,96.0),(7ez,95.0),(1d9l,93.0),(b8g,92.0),(biz,91.0),(7ex,86.0),(7ey 7ey,84.0),(r8f,82.0)}
1 {(medical,480.0),(subject,473.0),(disease,431.0),(health,373.0),(cancer,349.0),(msg,344.0),(patients,331.0),(article,329.0),(food,285.0),(writes,281.0),(treatment,253.0),(hiv,249.0),(doctor,237.0),(gordon,229.0),(medicine,226.0),(aids,222.0),(candida,212.0),(diet,208.0),(yeast,204.0),(banks,198.0),(infection,188.0),(geb,185.0),(research,181.0),(drug,180.0),(pain,177.0),(effects,171.0),(study,170.0),(information,170.0),(patient,162.0),(1993,158.0),(clinical,152.0),(vitamin,148.0),(dyer,148.0),(studies,145.0),(chronic,138.0),(risk,135.0),(symptoms,135.0),(april,132.0),(doctors,131.0),(diseases,129.0),(body,127.0),(newsletter,124.0),(blood,124.0),(weight,123.0),(page,122.0),(syndrome,120.0),(foods,119.0),(cs.pitt.edu,115.0),(volume,114.0),(hicnet,109.0)}
2 {(game,1010.0),(subject,831.0),(team,799.0),(hockey,725.0),(play,544.0),(games,466.0),(nhl,465.0),(writes,453.0),(season,390.0),(article,382.0),(win,372.0),(players,372.0),(period,303.0),(player,298.0),(goal,286.0),(teams,274.0),(cup,251.0),(league,248.0),(playoff,247.0),(pit,244.0),(detroit,236.0),(det,233.0),(espn,232.0),(leafs,228.0),(pittsburgh,228.0),(wings,223.0),(playoffs,220.0),(fans,214.0),(boston,207.0),(series,202.0),(toronto,200.0),(bos,198.0),(pens,198.0),(played,192.0),(blues,191.0),(chi,185.0),(montreal,184.0),(puck,181.0),(goals,178.0),(buffalo,176.0),(bruins,175.0),(devils,172.0),(maynard,171.0),(penguins,170.0),(april,169.0),(division,169.0),(power,168.0),(ice,165.0),(tor,161.0),(flyers,160.0)}
3 {(subject,1528.0),(drive,1442.0),(scsi,1181.0),(card,1178.0),(disk,682.0),(windows,644.0),(system,603.0),(ide,571.0),(bus,542.0),(dos,512.0),(hard,490.0),(modem,481.0),(software,460.0),(mac,450.0),(writes,450.0),(drives,447.0),(controller,420.0),(drivers,420.0),(article,394.0),(video,393.0),(bit,382.0),(memory,373.0),(board,365.0),(computer,356.0),(cards,349.0),(port,345.0),(ram,326.0),(driver,317.0),(disks,317.0),(data,313.0),(sale,311.0),(floppy,301.0),(i'm,294.0),(motherboard,289.0),(speed,286.0),(isa,277.0),(mode,252.0),(chip,250.0),(machine,240.0),(486,237.0),(bios,234.0),(ibm,226.0),(serial,221.0),(gateway,215.0),(run,214.0),(hardware,213.0),(set,211.0),(cache,209.0),(diamond,207.0),(mhz,200.0)}
4 {(gun,1095.0),(writes,942.0),(article,838.0),(subject,756.0),(fbi,647.0),(government,545.0),(fire,510.0),(guns,505.0),(batf,464.0),(waco,456.0),(koresh,437.0),(children,400.0),(atf,371.0),(weapons,345.0),(compound,288.0),(people,277.0),(cdt,251.0),(police,248.0),(clinton,248.0),(firearms,240.0),(control,234.0),(gas,233.0),(crime,228.0),(law,219.0),(federal,212.0),(killed,201.0),(david,193.0),(constitution,187.0),(survivors,180.0),(house,162.0),(hallam,158.0),(assault,155.0),(ranch,154.0),(agents,153.0),(criminals,150.0),(deaths,150.0),(arms,150.0),(davidians,145.0),(warrant,143.0),(started,143.0),(amendment,141.0),(roby,138.0),(burns,138.0),(tanks,137.0),(texas,136.0),(veal,131.0),(armed,131.0),(murder,130.0),(cult,129.0),(rights,129.0)}
5 {(andy,110.0),(kratz,91.0),(semi,84.0),(water,77.0),(uicvm.uic.edu,74.0),(revolver,73.0),(gun,72.0),(auto,70.0),(safety,64.0),(freeman,62.0),(jason,59.0),(ndet_loop.c,54.0),(u28037,50.0),(weapon,50.0),(gang,50.0),(cops,49.0),(ole.cdac.com,49.0),(02p,48.0),(expose,48.0),(mydisplay,43.0),(glock,42.0),(mahan,41.0),(sail.stanford.edu andy,39.0),(sail.stanford.edu,39.0),(andy sail.stanford.edu,37.0),(section,37.0),(firearm,36.0),(mwra,36.0),(ssave,35.0),(phil,35.0),(military,35.0),(trigger,34.0),(cement,33.0),(auto semi,31.0),(shooting,31.0),(jason kratz,30.0),(garrett,29.0),(silence,29.0),(autos,28.0),(dominance,28.0),(semi auto,27.0),(water dept,27.0),(concealed,26.0),(revolvers,25.0),(u28037 uicvm.uic.edu,25.0),(moment silence,25.0),(ordnance,25.0),(atlantic,25.0),(ingres.com,25.0),(item,25.0)}
6 {(god,2686.0),(jesus,1457.0),(bible,909.0),(christian,823.0),(christ,809.0),(church,781.0),(christians,739.0),(subject,719.0),(sin,589.0),(lord,504.0),(god's,494.0),(faith,478.0),(people,464.0),(sandvik,425.0),(life,411.0),(christianity,405.0),(writes,405.0),(paul,367.0),(love,358.0),(law,357.0),(hell,344.0),(heaven,323.0),(truth,313.0),(word,311.0),(athos.rutgers.edu,295.0),(article,295.0),(john,283.0),(catholic,280.0),(scripture,278.0),(father,257.0),(holy,254.0),(spirit,253.0),(son,248.0),(kent,234.0),(true,233.0),(eternal,231.0),(doctrine,231.0),(death,226.0),(brian,223.0),(day,214.0),(apr,211.0),(newton.apple.com,210.0),(jehovah,204.0),(children,204.0),(biblical,203.0),(words,200.0),(book,198.0),(earth,191.0),(jews,190.0),(matthew,190.0)}
7 {(turkish,827.0),(armenian,826.0),(armenians,694.0),(armenia,452.0),(turkey,449.0),(turks,414.0),(war,383.0),(muslim,360.0),(genocide,346.0),(muslims,343.0),(soviet,332.0),(people,310.0),(greek,294.0),(government,268.0),(argic,263.0),(serdar,262.0),(russian,254.0),(jews,248.0),(history,243.0),(azerbaijan,239.0),(world,202.0),(university,201.0),(greece,199.0),(istanbul,193.0),(killed,185.0),(population,184.0),(article,176.0),(ottoman,170.0),(bosnia,169.0),(children,169.0),(soldiers,167.0),(serbs,160.0),(greeks,158.0),(europe,156.0),(1920,148.0),(extermination,147.0),(army,144.0),(zuma.uucp,141.0),(subject,141.0),(1919,134.0),(sera,132.0),(dead,132.0),(troops,130.0),(bosnian,130.0),(mountain,129.0),(closed,128.0),(serve,128.0),(roads,127.0),(empire,127.0),(escape,126.0)}
8 {(writes,525.0),(cramer,460.0),(article,459.0),(homosexual,407.0),(gay,402.0),(homosexuality,384.0),(subject,351.0),(sex,324.0),(sexual,281.0),(clayton,274.0),(health,271.0),(homosexuals,256.0),(optilink.com,248.0),(drugs,240.0),(insurance,216.0),(study,214.0),(drug,202.0),(government,195.0),(male,168.0),(tax,146.0),(kaldis,144.0),(private,134.0),(people,131.0),(percentage,129.0),(child,126.0),(care,120.0),(children,118.0),(clayton cramer,116.0),(cramer optilink.com,108.0),(optilink.com cramer,108.0),(state.edu,108.0),(women,101.0),(kinsey,95.0),(cramer clayton,95.0),(rights,95.0),(population,95.0),(magnus.acs.ohio,94.0),(writes article,88.0),(abortion,84.0),(american,84.0),(partners,83.0),(political,80.0),(gays,78.0),(heterosexual,78.0),(national,78.0),(church,76.0),(evidence,76.0),(issues,75.0),(laws,75.0),(optilink,74.0)}
9 {(subject,898.0),(list,647.0),(mail,581.0),(university,394.0),(address,317.0),(email,278.0),(information,276.0),(mailing,246.0),(send,219.0),(computer,216.0),(1993,210.0),(internet,199.0),(conference,198.0),(research,197.0),(fax,189.0),(systems,159.0),(newsgroup,155.0),(contact,141.0),(message,141.0),(government,130.0),(date,130.0),(apr,130.0),(steve,130.0),(request,125.0),(info,122.0),(phone,121.0),(news,120.0),(canada,118.0),(article,118.0),(br.com,111.0),(science,110.0),(dept,110.0),(writes,110.0),(newsgroups,109.0),(april,107.0),(national,105.0),(graphics,103.0),(book,102.0),(john,102.0),(mailing list,98.0),(institute,98.0),(books,97.0),(addresses,96.0),(organization,96.0),(usa,95.0),(software,95.0),(james,93.0),(reply,92.0),(paul,89.0),(center,88.0)}
10 {(subject,1355.0),(sale,583.0),(writes,375.0),(power,362.0),(battery,316.0),(article,290.0),(radio,242.0),(sound,224.0),(radar,209.0),(phone,208.0),(circuit,199.0),(ground,183.0),(offer,182.0),(shipping,178.0),(condition,178.0),(mail,176.0),(audio,174.0),(tape,173.0),(sell,171.0),(signal,168.0),(current,167.0),(amp,166.0),(price,164.0),(detector,162.0),(system,160.0),(box,155.0),(heat,154.0),(batteries,150.0),(line,148.0),(blue,144.0),(noise,143.0),(output,142.0),(voltage,142.0),(chip,141.0),(stereo,137.0),(games,131.0),(supply,131.0),(equipment,129.0),(cpu,127.0),(john,126.0),(light,122.0),(wire,121.0),(low,121.0),(air,117.0),(input,115.0),(channel,114.0),(car,112.0),(original,109.0),(lead,109.0),(computer,108.0)}
11 {(subject,1177.0),(monitor,490.0),(writes,481.0),(apple,473.0),(mac,388.0),(article,335.0),(printer,296.0),(mouse,279.0),(computer,269.0),(xterm,243.0),(video,240.0),(software,233.0),(centris,226.0),(screen,208.0),(mail,205.0),(i've,203.0),(keyboard,200.0),(system,198.0),(monitors,191.0),(i'm,190.0),(color,181.0),(price,168.0),(running,168.0),(duo,166.0),(quadra,160.0),(bit,158.0),(print,152.0),(machine,151.0),(university,143.0),(internet,139.0),(info,139.0),(email,133.0),(ram,130.0),(vga,126.0),(laser,125.0),(key,122.0),(x11r5,120.0),(speed,120.0),(610,119.0),(box,119.0),(fpu,117.0),(display,116.0),(fax,116.0),(lib,114.0),(powerbook,113.0),(buy,110.0),(deskjet,109.0),(simms,109.0),(lciii,107.0),(hardware,104.0)}
12 {(windows,1782.0),(subject,1779.0),(file,1209.0),(program,863.0),(files,838.0),(window,809.0),(dos,715.0),(image,676.0),(writes,586.0),(graphics,541.0),(article,497.0),(software,441.0),(run,425.0),(display,424.0),(version,422.0),(server,404.0),(code,386.0),(data,371.0),(ftp,369.0),(images,366.0),(unix,364.0),(color,349.0),(bit,347.0),(application,339.0),(i'm,324.0),(manager,323.0),(motif,321.0),(directory,313.0),(format,312.0),(user,302.0),(mail,302.0),(system,296.0),(gif,292.0),(running,288.0),(microsoft,287.0),(screen,284.0),(package,271.0),(faq,264.0),(jpeg,259.0),(i've,251.0),(line,250.0),(memory,247.0),(programs,244.0),(3.1,236.0),(applications,226.0),(set,218.0),(support,201.0),(information,199.0),(source,198.0),(text,195.0)}
13 {(space,1339.0),(subject,597.0),(writes,567.0),(henry,484.0),(launch,443.0),(article,421.0),(moon,376.0),(earth,373.0),(orbit,366.0),(nasa,365.0),(shuttle,323.0),(spacecraft,268.0),(system,265.0),(solar,264.0),(mission,264.0),(satellite,261.0),(pat,254.0),(sky,253.0),(zoo.toronto.edu,222.0),(data,215.0),(science,214.0),(spencer,205.0),(station,203.0),(lunar,195.0),(energy,174.0),(flight,166.0),(cost,165.0),(project,162.0),(hst,161.0),(mars,161.0),(program,161.0),(zoo.toronto.edu henry,159.0),(ray,155.0),(prb,152.0),(henry zoo.toronto.edu,152.0),(billion,151.0),(atmosphere,149.0),(technology,145.0),(baalke,143.0),(april,140.0),(nuclear,139.0),(gamma,138.0),(universe,137.0),(astronomy,133.0),(theory,133.0),(planet,132.0),(sun,131.0),(commercial,130.0),(aurora.alaska.edu,127.0),(nsmca,124.0)}
14 {(subject,1478.0),(car,1469.0),(writes,1381.0),(article,1227.0),(bike,741.0),(cars,481.0),(dod,471.0),(engine,422.0),(ride,306.0),(bmw,292.0),(speed,280.0),(miles,276.0),(oil,267.0),(front,259.0),(drive,254.0),(honda,249.0),(riding,244.0),(road,244.0),(ford,242.0),(i've,240.0),(i'm,228.0),(driving,217.0),(dealer,210.0),(buy,206.0),(insurance,205.0),(bikes,204.0),(dog,198.0),(motorcycle,196.0),(rear,179.0),(price,171.0),(wheel,168.0),(left,164.0),(clutch,162.0),(helmet,157.0),(article writes,155.0),(writes article,154.0),(tires,147.0),(brake,146.0),(mike,143.0),(shaft,140.0),(john,138.0),(don't,138.0),(andrew,135.0),(mustang,133.0),(gas,131.0),(manual,131.0),(bought,129.0),(fast,128.0),(buying,127.0),(bnr.ca,124.0)}
15 {(key,1377.0),(clipper,1002.0),(encryption,857.0),(chip,851.0),(subject,667.0),(government,646.0),(keys,557.0),(writes,507.0),(security,492.0),(public,453.0),(privacy,419.0),(escrow,416.0),(article,410.0),(des,390.0),(system,385.0),(algorithm,375.0),(law,358.0),(phone,339.0),(nsa,332.0),(netcom.com,327.0),(crypto,308.0),(information,307.0),(secure,304.0),(pgp,300.0),(message,285.0),(secret,282.0),(data,276.0),(bit,265.0),(david,258.0),(cryptography,245.0),(enforcement,245.0),(anonymous,240.0),(code,235.0),(technology,229.0),(encrypted,219.0),(wiretap,217.0),(sternlight,209.0),(internet,204.0),(chip clipper,199.0),(clipper chip,199.0),(access,189.0),(agencies,188.0),(communications,188.0),(chips,184.0),(private,182.0),(computer,173.0),(clinton,167.0),(rsa,165.0),(mail,162.0),(administration,155.0)}
16 {(god,963.0),(writes,892.0),(subject,674.0),(article,625.0),(atheists,525.0),(morality,509.0),(religion,497.0),(moral,449.0),(evidence,432.0),(keith,415.0),(science,380.0),(atheism,365.0),(objective,365.0),(belief,319.0),(christian,317.0),(livesey,312.0),(islam,305.0),(argument,288.0),(atheist,287.0),(exist,287.0),(faith,277.0),(existence,274.0),(religious,272.0),(islamic,258.0),(frank,247.0),(mathew,242.0),(jon,235.0),(beliefs,198.0),(reason,191.0),(claim,188.0),(true,187.0),(exists,179.0),(statement,173.0),(christianity,169.0),(bible,169.0),(wrong,166.0),(values,162.0),(theism,160.0),(system,158.0),(universe,157.0),(truth,155.0),(solntze.wpd.sgi.com,152.0),(cobb,149.0),(jaeger,148.0),(scientific,145.0),(rushdie,142.0),(muslim,142.0),(question,140.0),(agree,139.0),(wrote,133.0)}
17 {(don't,6736.0),(people,6088.0),(subject,5058.0),(time,4513.0),(i'm,3901.0),(writes,3872.0),(article,3356.0),(read,1874.0),(can't,1861.0),(question,1855.0),(doesn't,1769.0),(didn't,1683.0),(i've,1672.0),(that's,1653.0),(lot,1401.0),(real,1388.0),(day,1386.0),(post,1360.0),(world,1311.0),(person,1239.0),(you're,1239.0),(true,1214.0),(wrong,1211.0),(isn't,1155.0),(heard,1136.0),(reason,1125.0),(bad,1125.0),(times,1098.0),(called,1096.0),(i'd,1082.0),(idea,1043.0),(found,1043.0),(remember,1032.0),(life,1023.0),(system,1003.0),(makes,983.0),(means,974.0),(call,969.0),(money,965.0),(power,942.0),(free,926.0),(ago,925.0),(based,909.0),(feel,907.0),(hope,887.0),(hard,887.0),(i'll,885.0),(questions,876.0),(matter,875.0),(david,865.0)}
18 {(israel,1036.0),(israeli,740.0),(jews,599.0),(writes,561.0),(arab,515.0),(subject,497.0),(jewish,482.0),(article,461.0),(war,318.0),(arabs,288.0),(policy,242.0),(jake,235.0),(peace,234.0),(palestinian,224.0),(adl,215.0),(center,187.0),(israelis,185.0),(palestinians,177.0),(kuwait,175.0),(anti,169.0),(killed,164.0),(research,164.0),(palestine,158.0),(gaza,152.0),(igc.apc.org,147.0),(civilians,144.0),(soldiers,144.0),(occupied,144.0),(american,142.0),(cpr,141.0),(iran,141.0),(lebanese,138.0),(land,138.0),(virginia.edu,136.0),(bony.com,131.0),(bony1,131.0),(jew,130.0),(government,129.0),(jerusalem,127.0),(lebanon,126.0),(rights,123.0),(clinton,121.0),(israel's,119.0),(adam,119.0),(zionism,118.0),(press,118.0),(country,117.0),(president,116.0),(opinions,114.0),(yassin,113.0)}
19 {(subject,866.0),(writes,741.0),(article,596.0),(game,578.0),(baseball,516.0),(team,462.0),(players,402.0),(games,385.0),(hit,286.0),(runs,264.0),(season,241.0),(morris,227.0),(win,223.0),(league,223.0),(braves,214.0),(michael,201.0),(ball,192.0),(pitching,181.0),(he's,179.0),(player,176.0),(pitcher,171.0),(hitter,170.0),(play,162.0),(georgia,160.0),(average,158.0),(run,155.0),(roger,150.0),(jays,149.0),(hitting,147.0),(sox,146.0),(david,146.0),(cubs,137.0),(home,135.0),(stats,134.0),(fans,133.0),(mike,131.0),(giants,128.0),(fan,127.0),(base,126.0),(batting,125.0),(pitch,125.0),(ai.uga.edu,124.0),(bonds,123.0),(covington,122.0),(jewish,121.0),(time,121.0),(career,120.0),(teams,119.0),(mcovingt,119.0),(smith,117.0)}


Labeling


Now, we'd like to see how well we did. Here are the 20 topics we _know_ should exist:


  • alt.atheism

  • comp.graphics

  • comp.os.ms-windows.misc

  • comp.sys.ibm.pc.hardware

  • comp.sys.mac.hardware

  • comp.windows.x

  • misc.forsale

  • rec.autos

  • rec.motorcycles

  • rec.sport.baseball

  • rec.sport.hockey

  • sci.crypt

  • sci.electronics

  • sci.med

  • sci.space

  • soc.religion.christian

  • talk.politics.guns

  • talk.politics.mideast

  • talk.politics.misc

  • talk.religion.misc



There are a number of methods for labeling topics discovered in this way (see here), but in the interest of time I'm going to manually match the known topics above to the discovered ones. Obviously, 'eyeballing' it isn't appropriate for a production environment...


  • alt.atheism,16

  • comp.graphics,12

  • comp.os.ms-windows.misc,9

  • comp.sys.ibm.pc.hardware,3

  • comp.sys.mac.hardware,11

  • comp.windows.x,0

  • misc.forsale,10

  • rec.autos,14

  • rec.motorcycles,14

  • rec.sport.baseball,19

  • rec.sport.hockey,2

  • sci.crypt,15

  • sci.electronics

  • sci.med,1

  • sci.space,13

  • soc.religion.christian,6

  • talk.politics.guns,5

  • talk.politics.mideast,18,7

  • talk.politics.misc,4

  • talk.religion.misc,8



So, as far as I can tell, some of the known topics map to multiple discovered topics and some don't seem to map to any discovered topic at all. It's clear there's room for improvement (look at the parameters alpha and beta I'm hardcoding in the topic model, for example), but all in all it's pretty good as a first pass. Now go away and find some topics.

Hurray!

Wednesday, October 26, 2011

Nearest Neighbors With Apache Pig and Jruby

Since it's been such a long time since I last posted I thought I'd make this one a bit longer. It's really a condensation of a lot of things I've been working with and thinking about over the past few months.

Nearest Neighbors



The nearest neighbors problem (also known as the post-office problem) is this: given a query point X in some metric space M and a set of points S, find the point in S nearest to X. In other words, given a residence, assign to it the nearest post office. The K-nearest neighbors problem, which this post addresses, is just a slight generalization of that problem: instead of just one neighbor we're looking for the K nearest.
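
To make that concrete, here's a minimal, in-memory sketch of K-nearest neighbors in plain Java. Everything in it is made up for illustration (the query point, the candidate points, and the simple Euclidean distance); the point is the max-heap trick of keeping only the K best candidates seen so far, which is the same trick the Pig UDF later in this post uses inside each partition.


import java.util.Comparator;
import java.util.PriorityQueue;

public class KnnSketch {
    // Made-up query point and candidate points, as (longitude, latitude) pairs.
    static final double[] QUERY = { -122.42, 37.77 };
    static final double[][] POINTS = {
        { -122.41, 37.78 }, { -122.27, 37.80 }, { -121.89, 37.33 }, { -118.24, 34.05 }
    };

    public static void main(String[] args) {
        int k = 2;
        // Max-heap on distance: the head is always the farthest of the candidates kept so far,
        // so once we hold k points we can cheaply evict the worst one.
        PriorityQueue<double[]> best = new PriorityQueue<double[]>(k, new Comparator<double[]>() {
            public int compare(double[] a, double[] b) {
                return Double.compare(dist(b), dist(a));
            }
        });
        for (double[] p : POINTS) {
            best.add(p);
            if (best.size() > k) best.poll(); // drop the farthest once we exceed k
        }
        // The k nearest candidates, in no particular order.
        for (double[] p : best)
            System.out.println("(" + p[0] + ", " + p[1] + ") dist=" + dist(p));
    }

    // Plain Euclidean distance in (lng, lat) space; good enough for a toy example.
    static double dist(double[] p) {
        double dx = p[0] - QUERY[0], dy = p[1] - QUERY[1];
        return Math.sqrt(dx * dx + dy * dy);
    }
}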

Problem



So, we're going to use the geonames data. This is a set of nearly 8 million geo points from around the world, with names, coordinates, and a bunch of other good stuff. We would like to find, for each point in the geonames set, the 5 points (also in geonames) nearest to it. Should be pretty simple, yeah?

Get data


The geonames data set 'allCountries.zip' can be downloaded like so:


$: wget http://download.geonames.org/export/dump/allCountries.zip


Prepare data


Since the geonames data set already comes as a nice tab-separated-values (.tsv) file, it's just a matter of unzipping the package and placing it on your hdfs (you do have one of those, don't you?). Do:


$: unzip allCountries.zip
$: hadoop fs -put allCountries.txt .


to unzip the package and place the tsv file into your home directory on the hadoop distributed file system.

Schema


Oh, and by the way, before we forget, the data from geonames has this pig schema:


geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray


The Algorithm


Now that we have the data we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc.) we see that there are really only three fields of interest: geonameid, longitude, and latitude. All the other fields are just nice metadata which we can attach later.

Now, since we're going to be using Apache Pig to solve this problem, we need to think a little bit about parallelism. One constraint is that at no time is any one point going to have access to the locations of all the other points. In other words, we will not be storing the full set of points in memory. Besides, it's 8 million points; that's kind of a lot for my poor little machine to handle.

So it's clear (right?) that we're going to have to partition the space in some way. Then, within each partition, we'll apply a local version of the nearest neighbors algorithm. That's it really. Map and reduce. Wait, but there's one problem: what happens if we don't find all 5 neighbors for a point in a single partition? Hmmm. Well, the answer is iteration. We'll start with small partitions and gradually grow them until either the partitions get too large or all the neighbors have been found. Got it?

Recap:


  • (1) Partition the space

  • (2) Search for nearest neighbors in a single partition

  • (3) If all neighbors have been found, terminate; else increase the partition size and repeat (1) and (2) (see the sketch below)
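
Before we get into the partitioning details, here's a tiny sketch of that control flow in Java. It's purely schematic: runOnePass(zl) is a hypothetical stand-in for "partition the points at zoom level zl and run the local search", returning true while some point still has fewer than K neighbors. The real driver (the JRuby script later in this post) does exactly this, with Pig jobs in place of the local call.


// Schematic only: runOnePass is a hypothetical placeholder, not part of the real code.
public class IterationSketch {
    static final int START_ZL = 22; // high zoom level = small partitions
    static final int MIN_ZL = 10;   // low zoom level = big partitions; give up below this

    public static void main(String[] args) {
        int zl = START_ZL;
        boolean pointsRemaining = runOnePass(zl);
        while (pointsRemaining && zl > MIN_ZL) {
            zl--; // bigger partitions (shorter quadkeys, as we'll see)
            pointsRemaining = runOnePass(zl);
        }
    }

    static boolean runOnePass(int zl) {
        System.out.println("searching partitions at zoom level " + zl);
        return false; // the real version reports whether any point is still missing neighbors
    }
}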



Implementation


For partitioning the space we're going to use quadkeys from Microsoft's Bing Maps tile system (http://msdn.microsoft.com/en-us/library/bb259689.aspx) since they're super easy to implement and they partition the space nicely. This will be a Java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a string quadkey (the partition id).

Here's the actual java code for that. Let's call it "GetQuadkey":


package sounder.pig.geo;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
See: http://msdn.microsoft.com/en-us/library/bb259689.aspx

A Pig UDF to compute the quadkey string for a given
(longitude, latitude, resolution) tuple.

*/
public class GetQuadkey extends EvalFunc<String> {
private static final int TILE_SIZE = 256;

public String exec(Tuple input) throws IOException {
if (input == null || input.size() < 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))
return null;

Double longitude = (Double)input.get(0);
Double latitude = (Double)input.get(1);
Integer resolution = (Integer)input.get(2);

String quadKey = quadKey(longitude, latitude, resolution);
return quadKey;
}

private static String quadKey(double longitude, double latitude, int resolution) {
int[] pixels = pointToPixels(longitude, latitude, resolution);
int[] tiles = pixelsToTiles(pixels[0], pixels[1]);
return tilesToQuadKey(tiles[0], tiles[1], resolution);
}

/**
Return the pixel X and Y coordinates for the given longitude, latitude, and resolution.
*/
private static int[] pointToPixels(double longitude, double latitude, int resolution) {
double x = (longitude + 180) / 360;
double sinLatitude = Math.sin(latitude * Math.PI / 180);
double y = 0.5 - Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);

int mapSize = mapSize(resolution);
int[] pixels = {(int) trim(x * mapSize + 0.5, 0, mapSize - 1), (int) trim(y * mapSize + 0.5, 0, mapSize - 1)};
return pixels;
}

/**
Convert from pixel coordinates to tile coordinates.
*/
private static int[] pixelsToTiles(int pixelX, int pixelY) {
int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};
return tiles;
}

/**
Finally, given tile coordinates and a resolution, returns the appropriate quadkey
*/
private static String tilesToQuadKey(int tileX, int tileY, int resolution) {
StringBuilder quadKey = new StringBuilder();
for (int i = resolution; i > 0; i--) {
char digit = '0';
int mask = 1 << (i - 1);
if ((tileX & mask) != 0) {
digit++;
}
if ((tileY & mask) != 0) {
digit++;
digit++;
}
quadKey.append(digit);
}
return quadKey.toString();
}

/**
Ensure input value is within minval and maxval
*/
private static double trim(double n, double minVal, double maxVal) {
return Math.min(Math.max(n, minVal), maxVal);
}

/**
Width of the map, in pixels, at the given resolution
*/
public static int mapSize(int resolution) {
return TILE_SIZE << resolution;
}
}
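
Just to get a feel for what this UDF produces, here's a small, hypothetical driver you could drop next to the class (same package, Pig jar on the classpath); it's not part of the post's actual code, and the coordinates are made up. The interesting bit is that the quadkey at zoom level 11 should be the quadkey at zoom level 12 with its last character chopped off; that parent-tile prefix property is exactly what the iteration below leans on when it trims quadkeys.


package sounder.pig.geo;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical smoke test for GetQuadkey; not part of the UDF itself.
public class GetQuadkeyExample {
    public static void main(String[] args) throws Exception {
        TupleFactory tupleFactory = TupleFactory.getInstance();
        GetQuadkey udf = new GetQuadkey();

        // (longitude, latitude, resolution) -- a made-up point at zoom levels 12 and 11.
        Tuple fine = tupleFactory.newTuple(3);
        fine.set(0, -122.42); fine.set(1, 37.77); fine.set(2, 12);

        Tuple coarse = tupleFactory.newTuple(3);
        coarse.set(0, -122.42); coarse.set(1, 37.77); coarse.set(2, 11);

        // The coarser key should be a prefix of the finer one: dropping one trailing
        // character moves a point into its parent (four times bigger) tile.
        System.out.println("zl 12: " + udf.exec(fine));
        System.out.println("zl 11: " + udf.exec(coarse));
    }
}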


Next we need another Java UDF, one that operates on all the points in a partition. Let's call it "NearestNeighbors". Here's a naive implementation:


package sounder.pig.geo.nearestneighbors;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Iterator;
import java.util.Comparator;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;

public class NearestNeighbors extends EvalFunc<DataBag> {
private static TupleFactory tupleFactory = TupleFactory.getInstance();
private static BagFactory bagFactory = BagFactory.getInstance();

public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
return null;

Long k = (Long)input.get(0);
DataBag points = (DataBag)input.get(1); // {(id,lng,lat,{(n1,n1_dist)...})}
DataBag result = bagFactory.newDefaultBag();

for (Tuple pointA : points) {
DataBag neighborsBag = (DataBag)pointA.get(3);
if (neighborsBag.size() < k) {
PriorityQueue<Tuple> neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);
Double x1 = Math.toRadians((Double)pointA.get(1));
Double y1 = Math.toRadians((Double)pointA.get(2));

for (Tuple pointB : points) {
if (pointA!=pointB) {
Double x2 = Math.toRadians((Double)pointB.get(1));
Double y2 = Math.toRadians((Double)pointB.get(2));
Double distance = haversineDistance(x1,y1,x2,y2);

// Keep pointB while we have fewer than k neighbors; once the queue is full,
// only keep it if it's closer than the current farthest neighbor.
if (neighbors.size() < k || distance < (Double)neighbors.peek().get(1)) {
Tuple newNeighbor = tupleFactory.newTuple(2);
newNeighbor.set(0, pointB.get(0));
newNeighbor.set(1, distance);

if (neighbors.size() >= k) {
neighbors.poll(); // remove the current farthest
}
neighbors.add(newNeighbor);
}
}
}
// The queue now holds the (up to) k nearest neighbors found in this partition;
// create new result tuple and add to result bag
Tuple newPointA = tupleFactory.newTuple(4);
newPointA.set(0, pointA.get(0));
newPointA.set(1, pointA.get(1));
newPointA.set(2, pointA.get(2));
newPointA.set(3, fromQueue(neighbors));
result.add(newPointA);
} else {
result.add(pointA);
}
}
return result;
}

// Build a max-heap on distance so that peek() returns the current farthest neighbor
private PriorityQueue<Tuple> toDistanceSortedQueue(int k, DataBag bag) {
PriorityQueue<Tuple> q = new PriorityQueue<Tuple>(k,
new Comparator<Tuple>() {
public int compare(Tuple t1, Tuple t2) {
try {
Double dist1 = (Double)t1.get(1);
Double dist2 = (Double)t2.get(1);
return dist2.compareTo(dist1);
} catch (ExecException e) {
throw new RuntimeException("Error comparing tuples", e);
}
}
});
for (Tuple tuple : bag) q.add(tuple);
return q;
}

private DataBag fromQueue(PriorityQueue<Tuple> q) {
DataBag bag = bagFactory.newDefaultBag();
for (Tuple tuple : q) bag.add(tuple);
return bag;
}

// Haversine formula: x is longitude, y is latitude, both in radians.
// Returns the central angle in radians; multiply by the Earth's radius for an actual distance.
private Double haversineDistance(Double x1, Double y1, Double x2, Double y2) {
double a = Math.pow(Math.sin((y2-y1)/2), 2)
+ Math.cos(y1) * Math.cos(y2) * Math.pow(Math.sin((x2-x1)/2), 2);

return (2 * Math.asin(Math.min(1, Math.sqrt(a))));
}
}



The details of the NearestNeighbors UDF aren't super important, and it's mostly clear what's going on. Just know that it takes a bag of points as input and returns a bag of points with the same schema as output. That's really important, since we're going to be iterating.
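
If you want to convince yourself of that before wiring it into Pig, here's a rough, hypothetical local test (again assuming the Pig jar is on the classpath; none of this is in the post's actual code, and the points are made up). It builds the same (K, bag-of-points) input tuple the Pig script will pass, with empty neighbor bags, and prints the bag that comes back.


package sounder.pig.geo.nearestneighbors;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical local smoke test for NearestNeighbors; not part of the UDF itself.
public class NearestNeighborsExample {
    public static void main(String[] args) throws Exception {
        TupleFactory tupleFactory = TupleFactory.getInstance();
        BagFactory bagFactory = BagFactory.getInstance();

        // A bag of points with the schema the UDF expects:
        // (geonameid, longitude, latitude, {(neighbor_id, distance)...})
        DataBag points = bagFactory.newDefaultBag();
        double[][] raw = { { 1, -122.41, 37.78 }, { 2, -122.27, 37.80 }, { 3, -121.89, 37.33 } };
        for (double[] r : raw) {
            Tuple point = tupleFactory.newTuple(4);
            point.set(0, (int) r[0]);                 // geonameid
            point.set(1, r[1]);                       // longitude
            point.set(2, r[2]);                       // latitude
            point.set(3, bagFactory.newDefaultBag()); // no neighbors found yet
            points.add(point);
        }

        // The Pig script passes (K, points_bag); K is a long, matching the 5l literal it uses.
        Tuple input = tupleFactory.newTuple(2);
        input.set(0, 5L);
        input.set(1, points);

        DataBag out = new NearestNeighbors().exec(input);
        for (Tuple t : out) System.out.println(t); // same schema back, neighbor bags filled in
    }
}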

Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and Pig's "PigServer" Java class to do all the work. Here's what the JRuby runner looks like (it's kind of a lot, so don't get scared):


#!/usr/bin/env jruby

require 'java'

#
# You might consider changing this to point to where you have
# pig installed...
#
jar = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"
conf = "/etc/hadoop/conf"

$CLASSPATH << conf
require jar

import org.apache.pig.ExecType
import org.apache.pig.PigServer
import org.apache.pig.FuncSpec

class NearestNeighbors

attr_accessor :points, :k, :min_zl, :runmode

#
# Create a new nearest neighbors instance
# for the given points, k neighbors to find,
# an optional minimum zl (1-21) and an optional
# hadoop run mode (local or mapreduce)
#
def initialize points, k, min_zl=20, runmode='mapreduce'
@points = points
@k = k
@min_zl = min_zl
@runmode = runmode
end

#
# Run the nearest neighbors algorithm
#
def run
start_pig_server
register_jars_and_functions
run_algorithm
stop_pig_server
end

#
# Actually runs all the pig queries for
# the algorithm. Stops if all neighbors
# have been found or if min_zl is reached
#
def run_algorithm
start_nearest_neighbors(points, k, 22)
if run_nearest_neighbors(k, 22)
21.downto(min_zl) do |zl|
iterate_nearest_neighbors(k, zl)
break unless run_nearest_neighbors(k,zl)
end
end
end

#
# Registers algorithm initialization queries
#
def start_nearest_neighbors(input, k, zl)
@pig.register_query(PigQueries.load_points(input))
@pig.register_query(PigQueries.generate_initial_quadkeys(zl))
end

#
# Registers algorithm iteration queries
#
def iterate_nearest_neighbors k, zl
@pig.register_query(PigQueries.load_prior_done(zl))
@pig.register_query(PigQueries.load_prior_not_done(zl))
@pig.register_query(PigQueries.union_priors(zl))
@pig.register_query(PigQueries.trim_quadkey(zl))
end

#
# Runs one iteration of the algorithm
#
def run_nearest_neighbors(k, zl)
@pig.register_query(PigQueries.group_by_quadkey(zl))
@pig.register_query(PigQueries.nearest_neighbors(k, zl))
@pig.register_query(PigQueries.split_results(k, zl))

if !@pig.exists_file("done#{zl}")
@pig.store("done#{zl}", "done#{zl}")
not_done = @pig.store("not_done#{zl}", "not_done#{zl}")
not_done.get_results.has_next?
else
true
end
end

#
# Start a new pig server with the specified run mode
#
def start_pig_server
@pig = PigServer.new(runmode)
end

#
# Stop the running pig server
#
def stop_pig_server
@pig.shutdown
end

#
# Register the jar that contains the nearest neighbors
# and quadkeys udfs and define functions for them.
#
def register_jars_and_functions
@pig.register_jar('../../../../udf/target/sounder-1.0-SNAPSHOT.jar')
@pig.register_function('GetQuadkey', FuncSpec.new('sounder.pig.geo.GetQuadkey()'))
@pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors()'))
end

#
# A simple class to contain the pig queries
#
class PigQueries

#
# Load the geonames points. Obviously,
# this should be modified to accept a
# variable schema.
#
def self.load_points geonames
"points = LOAD '#{geonames}' AS (
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
);"
end

#
# Query to generate quadkeys at the specified zoom level
#
def self.generate_initial_quadkeys(zl)
"projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"
end

#
# Load previous iteration's done points
#
def self.load_prior_done(zl)
"prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Load previous iteration's not done points
#
def self.load_prior_not_done(zl)
"prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Union the previous iterations points that are done
# with the points that are not done
#
def self.union_priors zl
"prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"
end

#
# Chop off one character of precision from the existing
# quadkey to go one zl down.
#
def self.trim_quadkey zl
"projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE
SUBSTRING(quadkey, 0, #{zl}) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;"
end

#
# Group the points by quadkey
#
def self.group_by_quadkey zl
"grouped#{zl} = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"
end

#
# Run the nearest neighbors udf on all the points for
# a given quadkey
#
def self.nearest_neighbors(k, zl)
"nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Split the results into done and not_done relations
# The algorithm is done when 'not_done' contains
# no more tuples.
#
def self.split_results(k, zl)
"SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;"
end
end

end

NearestNeighbors.new(ARGV[0], ARGV[1], (ARGV[2] || 20).to_i).run



Call this file "nearest_neighbors.rb". The idea is that we register some basic Pig queries to initialize the algorithm and then to run the iterations. These queries are run over and over until either the "not_done" relation contains no more tuples or the minimum zoom level has been reached. Note that a smaller zoom level means a bigger partition of space.

Run it!


I think we're finally ready to run it. Let K=5 and the min zoom level (zl) be 10. Then just run:


$: ./nearest_neighbors.rb allCountries.txt 5 10


to kick it off. The output will live in 'done10' (in your home directory on the hdfs), and all the points that couldn't find their neighbors (poor guys) are left in 'not_done10'. Let's take a look:


$: hadoop fs -cat done10/part* | head
22321231 5874132 -164.73333 67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}
133223322 2631726 -17.98263 66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}
133223322 2631999 -18.01884 66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}
200103201 5874186 -165.39611 64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}
200103201 5878614 -165.3 64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}
200103203 5875461 -165.55111 64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}
200103300 5861248 -164.27639 64.69278 {(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}
200103300 5876626 -164.27111 64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}
200233011 5870290 -170.3 57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}
200233123 5873595 -169.48056 56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}


Hurray!

Pig Code


Let's just take a look at the pig code that actually got executed.


points = LOAD 'allCountries.txt' AS (
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
);
projected22 = FOREACH points GENERATE GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};
grouped22 = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) >= 5l, not_done22 IF COUNT(neighbors) < 5l;
prior_done22 = LOAD 'done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done22 = LOAD 'not_done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors22 = UNION prior_done22, prior_not_done22;
projected21 = FOREACH prior_neighbors22 GENERATE
SUBSTRING(quadkey, 0, 21) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped21 = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) >= 5l, not_done21 IF COUNT(neighbors) < 5l;
prior_done21 = LOAD 'done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done21 = LOAD 'not_done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors21 = UNION prior_done21, prior_not_done21;
projected20 = FOREACH prior_neighbors21 GENERATE
SUBSTRING(quadkey, 0, 20) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped20 = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) >= 5l, not_done20 IF COUNT(neighbors) < 5l;
prior_done20 = LOAD 'done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done20 = LOAD 'not_done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors20 = UNION prior_done20, prior_not_done20;
projected19 = FOREACH prior_neighbors20 GENERATE
SUBSTRING(quadkey, 0, 19) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped19 = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) >= 5l, not_done19 IF COUNT(neighbors) < 5l;
prior_done19 = LOAD 'done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done19 = LOAD 'not_done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors19 = UNION prior_done19, prior_not_done19;
projected18 = FOREACH prior_neighbors19 GENERATE
SUBSTRING(quadkey, 0, 18) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped18 = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) >= 5l, not_done18 IF COUNT(neighbors) < 5l;
prior_done18 = LOAD 'done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done18 = LOAD 'not_done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors18 = UNION prior_done18, prior_not_done18;
projected17 = FOREACH prior_neighbors18 GENERATE
SUBSTRING(quadkey, 0, 17) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped17 = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) >= 5l, not_done17 IF COUNT(neighbors) < 5l;
prior_done17 = LOAD 'done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done17 = LOAD 'not_done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors17 = UNION prior_done17, prior_not_done17;
projected16 = FOREACH prior_neighbors17 GENERATE
SUBSTRING(quadkey, 0, 16) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped16 = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) >= 5l, not_done16 IF COUNT(neighbors) < 5l;
prior_done16 = LOAD 'done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done16 = LOAD 'not_done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors16 = UNION prior_done16, prior_not_done16;
projected15 = FOREACH prior_neighbors16 GENERATE
SUBSTRING(quadkey, 0, 15) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped15 = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) >= 5l, not_done15 IF COUNT(neighbors) < 5l;
prior_done15 = LOAD 'done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done15 = LOAD 'not_done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors15 = UNION prior_done15, prior_not_done15;
projected14 = FOREACH prior_neighbors15 GENERATE
SUBSTRING(quadkey, 0, 14) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped14 = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) >= 5l, not_done14 IF COUNT(neighbors) < 5l;
prior_done14 = LOAD 'done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done14 = LOAD 'not_done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors14 = UNION prior_done14, prior_not_done14;
projected13 = FOREACH prior_neighbors14 GENERATE
SUBSTRING(quadkey, 0, 13) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped13 = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) >= 5l, not_done13 IF COUNT(neighbors) < 5l;
prior_done13 = LOAD 'done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done13 = LOAD 'not_done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors13 = UNION prior_done13, prior_not_done13;
projected12 = FOREACH prior_neighbors13 GENERATE
SUBSTRING(quadkey, 0, 12) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped12 = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) >= 5l, not_done12 IF COUNT(neighbors) < 5l;
prior_done12 = LOAD 'done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done12 = LOAD 'not_done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors12 = UNION prior_done12, prior_not_done12;
projected11 = FOREACH prior_neighbors12 GENERATE
SUBSTRING(quadkey, 0, 11) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped11 = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) >= 5l, not_done11 IF COUNT(neighbors) < 5l;
prior_done11 = LOAD 'done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done11 = LOAD 'not_done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors11 = UNION prior_done11, prior_not_done11;
projected10 = FOREACH prior_neighbors11 GENERATE
SUBSTRING(quadkey, 0, 10) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped10 = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) >= 5l, not_done10 IF COUNT(neighbors) < 5l;


So now you can see why, when you're iterating, it's a good idea to generate this code programmatically rather than write it all out by hand.

And we're done :)

All the code for this is on github in the Sounder repo (along with tons of other Apache Pig examples).