
Thursday, November 14, 2013

Linear Regression With Pig

It's a bit silly trying to motivate a discussion of linear regression. It's everywhere. Linear regression is typically the first pass step for understanding a dataset. Is there a linear relationship between my variables? Maybe. Let's try linear regression! In other, and less precise, words we just fit a damn line to it. Suffice it to say linear regression is one of those tools for data analysis that's not optional. You must know it.

In fact, I'm going to assume just that. That you've used linear regression in other contexts before and understand its utility. The issue at hand is how to parallelize linear regression. Why? Well, suppose you have billions of feature vectors in your data set, each with thousands of features (columns), and you want to use all of them because, why not? Suppose it doesn't fit on one machine. Now, there exists a project to address this specifically, vowpal wabbit, which you should most certainly check out, but that I'm not going to talk about. Instead, the idea is to use Apache Pig. The reason for implementing it with Pig, rather than using an existing tool, is mostly for illustration. Linear regression with pig brings up several design and implementation details that I believe you'll face when doing almost any reasonably useful machine learning at scale. In other words, how do I wire all this shit together?

Specifically, I'll address pig macros, python drivers, and using a whole relation as a scalar. Fun stuff.

linear regression

It's important that I do at least explain a bit of terminology so we're all together in this. So, rather than jump to the most general explanation immediately (why do people do that?) let's talk about something real. Suppose you've measured the current running through a circuit while slowly decreasing the resistance. You would expect the current to increase as you decrease the resistance (Ohm's law). In other words,
\begin{eqnarray*}I=\frac{V}{R}\end{eqnarray*}

To verify Ohm's Law (not a bad idea, I mean, maybe you're living in a dream world where physics is different and you want to know for certain...) you'd record the current \(I\) and the resistance \(R\) at every measurement while holding the voltage \(V\) constant. You'd then fit a line to the data or, more specifically, to \(\frac{1}{R}\), and, if all went well, find that the slope of said line was equal to the voltage.

In the machine learning nomenclature the current would be called the response or target variable. All the responses together form a vector \(y\) called the response vector. The resistance would be called the feature or observation. And, if you recorded more than just the resistance, say, the temperature, then for every response you'd have a collection of features or a feature vector. All the feature vectors together form a matrix \(X\). The goal of linear regression is to find the best set of weights \(w\) that, when used to form a linear combination of the features, creates a vector that is as close as possible to the response vector. So the problem can be phrased as an optimization problem. Here the function we'll minimize is the mean squared error (the square of the distance between the weighted features and the response). Mathematically the squared error, for one feature vector \(x_{i}\) of length \(M\), can be written as:
\begin{eqnarray*}error^2=(y_{i}-\sum_{j=0}^Mw_{j}x_{i,j})^2\end{eqnarray*}
where \(x_{i,0}=1\) by definition, so that \(w_{0}\) plays the role of the intercept.
So the mean squared error (mse), when we've got \(N\) measurements (feature vectors), is then:
\begin{eqnarray*}mse(w)=\frac{1}{N}\sum_{i=1}^N(y_{i}-\sum_{j=0}^Mw_{j}x_{i,j})^2\end{eqnarray*}
So now the question is, exactly how are we going to minimize the mse by varying the weights? Well, it turns out there's a method for that called gradient descent. The idea is that the mse decreases fastest if we start with a given set of weights and travel in the direction of the negative gradient of the mse for those weights. In other words:
\begin{eqnarray*}w_{new}=w-\alpha\nabla{mse(w)}\end{eqnarray*}
where \(\alpha\) is the magnitude of the step size. What this gives us is a way to update the weights until \(w_{new}\) doesn't really change much. Once the weights converge we're done.

algorithm

Alright, now that we've got a rule for updating weights, we can write down the algorithm.

1. Initialize the weights, one per feature, randomly
2. Update the weights by subtracting the gradient of the mse
3. Repeat 2 until converged
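
To make the update rule concrete before distributing it, here's what that loop looks like serially in plain python (numpy is assumed here purely for illustration; the pig version below is the point of this post):

import numpy as np

def fit_line(X, y, alpha=0.01, eps=1e-6, max_iter=10000):
    # X is N x (M+1) with a leading column of ones; y has length N
    w = np.random.rand(X.shape[1])               # 1. random initial weights
    for _ in range(max_iter):
        error = y - X.dot(w)                     # residuals
        grad = (-2.0 / len(y)) * X.T.dot(error)  # gradient of the mse
        w_new = w - alpha * grad                 # 2. step along -gradient
        if np.linalg.norm(w_new - w) < eps:      # 3. converged?
            return w_new
        w = w_new
    return w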

implementation

Ok great. Let's get the pieces together.

pig

Pig is going to do most of the real work. There are two main steps involved. The first step, and the one that varies strongly from domain to domain and problem to problem, is loading your data and transforming it into something a generic algorithm can handle. The second, and more interesting, is the implementation of gradient descent of the mse itself.

Here's the implementation of the gradient descent portion. I'll go over each relation in detail.
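
It looks something like this (a condensed sketch; the relation names, the alpha parameter, and the exact schema handling are simplified here, so treat the full version on github as authoritative):

-- gradient_descent.macro: one iteration of gradient descent on the mse.
DEFINE gradient_descent_step(data, weights, alpha) RETURNS new_weights {

    -- Zip the current weights onto every feature vector. $weights holds
    -- exactly one tuple, so the scalar cast hands Zip the whole thing.
    -- Zip returns a bag of (weight, feature, dim) triples.
    weighted  = foreach $data generate
                    response,
                    Zip($weights.weights, features) as zipped: {t: (weight:double, feature:double, dim:int)};

    -- The error for each vector: response minus the dot product of the
    -- weights with the features, via a nested projection and SUM.
    errors    = foreach weighted {
                    products = foreach zipped generate weight * feature as product;
                    generate zipped, response - SUM(products.product) as error;
                };

    -- Each feature's contribution to the partial derivative along its dimension.
    flattened = foreach errors generate flatten(zipped), error;
    gradients = foreach flattened generate dim, weight, feature * error as partial;

    -- Group by dimension (the weight rides along in the key) and apply the
    -- update rule. COUNT and SUM are algebraic, so combiners kick in.
    by_dim    = group gradients by (dim, weight);
    updated   = foreach by_dim generate
                    group.dim as dim,
                    group.weight + 2.0 * $alpha * SUM(gradients.partial) / COUNT(gradients) as weight;

    -- Reassemble a single weight tuple with the same schema as the input;
    -- BagToTuple is builtin as of pig 0.11.
    $new_weights = foreach (group updated all) {
                       ordered = order updated by dim;
                       generate flatten(BagToTuple(ordered.weight));
                   };
};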

Go ahead and save that in a directory called 'macros'. Since gradient descent of the mean squared error for the purposes of creating a linear model is a generic problem, it makes sense to implement it as a pig macro.

The first step attaches the weights to every feature vector. The Zip UDF, which can be found on github, receives the weights as a tuple and the feature vector as a tuple. The output is a bag of new tuples containing (weight,feature,dimension). Think of Zip like a zipper that matches the weights to their corresponding features. Importantly, the dimension (the index in the input tuples) is returned as well.

Something to notice about this first bit is the scalar cast. Zip receives the entire weights relation as a single object. Since the weights relation is only a single tuple anyway, this is exactly what we want. It prevents us from doing something silly like a replicated join on a constant (which works but clutters the logic) or, worse, a cross.

Next, we compute the error for each feature vector, which is one piece of the gradient of the mse. The reason for the Zip udf in the first step was so that the nested projection and sum that computes the dot product of the weights with the features works out cleanly.

Then each feature is multiplied by the error, giving that feature vector's contribution to the gradient of the mse. It might not seem obvious when written like that, but the partial derivatives of the mse with respect to each weight work out exactly like this. How nice.
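To see why, differentiate the mse with respect to a single weight \(w_{k}\):
\begin{eqnarray*}\frac{\partial mse}{\partial w_{k}}=-\frac{2}{N}\sum_{i=1}^N(y_{i}-\sum_{j=0}^Mw_{j}x_{i,j})x_{i,k}\end{eqnarray*}
Each feature vector contributes its error times its \(k\)-th feature, and those contributions just need to be summed up, which is exactly what the next step does.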

The grouping step is where the action happens. By action I mean we'll actually trigger a reduce job, since everything up to this point has been map only. This is where the partial derivative bits come together. That is, we're grouping by dimension and weight (which, it turns out, is precisely the thing we're differentiating with respect to) and summing each feature vector's contribution to the partial derivative along that dimension. The factor bit is the multiplier for the gradient. It includes the normalization term (we normalize by the number of feature vectors since we're differentiating the mean squared error) and the step size. The result of this step is a new weight for each dimension.

An important thing to note here is that there are only as many reduce keys as there are features, which means each reduce task could receive a very large amount of data. Fortunately for us COUNT and SUM are both algebraic, so Pig will use combiners, hurray!, drastically reducing the amount of data sent to each reduce task.

Finally, we reconstruct a new tuple from the new weights and return it. The schema of this tuple should be the same as that of the input weight tuple.

gory details

So now that we have a way to do it, we should do it. Right, I mean if we can we should. Isn't that how technology works...

I'm going to go ahead and do a completely contrived example. The reason is so that I can visualize the results.

I've created some data, called data.tsv, which satisfies the following:
\begin{eqnarray*}y=0.3r(x) + 2x\end{eqnarray*}
where \(r(x)\) is a noise term. And here's the plot:


So we have two feature columns, (1.0, \(x\)), that we're trying to find the weights for. Since we've cooked this example (we already know the relationship between \(x\) and \(y\)) we expect the weights for those columns to be (0.0,2.0) if all goes well.

Now that we've got some data to work with, we'll need to write a bit more pig to load the data up and run it through our gradient descent macro.
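
That script looks roughly like this (a sketch; the $-parameters are bound by the driver below, and the jar name and Zip package path are guesses, so check the github repo for the real ones):

-- fit_line.pig: load data and weights, reshape them, run one descent step.
register 'sounder.jar';                 -- jar name is a guess
define Zip sounder.pig.tuple.Zip();     -- package path is a guess
import 'macros/gradient_descent.macro';

data     = load '$input' as (y:double, x:double);

-- rearrange into the (response, features:tuple) shape the macro expects;
-- the leading 1.0 is the constant feature for the intercept weight
features = foreach data generate y as response, TOTUPLE(1.0, x) as features;

-- the weight schema comes from the .pig_schema file the driver writes
w_raw    = load '$weights';
weights  = foreach w_raw generate TOTUPLE(*) as weights;

new_weights = gradient_descent_step(features, weights, $alpha);

store new_weights into '$output';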


There's really not much exciting going on here. We're loading the features and weights and rearranging them to satisfy the schema that the gradient descent macro expects. The other details are related to the driver script.

The driver script is next. Since our algorithm is iterative and pig itself has no support for iteration, we're going to embed the pig script into a python (jython) program. Here's what that looks like:
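
(A condensed sketch; the real driver has more bookkeeping, and the constants, file layout, and schema JSON here are illustrative rather than the exact code.)

# driver.py -- run as: pig driver.py fit_line.pig <data_dir> <data_file> <num_weights>
import sys
from random import random
from org.apache.pig.scripting import Pig

script, data_dir, data_file = sys.argv[1], sys.argv[2], sys.argv[3]
n_weights = int(sys.argv[4])
ALPHA, EPS, WINDOW = 0.001, 1e-5, 25  # step size, threshold, moving-average window

# random initial weights, plus a .pig_schema file so the pig script
# doesn't have to redeclare the weight schema (type 25 is double)
f = open('weights.tsv', 'w')
f.write('\t'.join([str(random()) for i in range(n_weights)]) + '\n')
f.close()
fields = ','.join(['{"name":"w%d","type":25,"schema":null}' % i for i in range(n_weights)])
f = open('.pig_schema', 'w')
f.write('{"fields":[%s],"version":0,"sortKeys":[],"sortKeyOrders":[]}' % fields)
f.close()

# copyFromLocal puts the weights on whichever filesystem we're running against
Pig.fs("mkdir %s/weights-0" % data_dir)
Pig.fs("copyFromLocal weights.tsv %s/weights-0/weights.tsv" % data_dir)
Pig.fs("copyFromLocal .pig_schema %s/weights-0/.pig_schema" % data_dir)

P = Pig.compileFromFile(script)
history = []
i = 0
converged = False
while not converged:
    prev, curr = '%s/weights-%d' % (data_dir, i), '%s/weights-%d' % (data_dir, i + 1)
    # rebinding creates fresh variables on every pass; see the memory caveat below
    bound = P.bind({'input': '%s/%s' % (data_dir, data_file), 'weights': prev,
                    'output': curr, 'alpha': str(ALPHA)})
    bound.runSingle()
    # ship the schema alongside the new weights, then pull them down locally
    Pig.fs("copyFromLocal .pig_schema %s/.pig_schema" % curr)
    Pig.fs("copyToLocal %s/part-r-00000 local-weights-%d.tsv" % (curr, i + 1))
    f = open('local-weights-%d.tsv' % (i + 1))
    w = [float(x) for x in f.read().split()]
    f.close()
    # moving average over the last WINDOW weight vectors; stop when the new
    # weights land within EPS of it
    history.append(w)
    if len(history) > WINDOW:
        history.pop(0)
        avg = [sum(vals) / float(WINDOW) for vals in zip(*history)]
        converged = sum([(x - y) ** 2 for x, y in zip(avg, w)]) ** 0.5 < EPS
    i += 1

print "converged after %d iterations: %s" % (i, w)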

There's a lot going on here (it looks cluttered because Pig doesn't allow class or function definitions in driver scripts), but most of it is not that interesting and pretty easy to understand, so I'll just go over the high points:
  • We initialize the weights randomly and write a .pig_schema file as well. The reason for writing the schema file is so that it's unnecessary to write the redundant schema for weights in the pig script itself.
  • We want the driver to be agnostic to whether we're running in local mode or mapreduce mode. Thus we copy the weights to the filesystem using the copyFromLocal fs command. In local mode this just puts the initial weights on the local fs whereas in mapreduce mode this'll place them on the hdfs.
  • Then we iterate until the convergence criteria is met. Each iteration handles details like copying the schema file and pulling down the new weights to compute distances.
  • A moving average is maintained over the past 25 weights. Iteration stops when the new weight is less than EPS away from the average.
Aside from that, there's an interesting bug(?) that comes up as a result of this. Notice how new variables are bound to the script on every iteration? Well, the underlying PigContext object just keeps adding these new variables instead of overwriting them. What this means is that, after a couple thousand iterations, depending on your PIG_HEAPSIZE environment variable, the driver script will crash with an out of memory error. Yikes.

run it

Finally we get to run it. That part's easy:

$: export PIG_HEAPSIZE=4000
$: pig driver.py fit_line.pig data data.tsv 2
  
That is, we use the pig command to launch the driver program, and the arguments to the driver script itself follow. We're running the fit_line.pig script; the data directory (where intermediate weights will go) is 'data', and the input data, data.tsv, should exist there as well. The '2' indicates we've got two weights (w0 and w1). The pig heapsize is to deal with the bug mentioned in the previous section.

On my laptop, in local mode, the convergence criteria was met after 1527 iterations.

results


After 1527 iterations the weights ended up as (0.0040666759502969215, 2.0068029003414014), which is just about what we'd expect. In other words:
\begin{eqnarray*}y=0.0041 + 2.0068x\end{eqnarray*}
which is the 'best fit' line to our original:
\begin{eqnarray*}y=0.3r(x) + 2x\end{eqnarray*}

And here's an illustration of what happened.


Looks pretty reasonable to me. Hurray. Now go fit some real data.

Monday, September 9, 2013

Get Pig LogicalPlan

Recently I've been wanting to get ahold of the logical plan (a graph representation) for a pig script without running it. The largest reason is that the logical plan is a fairly language and platform agnostic representation of a dataflow. Once you have the logical plan, there are several fun things you could do with it:


  • Serialize it as JSON and send it to any number of arbitrary tools
  • Visualize it in a web browser
  • Edit it with a web app
  • Compile it into an execution (physical) plan for arbitrary (non-hadoop map-reduce) backend frameworks that make sense (storm, s4, spark) 
Ok, so maybe those are the fun things I actually plan on doing with it, but what's the difference?

Problem

Pig doesn't make it easy to get this. After spending several hours digging through the way pig parses and runs a pig script I've come away somewhat shaken up. The parsing logic is deeply coupled with the execution logic. Yes, yes, this is supposed to change as we go forward, e.g. PIG-3419, but what about in the meantime?

Hack/Solution

So, I've written a little jruby script to return the LogicalPlan for a pig script. Right now it does exactly the same thing as putting an EXPLAIN operator in your script. However, since it exposes the LogicalPlan, you could easily extend it to do whatever you like.


Monday, August 12, 2013

Using Hadoop to Explore Chaos

Hadoop. Hadoop has managed to insinuate itself into practically every company with an engineering team and some data. If your company isn't using it, you know a company that is. Hell, it's why you're reading this to begin with. That being said, what you're probably doing with Hadoop is boring and uninspired. It's not your fault of course. Pretty much every example out there pigeonholes Hadoop into default business use cases like ETL and data cleaning, basic statistics, machine learning, and GIS.

You know what though? Sometimes it's good to explore things that don't have an obvious business use case. Things that are weird. Things that are pretty. Things that are ridiculous. Things like dynamical systems and chaos. And, if you happen to find there are applicable tidbits along the way (*hint, skip to the problem outline section*), great, otherwise just enjoy the diversion.

motivation


So what is a dynamical system? Dryly, a dynamical system is a fixed rule to describe how a point moves through geometric space over time. Pretty much everything that is interesting can be modeled as a dynamical system. Population, traffic flows, fireflies, and neurons can all be described this way.

In most cases, you'll have a system of ordinary differential equations like this:

\begin{eqnarray*} \dot{x_{1}} & = & f_{1}(x_{1},\ldots,x_{n})\\ \vdots\\ \dot{x_{n}} & = & f_{n}(x_{1},\ldots,x_{n}) \end{eqnarray*}

For example, the Fitzhugh-Nagumo model (which models a biological neuron being zapped by an external current):

\begin{eqnarray*} \dot{v} & = & v-\frac{v^{3}}{3}-w+I_{{\rm ext}}\\ \dot{w} & = & 0.08(v+0.7-0.8w) \end{eqnarray*}

In this case \(v\) represents the potential difference between the inside of the neuron and the outside (membrane potential), and \(w\) corresponds to how the neuron recovers after it fires. There's also an external current \(I_{{\rm ext}}\) which can model other neurons zapping the one we're looking at but could just as easily be any other source of current like a car battery. The numerical constants in the system are experimentally derived from looking at how giant squid axons behave. Basically, these guys in the 60's were zapping giant squid brains for science. Understand a bit more why I think your business use case is boring?

One of the simple ways you can study a dynamical system is to see how it behaves for a wide variety of parameter values. In the Fitzhugh-Nagumo case the only real parameter is the external current \(I_{{\rm ext}}\). For example, for what values of \(I_{{\rm ext}}\) does the system behave normally? For what values does it fire like crazy? Can I zap it so much that it stops firing altogether?

In order to do that you'd just decide on some reasonable range of currents, say \((0,1)\), break that range into some number of points, and simulate the system while changing the value of \(I_{{\rm ext}}\) each time.

chaos


There are a lot of great ways to summarize the behavior of a dynamical system if you can simulate its trajectories. Simulated trajectories are, after all, just data sets. The one I'm going to focus on is calculating the largest lyapunov exponent. Basically, all the lyapunov exponent says is: if I take two identical systems and start them going at slightly different places, how similarly do they behave?

For example, if I hook a car battery to two identical squid neurons at the same time, but one has a little bit of extra charge on it, does their firing stay in sync forever or do they start to diverge in time? The lyapunov exponent would measure the rate at which they diverge. If the two neurons fire close in time but don't totally sync up then the lyapunov exponent would be zero. If they eventually start firing at the same time then the lyapunov exponent is negative (they're not diverging, they're coming together). Finally, if they continually diverge from one another then the lyapunov exponent is positive.
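More precisely, if two trajectories start a tiny distance \(\delta_{0}\) apart, their separation grows (or shrinks) roughly like:
\begin{eqnarray*}|\delta(t)|\approx|\delta_{0}|e^{\lambda t}\end{eqnarray*}
where \(\lambda\) is the largest lyapunov exponent.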

As it turns out, a positive lyapunov exponent usually means the system is chaotic. No matter how close two points start out, they will diverge exponentially. What this means in practice is that, while I might have a predictive model (as a dynamical system) of something really cool like a hurricane, I simply can't measure it precisely enough to make a good prediction of where it's going to go. A really small measurement error, between where the hurricane actually is and where I measure it to be, will diverge exponentially. So my model will predict the hurricane heading into Texas when it actually heads into Louisiana. Yep. Chaos indeed.

problem outline


So I'm going to compute the lyapunov exponent of a dynamical system for some range of parameter values. The system I'm going to use is the Henon Map:
\begin{eqnarray*}x_{n+1} & = & y_{n}+1-ax_{n}^{2}\\y_{n+1} & = & bx_{n}\end{eqnarray*}
I chose the Henon map for a few reasons, despite the fact that it isn't modeling a physical system. One, it's super simple and discrete, so there are no differential equations to integrate. Two, it's two dimensional so it's easy to plot and take a look at. Finally, it's only got two parameters, meaning the range of parameter values makes up a plane (and not some n-dimensional hyperspace), so I can make a pretty picture.

What does Hadoop have to do with all this anyway? Well, I've got to break the parameter plane (the ab-plane) into a set of coordinates and run one simulation per coordinate. Say I let \(a=[a_{min},a_{max}]\) and \(b=[b_{min},b_{max}]\) and I want to look at \(N\) unique \(a\) values and \(M\) unique \(b\) values. That means I have to run \(N \times M\) individual simulations!

Clearly, the situation gets even worse if I have more parameters (a.k.a. a realistic system). However, since each simulation is independent of all the others, I can benefit dramatically from simple parallelization. And that, my friends, is what Hadoop does best. It makes parallelization trivially simple. It handles all those nasty details (which distract from the actual problem at hand) like which machine gets which tasks, what to do about failed tasks, reporting, logging, and the whole bit.

So here's the rough idea:


  1. Use Hadoop to split the n-dimensional (2D for this trivial example) space into several tiles that will be processed in parallel
  2. Each split of the space is just a set of parameter values. Use these parameter values to run a simulation.
  3. Calculate the lyapunov exponent resulting from each.
  4. Slice the results, visualize, and analyze further (perhaps at higher resolution on a smaller region of parameter space), to understand under what conditions the system is chaotic. In the simple Henon map case I'll make a 2D image to look at.
The important silly detail is this. The input data here is minuscule in comparison to most data sets handled with Hadoop. This is NOT big data. Instead, the input data is a small file with n lines and can be thought of as a "spatial specification". It is the input format that explodes the spatial specification into the many individual tiles needed. In other words, Hadoop is not just for big data, it can be used for massively parallel scientific computing.
  

implementation


Hadoop has been around for a while now. So when I implement something with Hadoop you can be sure I'm not going to sit down and write a java map-reduce program. Instead, I'll use Pig and custom functions for pig to hijack the Hadoop input format functionality. Expanding the rough idea in the outline above:

  1. Pig will load a spatial specification file that defines the extent of the space to explore and with what granularity to explore it.
  2. A custom Pig LoadFunc will use the specification to create individual input splits for each tile of the space to explore. For less parallelism than one input split per tile it's possible to specify the number of total splits. In this case the tiles will be split mostly evenly among the input splits.
  3. The LoadFunc overrides Hadoop classes. Specifically: InputFormat (which does the work of expanding the space), InputSplit (which represents the set of one or more spatial tiles), and RecordReader (for deserializing the splits into useful tiles).
  4. A custom EvalFunc will take the tuple representing a tile from the LoadFunc and use its values as parameters in simulating the system and computing the lyapunov exponent. The lyapunov exponent is the result.
And here is the pig script:

define LyapunovForHenon sounder.pig.chaos.LyapunovForHenon();

points = load 'data/space_spec' using sounder.pig.points.RectangularSpaceLoader();

exponents = foreach points generate $0 as a, $1 as b, LyapunovForHenon($0, $1);         
 
store exponents into 'data/result';

You can take a look at the detailed implementations of each component on github. See: LyapunovForHenon, RectangularSpaceLoader
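
For a sense of what LyapunovForHenon actually computes, here's the calculation in plain python (a sketch of the standard two-trajectory method with renormalization; the java udf may differ in details like the transient, iteration count, and how unbounded orbits are flagged):

from math import log, sqrt

def henon_lyapunov(a, b, n=2000, d0=1e-8):
    # follow two trajectories started d0 apart, renormalizing their
    # separation back to d0 after every step (Benettin's method)
    x1, y1 = 0.1, 0.1
    x2, y2 = x1 + d0, y1
    total = 0.0
    for i in range(n):
        x1, y1 = y1 + 1.0 - a * x1 * x1, b * x1
        x2, y2 = y2 + 1.0 - a * x2 * x2, b * x2
        if abs(x1) > 1e6:              # orbit escaped; unbounded region
            return float('nan')
        d = sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
        if d == 0.0:                   # trajectories collapsed together
            d = 1e-300
        total += log(d / d0)
        # pull trajectory two back to distance d0 along the separation direction
        x2, y2 = x1 + d0 * (x2 - x1) / d, y1 + d0 * (y2 - y1) / d
    return total / n

At the classic values \(a=1.4\), \(b=0.3\) this comes out positive, as it should for a strange attractor.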

running


I want to explore the Henon map over a range where it's likely to be bounded (unbounded solutions aren't that interesting) and chaotic. Here's my input file:

$: cat data/space_spec
0.6,1.6,800
-1.0,1.0,800

Remember the system?
\begin{eqnarray*}x_{n+1} & = & y_{n}+1-ax_{n}^{2}\\y_{n+1} & = & bx_{n}\end{eqnarray*} Well, the spatial specification says (if I let the first line represent \(a\) and the second be \(b\)) that I'm looking at an \(800 \times 800\) (or 640000 independent simulations) grid in the ab-plane where \(a=[0.6,1.6]\) and \(b=[-1.0,1.0]\)

Now, these bounds aren't arbitrary. The Henon attractor that most are familiar with (if you're familiar with chaos and strange attractors in the least bit) occurs when \(a=1.4\) and \(b=0.3\). I want to ensure I'm at least going over that case.

result


With that, I just need to run it and visualize the results.


$: cat data/result/part-m* | head
0.6	-1.0	9.132244649409043E-5
0.6	-0.9974968710888611	-0.0012539625419929572
0.6	-0.9949937421777222	-0.0025074937591903013
0.6	-0.9924906132665833	-0.0037665150764570965
0.6	-0.9899874843554444	-0.005032402237514987
0.6	-0.9874843554443055	-0.006299127065420516
0.6	-0.9849812265331666	-0.007566751054452304
0.6	-0.9824780976220276	-0.008838119048229768
0.6	-0.9799749687108887	-0.010113503950504331
0.6	-0.9774718397997498	-0.011392710785045064
$: cat data/result/part-m* > data/henon-lyapunov-ab-plane.tsv

To visualize I used this simple python script to get:
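
(The gist of it, assuming numpy and matplotlib; this relies on the output being sorted by \(a\) then \(b\), which is how the part files come out, and on unbounded orbits arriving as NaN:)

import numpy as np
import matplotlib.pyplot as plt

data = np.loadtxt('data/henon-lyapunov-ab-plane.tsv')
a, b, lyap = data[:, 0], data[:, 1], data[:, 2]

n = int(np.sqrt(len(lyap)))    # 800, per the space_spec
grid = lyap.reshape(n, n)      # rows sweep a, columns sweep b

# imshow indexes [row=y, col=x], so transpose to put b on the y axis
plt.imshow(grid.T, origin='lower',
           extent=[a.min(), a.max(), b.min(), b.max()],
           aspect='auto')
plt.colorbar(label='largest lyapunov exponent')
plt.xlabel('a')
plt.ylabel('b')
plt.savefig('henon-lyapunov-ab-plane.png')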


The big swaths of flat white are regions where the system becomes unbounded. It's interesting that the bottom right portion has some structure to it that's possibly fractal. The top right portion, between \(b=0.0\) and \(b=0.5\) and \(a=1.0\) to \(a=1.6\), is really the only region in this image that's chaotic (where the exponent is greater than zero). There's a lot more structure here to look at but I'll leave that to you. As a followup it'd be cool to zoom in on the bottom right corner and run this again.

conclusion


So yes, it's possible to use Hadoop to do massively parallel scientific computing and avoid the question of big data entirely. Best of all it's easy. 

The notion of exploding a space and doing something with each tile in parallel is actually pretty general and, as I've shown, super easy to do with Hadoop. I'll leave it to you to come up with your own way of applying it.