## Wednesday, October 26, 2011

### Nearest Neighbors With Apache Pig and JRuby

Since it's been such a long time since I last posted, I thought I'd make this one a bit longer. It condenses a lot of what I've been working with and thinking about over the past few months.

## Nearest Neighbors

The nearest neighbors problem (also known as the post-office problem) is this: given a set of points S in some metric space M and a query point X, find the point in S that is nearest to X. In other words, given a residence, assign to it the nearest post office. The K-nearest neighbors problem, which this post addresses, is a slight generalization: instead of just one neighbor, we are looking for the K nearest neighbors.
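To make the definition concrete, here is a minimal brute-force K-nearest-neighbors sketch for a single query point. It assumes points in the Euclidean plane, whereas the post itself works with longitude/latitude and haversine distance. The class name `BruteForceKnn` is hypothetical. The sketch keeps the K best candidates in a bounded max-heap, which is the same trick the NearestNeighbors UDF later in this post uses with Java's `PriorityQueue`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical brute-force k-NN sketch over points in the Euclidean plane. */
final class BruteForceKnn {

    /** Returns the indices of the k candidates nearest to (qx, qy), closest first. */
    static List<Integer> kNearest(double qx, double qy, double[][] candidates, int k) {
        List<Integer> result = new ArrayList<>();
        if (k <= 0) return result;

        // Max-heap of {index, distance} pairs: the head is the farthest of the best k so far.
        PriorityQueue<double[]> heap = new PriorityQueue<>(
                Comparator.comparingDouble((double[] e) -> e[1]).reversed());

        for (int i = 0; i < candidates.length; i++) {
            double d = Math.hypot(candidates[i][0] - qx, candidates[i][1] - qy);
            if (heap.size() < k) {
                heap.add(new double[] {i, d});        // heap not full yet: always keep
            } else if (d < heap.peek()[1]) {
                heap.poll();                          // evict the current farthest
                heap.add(new double[] {i, d});
            }
        }

        while (!heap.isEmpty()) {
            result.add((int) heap.poll()[0]);         // drains farthest first
        }
        Collections.reverse(result);                  // so reverse to get closest first
        return result;
    }

    public static void main(String[] args) {
        double[][] candidates = {{1, 0}, {0, 2}, {3, 0}, {0, 0.5}};
        System.out.println(kNearest(0, 0, candidates, 2));
    }
}
```

For example, `kNearest(0, 0, {{1, 0}, {0, 2}, {3, 0}, {0, 0.5}}, 2)` returns `[3, 0]`: the candidates at distance 0.5 and 1.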

## Problem

So, we're going to use the geonames data. This is a set of nearly 8 million geo points from around the world, with names, coordinates, and a bunch of other good stuff. For each point in the geonames set, we'd like to find the 5 other geonames points nearest to it. Should be pretty simple, yeah?
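A brute-force approach compares every point with every other point, which shows why "pretty simple" needs some care at this scale. Taking n ≈ 8 × 10^6 (the "nearly 8 million" above, rounded up), the number of pairwise distance computations is

$$
\binom{n}{2} = \frac{n(n-1)}{2} \approx \frac{(8 \times 10^{6})^{2}}{2} = 3.2 \times 10^{13}.
$$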

### Get data

The geonames data set 'allCountries.zip' can be downloaded from the geonames site. Once you have it, run:

```
$: unzip allCountries.zip
$: hadoop fs -put allCountries.txt .
```

to unzip the package and place the tsv file into your home directory on the Hadoop distributed file system.

### Schema

Oh, and by the way, before we forget, the data from geonames has this Pig schema:

```
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
```

### The Algorithm

Now that we have the data, we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc.), we see that only three fields are really of interest: geonameid, longitude, and latitude. All the other fields are just nice metadata that we can attach later.

Since we're going to use Apache Pig to solve this problem, we need to think a little about parallelism. One constraint is that no single point will ever have access to the locations of all the other points. In other words, we will not be holding the full set of points in memory. Besides, it's 8 million points, which is kind of a lot for my poor little machine to handle.

So it's clear (right?) that we're going to have to partition the space in some way. Then, within each partition, we'll apply a local version of the nearest neighbors algorithm. That's really it: map and reduce.

Wait, there's one problem. What happens if we don't find all 5 neighbors for a point within a single partition? Hmm. Well, the answer is iteration. We'll start with a small partition size and gradually increase it until either the partition size is too large or all the neighbors have been found. Got it?
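Here is a minimal in-memory sketch of that loop, with the three steps made concrete. It is an illustration under simplifying assumptions, not the post's implementation. A plain longitude/latitude grid whose cells double in width each round stands in for quadkeys, planar distance stands in for haversine, and the class name `IterativeGridKnn` is hypothetical. Every point, whether done or not, is re-bucketed each round so finished points can still serve as neighbors. Because each point only looks inside its own cell, a closer point just across a cell boundary can be missed. The sketch makes no attempt to correct for that.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of "partition, search locally, coarsen, repeat".
 * A lon/lat grid stands in for quadkeys; planar distance stands in for haversine.
 */
final class IterativeGridKnn {

    /** pts[i] = {longitude, latitude}; returns up to k neighbor indices per point, closest first. */
    static List<List<Integer>> run(double[][] pts, int k, int maxZoom, int minZoom) {
        int n = pts.length;
        List<List<Integer>> neighbors = new ArrayList<>();
        for (int i = 0; i < n; i++) neighbors.add(new ArrayList<>());
        boolean[] done = new boolean[n];

        for (int zoom = maxZoom; zoom >= minZoom; zoom--) {
            double side = 360.0 / (1L << zoom);  // cell width in degrees; doubles each round

            // (1) Partition: bucket every point, done or not, so done points can still be neighbors.
            Map<Long, List<Integer>> cells = new HashMap<>();
            for (int i = 0; i < n; i++) {
                long cx = (long) Math.floor((pts[i][0] + 180.0) / side);
                long cy = (long) Math.floor((pts[i][1] + 90.0) / side);
                cells.computeIfAbsent((cx << 32) | cy, c -> new ArrayList<>()).add(i);
            }

            // (2) Search: brute-force k-NN inside each cell, only for points that are not done yet.
            for (List<Integer> members : cells.values()) {
                for (int a : members) {
                    if (done[a]) continue;
                    List<Integer> others = new ArrayList<>(members);
                    others.remove(Integer.valueOf(a));
                    others.sort(Comparator.comparingDouble(b -> dist(pts[a], pts[b])));
                    neighbors.set(a, new ArrayList<>(others.subList(0, Math.min(k, others.size()))));
                    done[a] = neighbors.get(a).size() >= k;
                }
            }

            // (3) Stop once every point has k neighbors; otherwise coarsen the grid and repeat.
            boolean allDone = true;
            for (boolean d : done) allDone &= d;
            if (allDone) break;
        }
        return neighbors;
    }

    private static double dist(double[] p, double[] q) {
        return Math.hypot(p[0] - q[0], p[1] - q[1]);
    }

    public static void main(String[] args) {
        double[][] pts = {{0.0, 0.0}, {0.001, 0.0}, {0.002, 0.0}, {10.0, 10.0}};
        System.out.println(run(pts, 2, 20, 1));
    }
}
```

With the four points in `main` and k = 2, the three points near the origin find each other once the grid has coarsened to zoom level 17. The isolated point at (10, 10) only gets its two neighbors at zoom level 5. If `minZoom` stops the loop before that, it is left with an empty neighbor list, just like the points that end up in 'not_done'.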
Recap:

1. Partition the space.
2. Search for nearest neighbors within a single partition.
3. If all neighbors have been found, terminate; otherwise increase the partition size and repeat steps 1 and 2.

### Implementation

For partitioning the space we're going to use Bing Maps quadkeys (http://msdn.microsoft.com/en-us/library/bb259689.aspx). They're super easy to implement and they partition the space nicely. This will be a Java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a string quadkey (the partition id). Here's the actual Java code for that. Let's call it "GetQuadkey":

```java
package sounder.pig.geo;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
 * A Pig UDF to compute the quadkey string for a given
 * (longitude, latitude, resolution) tuple.
 *
 * See: http://msdn.microsoft.com/en-us/library/bb259689.aspx
 */
public class GetQuadkey extends EvalFunc<String> {

    private static final int TILE_SIZE = 256;

    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))
            return null;

        Double longitude = (Double) input.get(0);
        Double latitude = (Double) input.get(1);
        Integer resolution = (Integer) input.get(2);

        return quadKey(longitude, latitude, resolution);
    }

    private static String quadKey(double longitude, double latitude, int resolution) {
        int[] pixels = pointToPixels(longitude, latitude, resolution);
        int[] tiles = pixelsToTiles(pixels[0], pixels[1]);
        return tilesToQuadKey(tiles[0], tiles[1], resolution);
    }

    /**
     * Return the pixel X and Y coordinates for the given lng, lat, and resolution
     * (spherical Mercator projection).
     */
    private static int[] pointToPixels(double longitude, double latitude, int resolution) {
        double x = (longitude + 180) / 360;
        double sinLatitude = Math.sin(latitude * Math.PI / 180);
        double y = 0.5 - Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);
        int mapSize = mapSize(resolution);
        int[] pixels = {
            (int) trim(x * mapSize + 0.5, 0, mapSize - 1),
            (int) trim(y * mapSize + 0.5, 0, mapSize - 1)
        };
        return pixels;
    }

    /**
     * Convert from pixel coordinates to tile coordinates.
     */
    private static int[] pixelsToTiles(int pixelX, int pixelY) {
        int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};
        return tiles;
    }

    /**
     * Finally, given tile coordinates and a resolution, return the appropriate quadkey.
     * Each digit (0-3) picks one of the four sub-tiles of the previous level.
     */
    private static String tilesToQuadKey(int tileX, int tileY, int resolution) {
        StringBuilder quadKey = new StringBuilder();
        for (int i = resolution; i > 0; i--) {
            char digit = '0';
            int mask = 1 << (i - 1);
            if ((tileX & mask) != 0) {
                digit++;
            }
            if ((tileY & mask) != 0) {
                digit++;
                digit++;
            }
            quadKey.append(digit);
        }
        return quadKey.toString();
    }

    /**
     * Ensure input value is within minVal and maxVal.
     */
    private static double trim(double n, double minVal, double maxVal) {
        return Math.min(Math.max(n, minVal), maxVal);
    }

    /**
     * Width of the map, in pixels, at the given resolution.
     */
    public static int mapSize(int resolution) {
        return TILE_SIZE << resolution;
    }
}
```

Next we need another Java UDF that operates on all the points in a partition. Let's call it "NearestNeighbors".
Here's a naive implementation of that:

```java
package sounder.pig.geo.nearestneighbors;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class NearestNeighbors extends EvalFunc<DataBag> {

    private static TupleFactory tupleFactory = TupleFactory.getInstance();
    private static BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
            return null;

        Long k = (Long) input.get(0);
        DataBag points = (DataBag) input.get(1); // {(id,lng,lat,{(n1,n1_dist)...})}
        DataBag result = bagFactory.newDefaultBag();

        for (Tuple pointA : points) {
            DataBag neighborsBag = (DataBag) pointA.get(3);
            if (neighborsBag.size() < k) {
                // Max-heap of the current neighbors: peek() returns the farthest one
                PriorityQueue<Tuple> neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);
                // Ids already in the queue, so a neighbor found in an earlier iteration isn't added twice
                Set<Object> neighborIds = new HashSet<Object>();
                for (Tuple n : neighbors) neighborIds.add(n.get(0));

                Object idA = pointA.get(0);
                Double x1 = Math.toRadians((Double) pointA.get(1)); // longitude
                Double y1 = Math.toRadians((Double) pointA.get(2)); // latitude

                for (Tuple pointB : points) {
                    Object idB = pointB.get(0);
                    // Skip the point itself and anything already recorded as a neighbor
                    if (idA.equals(idB) || neighborIds.contains(idB)) continue;

                    Double x2 = Math.toRadians((Double) pointB.get(1));
                    Double y2 = Math.toRadians((Double) pointB.get(2));
                    double distance = haversineDistance(x1, y1, x2, y2);

                    if (neighbors.size() < k) {
                        // Still filling up: keep any candidate
                        neighbors.add(newNeighbor(idB, distance));
                        neighborIds.add(idB);
                    } else if (distance < (Double) neighbors.peek().get(1)) {
                        // Closer than the current farthest neighbor: swap it in
                        Tuple removed = neighbors.poll();
                        neighborIds.remove(removed.get(0));
                        neighbors.add(newNeighbor(idB, distance));
                        neighborIds.add(idB);
                    }
                }

                // Emit pointA with its updated neighbor bag
                Tuple newPointA = tupleFactory.newTuple(4);
                newPointA.set(0, idA);
                newPointA.set(1, pointA.get(1));
                newPointA.set(2, pointA.get(2));
                newPointA.set(3, fromQueue(neighbors));
                result.add(newPointA);
            } else {
                // Already has k neighbors: pass it through unchanged
                result.add(pointA);
            }
        }
        return result;
    }

    private Tuple newNeighbor(Object id, double distance) throws ExecException {
        Tuple neighbor = tupleFactory.newTuple(2);
        neighbor.set(0, id);
        neighbor.set(1, distance);
        return neighbor;
    }

    // Sorted by descending distance (a max-heap), so peek()/poll() give the farthest neighbor
    private PriorityQueue<Tuple> toDistanceSortedQueue(int k, DataBag bag) {
        PriorityQueue<Tuple> q = new PriorityQueue<Tuple>(Math.max(1, k), new Comparator<Tuple>() {
            public int compare(Tuple t1, Tuple t2) {
                try {
                    Double dist1 = (Double) t1.get(1);
                    Double dist2 = (Double) t2.get(1);
                    return dist2.compareTo(dist1);
                } catch (ExecException e) {
                    throw new RuntimeException("Error comparing tuples", e);
                }
            }
        });
        for (Tuple tuple : bag) q.add(tuple);
        return q;
    }

    private DataBag fromQueue(PriorityQueue<Tuple> q) {
        DataBag bag = bagFactory.newDefaultBag();
        for (Tuple tuple : q) bag.add(tuple);
        return bag;
    }

    // Great-circle distance on a unit sphere (the central angle, in radians).
    // x = longitude and y = latitude, both already in radians.
    private double haversineDistance(double x1, double y1, double x2, double y2) {
        double a = Math.pow(Math.sin((y2 - y1) / 2), 2)
                 + Math.cos(y1) * Math.cos(y2) * Math.pow(Math.sin((x2 - x1) / 2), 2);
        return 2 * Math.asin(Math.min(1, Math.sqrt(a)));
    }
}
```

The details of the NearestNeighbors UDF aren't super important, and it's mostly clear what's going on. Just know that it takes a bag of points as input and returns a bag of points with the same schema as output. This is really important since we're going to be iterating: the output of one round becomes the input of the next.

Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and Pig's "PigServer" Java class to do all the work. Here's what the JRuby runner looks like (it's kind of a lot, so don't get scared):

```ruby
#!/usr/bin/env jruby

require 'java'

#
# You might consider changing this to point to where you have
# pig installed...
#
jar  = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"
conf = "/etc/hadoop/conf"

$CLASSPATH << conf
require jar

import org.apache.pig.ExecType
import org.apache.pig.PigServer
import org.apache.pig.FuncSpec

class NearestNeighbors

  attr_accessor :points, :k, :min_zl, :runmode

  #
  # Create a new nearest neighbors instance
  # for the given points, k neighbors to find,
  # an optional minimum zl (1-21) and optional
  # hadoop run mode (local or mapreduce)
  #
  def initialize points, k, min_zl=20, runmode='mapreduce'
    @points  = points
    @k       = k
    @min_zl  = min_zl
    @runmode = runmode
  end

  #
  # Run the nearest neighbors algorithm
  #
  def run
    start_pig_server
    register_jars_and_functions
    run_algorithm
    stop_pig_server
  end

  #
  # Actually runs all the pig queries for
  # the algorithm. Stops if all neighbors
  # have been found or if min_zl is reached
  #
  def run_algorithm
    start_nearest_neighbors(points, k, 22)
    if run_nearest_neighbors(k, 22)
      21.downto(min_zl) do |zl|
        iterate_nearest_neighbors(k, zl)
        break unless run_nearest_neighbors(k, zl)
      end
    end
  end

  #
  # Registers algorithm initialization queries
  #
  def start_nearest_neighbors(input, k, zl)
    @pig.register_query(PigQueries.load_points(input))
    @pig.register_query(PigQueries.generate_quadkeys(zl))
  end

  #
  # Registers algorithm iteration queries
  #
  def iterate_nearest_neighbors k, zl
    @pig.register_query(PigQueries.load_prior_done(zl))
    @pig.register_query(PigQueries.load_prior_not_done(zl))
    @pig.register_query(PigQueries.union_priors(zl))
    @pig.register_query(PigQueries.trim_quadkeys(zl))
  end

  #
  # Runs one iteration of the algorithm. Returns true while
  # some points still need more neighbors.
  #
  def run_nearest_neighbors(k, zl)
    @pig.register_query(PigQueries.group_by_quadkey(zl))
    @pig.register_query(PigQueries.nearest_neighbors(k, zl))
    @pig.register_query(PigQueries.split_results(k, zl))

    if !@pig.exists_file("done#{zl}")
      @pig.store("done#{zl}", "done#{zl}")
      not_done = @pig.store("not_done#{zl}", "not_done#{zl}")
      not_done.get_results.has_next?
    else
      true
    end
  end

  #
  # Start a new pig server with the specified run mode
  #
  def start_pig_server
    @pig = PigServer.new(runmode)
  end

  #
  # Stop the running pig server
  #
  def stop_pig_server
    @pig.shutdown
  end

  #
  # Register the jar that contains the nearest neighbors
  # and quadkeys udfs and define functions for them.
  #
  def register_jars_and_functions
    @pig.register_jar('../../../../udf/target/sounder-1.0-SNAPSHOT.jar')
    @pig.register_function('GetQuadkey', FuncSpec.new('sounder.pig.geo.GetQuadkey()'))
    @pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors()'))
  end

  #
  # A simple class to contain the pig queries.
  #
  # NOTE: load_points, generate_quadkeys, load_prior_done, load_prior_not_done,
  # trim_quadkeys and group_by_quadkey are hypothetical names; the original
  # definitions were lost from this listing (see the Sounder repo).
  #
  class PigQueries

    #
    # Load the geonames points. Obviously,
    # this should be modified to accept a
    # variable schema.
    #
    def self.load_points geonames
      "points = LOAD '#{geonames}' AS (
         geonameid: int,
         name: chararray,
         asciiname: chararray,
         alternatenames: chararray,
         latitude: double,
         longitude: double,
         feature_class: chararray,
         feature_code: chararray,
         country_code: chararray,
         cc2: chararray,
         admin1_code: chararray,
         admin2_code: chararray,
         admin3_code: chararray,
         admin4_code: chararray,
         population: long,
         elevation: int,
         gtopo30: int,
         timezone: chararray,
         modification_date: chararray
       );"
    end

    #
    # Query to generate quadkeys at the specified zoom level
    #
    def self.generate_quadkeys zl
      "projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"
    end

    #
    # Load previous iteration's done points
    #
    def self.load_prior_done zl
      "prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (
         quadkey: chararray,
         geonameid: int,
         longitude: double,
         latitude: double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Load previous iteration's not done points
    #
    def self.load_prior_not_done zl
      "prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (
         quadkey: chararray,
         geonameid: int,
         longitude: double,
         latitude: double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Union the previous iteration's points that are done
    # with the points that are not done
    #
    def self.union_priors zl
      "prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"
    end

    #
    # Chop off one character of precision from the existing
    # quadkey to go one zl down.
    #
    def self.trim_quadkeys zl
      "projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE
         SUBSTRING(quadkey, 0, #{zl}) AS quadkey,
         geonameid AS geonameid,
         longitude AS longitude,
         latitude AS latitude,
         neighbors AS neighbors;"
    end

    #
    # Group the points by quadkey
    #
    def self.group_by_quadkey zl
      "grouped#{zl} = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"
    end

    #
    # Run the nearest neighbors udf on all the points for
    # a given quadkey
    #
    def self.nearest_neighbors(k, zl)
      "nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (
         geonameid: int,
         longitude: double,
         latitude: double,
         neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
       );"
    end

    #
    # Split the results into done and not_done relations
    # The algorithm is done when 'not_done' contains
    # no more tuples.
    #
    def self.split_results(k, zl)
      "SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;"
    end
  end
end

#
# Usage: ./nearest_neighbors.rb <geonames file> <k> [min zoom level]
#
NearestNeighbors.new(ARGV[0], ARGV[1].to_i, (ARGV[2] || 20).to_i).run
```

Call this file "nearest_neighbors.rb". The idea here is that we register some basic Pig queries to initialize the algorithm and to run each iteration. These queries are run over and over until either the "not_done" relation contains no more elements or the minimum zoom level has been reached. Note that a small zoom level means a big partition of space. Going down one zoom level chops one character off each quadkey, which merges four neighboring tiles into their parent tile.

## Run it!

I think we're finally ready to run it. Let K=5 and the minimum zoom level (zl) be 10. Then just run:

```
$: ./nearest_neighbors.rb allCountries.txt 5 10
```

to kick it off. The output will live in 'done10' (in your home directory on HDFS), and all the points that couldn't find their neighbors (poor guys) are left in 'not_done10'. Let's take a look:

```
$: hadoop fs -cat done10/part* | head
22321231 5874132 -164.73333 67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}
133223322 2631726 -17.98263 66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}
133223322 2631999 -18.01884 66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}
200103201 5874186 -165.39611 64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}
200103201 5878614 -165.3 64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}
200103203 5875461 -165.55111 64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}
200103300 5861248 -164.27639 64.69278 {(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}
200103300 5876626 -164.27111 64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}
200233011 5870290 -170.3 57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}
200233123 5873595 -169.48056 56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}
```
Hurray!

### Pig Code

Let's take a look at the Pig code that actually got executed:

```
points = LOAD 'allCountries.txt' AS (
    geonameid: int, name: chararray, asciiname: chararray, alternatenames: chararray,
    latitude: double, longitude: double, feature_class: chararray, feature_code: chararray,
    country_code: chararray, cc2: chararray, admin1_code: chararray, admin2_code: chararray,
    admin3_code: chararray, admin4_code: chararray, population: long, elevation: int,
    gtopo30: int, timezone: chararray, modification_date: chararray
);
projected22 = FOREACH points GENERATE GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};
grouped22 = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) >= 5l, not_done22 IF COUNT(neighbors) < 5l;

prior_done22 = LOAD 'done22/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done22 = LOAD 'not_done22/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors22 = UNION prior_done22, prior_not_done22;
projected21 = FOREACH prior_neighbors22 GENERATE SUBSTRING(quadkey, 0, 21) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped21 = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) >= 5l, not_done21 IF COUNT(neighbors) < 5l;

prior_done21 = LOAD 'done21/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done21 = LOAD 'not_done21/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors21 = UNION prior_done21, prior_not_done21;
projected20 = FOREACH prior_neighbors21 GENERATE SUBSTRING(quadkey, 0, 20) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped20 = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) >= 5l, not_done20 IF COUNT(neighbors) < 5l;

prior_done20 = LOAD 'done20/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done20 = LOAD 'not_done20/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors20 = UNION prior_done20, prior_not_done20;
projected19 = FOREACH prior_neighbors20 GENERATE SUBSTRING(quadkey, 0, 19) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped19 = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) >= 5l, not_done19 IF COUNT(neighbors) < 5l;

prior_done19 = LOAD 'done19/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done19 = LOAD 'not_done19/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors19 = UNION prior_done19, prior_not_done19;
projected18 = FOREACH prior_neighbors19 GENERATE SUBSTRING(quadkey, 0, 18) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped18 = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) >= 5l, not_done18 IF COUNT(neighbors) < 5l;

prior_done18 = LOAD 'done18/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done18 = LOAD 'not_done18/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors18 = UNION prior_done18, prior_not_done18;
projected17 = FOREACH prior_neighbors18 GENERATE SUBSTRING(quadkey, 0, 17) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped17 = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) >= 5l, not_done17 IF COUNT(neighbors) < 5l;

prior_done17 = LOAD 'done17/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done17 = LOAD 'not_done17/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors17 = UNION prior_done17, prior_not_done17;
projected16 = FOREACH prior_neighbors17 GENERATE SUBSTRING(quadkey, 0, 16) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped16 = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) >= 5l, not_done16 IF COUNT(neighbors) < 5l;

prior_done16 = LOAD 'done16/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done16 = LOAD 'not_done16/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors16 = UNION prior_done16, prior_not_done16;
projected15 = FOREACH prior_neighbors16 GENERATE SUBSTRING(quadkey, 0, 15) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped15 = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) >= 5l, not_done15 IF COUNT(neighbors) < 5l;

prior_done15 = LOAD 'done15/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done15 = LOAD 'not_done15/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors15 = UNION prior_done15, prior_not_done15;
projected14 = FOREACH prior_neighbors15 GENERATE SUBSTRING(quadkey, 0, 14) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped14 = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) >= 5l, not_done14 IF COUNT(neighbors) < 5l;

prior_done14 = LOAD 'done14/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done14 = LOAD 'not_done14/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors14 = UNION prior_done14, prior_not_done14;
projected13 = FOREACH prior_neighbors14 GENERATE SUBSTRING(quadkey, 0, 13) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped13 = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) >= 5l, not_done13 IF COUNT(neighbors) < 5l;

prior_done13 = LOAD 'done13/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done13 = LOAD 'not_done13/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors13 = UNION prior_done13, prior_not_done13;
projected12 = FOREACH prior_neighbors13 GENERATE SUBSTRING(quadkey, 0, 12) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped12 = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) >= 5l, not_done12 IF COUNT(neighbors) < 5l;

prior_done12 = LOAD 'done12/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done12 = LOAD 'not_done12/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors12 = UNION prior_done12, prior_not_done12;
projected11 = FOREACH prior_neighbors12 GENERATE SUBSTRING(quadkey, 0, 11) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped11 = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) >= 5l, not_done11 IF COUNT(neighbors) < 5l;

prior_done11 = LOAD 'done11/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_not_done11 = LOAD 'not_done11/part*' AS (quadkey: chararray, geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
prior_neighbors11 = UNION prior_done11, prior_not_done11;
projected10 = FOREACH prior_neighbors11 GENERATE SUBSTRING(quadkey, 0, 10) AS quadkey, geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;
grouped10 = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (geonameid: int, longitude: double, latitude: double, neighbors: bag {t:tuple(neighbor_id:int, distance:double)});
SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) >= 5l, not_done10 IF COUNT(neighbors) < 5l;
```

So now you can see why, when iterating, it's a good idea to generate as much of the code as possible rather than write it by hand.
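The same idea works outside of JRuby. The following is a small, hypothetical Java helper (`PigIterationGenerator` is not part of the Sounder code) that emits one refinement step's worth of Pig Latin for zoom level `zl`. Its output mirrors the statements listed above. It assumes, as the runner does, that level `zl + 1` has already been stored to 'done' and 'not_done'.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: generate the Pig Latin for one step from zoom level zl + 1 down to zl. */
final class PigIterationGenerator {

    private static final String STORED_SCHEMA =
        "quadkey: chararray, geonameid: int, longitude: double, latitude: double, "
        + "neighbors: bag {t:tuple(neighbor_id:int, distance:double)}";

    static List<String> iteration(int k, int zl) {
        int prev = zl + 1;
        List<String> q = new ArrayList<>();
        // Reload both halves of the previous level's SPLIT and put them back together.
        q.add(String.format("prior_done%d = LOAD 'done%d/part*' AS (%s);", prev, prev, STORED_SCHEMA));
        q.add(String.format("prior_not_done%d = LOAD 'not_done%d/part*' AS (%s);", prev, prev, STORED_SCHEMA));
        q.add(String.format("prior_neighbors%d = UNION prior_done%d, prior_not_done%d;", prev, prev, prev));
        // Dropping the last quadkey digit merges four tiles into their parent tile.
        q.add(String.format("projected%d = FOREACH prior_neighbors%d GENERATE SUBSTRING(quadkey, 0, %d) AS quadkey, "
            + "geonameid AS geonameid, longitude AS longitude, latitude AS latitude, neighbors AS neighbors;",
            zl, prev, zl));
        q.add(String.format("grouped%d = FOREACH (GROUP projected%d BY quadkey) GENERATE group AS quadkey, "
            + "projected%d.(geonameid, longitude, latitude, $4) AS points_bag;", zl, zl, zl));
        q.add(String.format("nearest_neighbors%d = FOREACH grouped%d GENERATE quadkey AS quadkey, "
            + "FLATTEN(NearestNeighbors(%dl, points_bag)) AS (geonameid: int, longitude: double, latitude: double, "
            + "neighbors: bag {t:tuple(neighbor_id:int, distance:double)});", zl, zl, k));
        q.add(String.format("SPLIT nearest_neighbors%d INTO done%d IF COUNT(neighbors) >= %dl, "
            + "not_done%d IF COUNT(neighbors) < %dl;", zl, zl, k, zl, k));
        return q;
    }

    public static void main(String[] args) {
        // Print every refinement step from zoom level 21 down to 10, with k = 5.
        for (int zl = 21; zl >= 10; zl--) {
            for (String statement : iteration(5, zl)) {
                System.out.println(statement);
            }
        }
    }
}
```

Writing the template once and looping over zoom levels is what keeps the 80-odd statements above consistent with each other.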

And we're done :)

All the code for this is on github in the Sounder repo (along with tons of other Apache Pig examples).

1. It sure would be nice if someone could show me how to get blogger to NOT interpret the "String", "DataBag" and "Tuple" class names inside the angle brackets as html tags. For correct code please see the github repo sounder. Sorry guys :(

2. use '& lt' and '& gt' without the space

Thanks for posting this.


13. Interested to know the top 10 technologies of 2019? Watch this:https://www.youtube.com/watch?v=-y5Z2fmnp-o

14. I have really happy to these reading your post. This product control and maintenance of our health.The daily routine can assist you weight lose quickly and safely.My life is completely reworked once I followed this diet.I feeling nice concerning myself.

Herbalife in Chennai
Nutrition centers in Chennai
Weight Loss in Chennai
Weight Gain in Chennai

15. AWS Training in Bangalore - Live Online & Classroom
myTectra Amazon Web Services (AWS) certification training helps you to gain real time hands on experience on AWS. myTectra offers AWS training in Bangalore using classroom and AWS Online Training globally. AWS Training at myTectra delivered by the experienced professional who has atleast 4 years of relavent AWS experince and overall 8-15 years of IT experience. myTectra Offers AWS Training since 2013 and retained the positions of Top AWS Training Company in Bangalore and India.

IOT Training in Bangalore - Live Online & Classroom
IOT Training course observes iot as the platform for networking of different devices on the internet and their inter related communication. Reading data through the sensors and processing it with applications sitting in the cloud and thereafter passing the processed data to generate different kind of output is the motive of the complete curricula. Students are made to understand the type of input devices and communications among the devices in a wireless media.

16. IOT Training in Bangalore - Live Online & Classroom
Iot Training course observes iot as the platform for networking of different devices on the internet and their inter related communication. Iot Training in Bangalore

17. Very Impressive Big Data Hadoop tutorial. The content seems to be pretty exhaustive and excellent and will definitely help in learning Big Data Hadoop course. I'm also a learner taken up Big Data Hadoop Tutorial and I think your content has cleared some concepts of mine. While browsing for Hadoop tutorials on YouTube i found this fantastic video on Big Data Hadoop Tutorial.Do check it out if you are interested to know more.https://www.youtube.com/watch?v=nuPp-TiEeeQ&

18. Gaining Python certifications will validate your skills and advance your career.
python certification

19. myTectra Placement Portal is a Web based portal brings Potentials Employers and myTectra Candidates on a common platform for placement assistance

20. Hey, Wow all the posts are very informative for the people who visit this site. Good work! We also have a Website. Please feel free to visit our site. Thank you for sharing.Well written article Thank You Sharing with Us pmp training Chennai | pmp training centers in Chennai | pmp training institutes in Chennai | pmp training and certification in Chennai | pmp training in velachery

21. Wow good to read thanks for sharing

R programming training in chennai

22. Hey Nice Blog!! Thanks For Sharing!!!Wonderful blog & good post.Its really helpful for me, waiting for a more new post. Keep Blogging!
SEO company in coimbatore
SEO company
web design company in coimbatore

23. A bewildering web journal I visit this blog, it's unfathomably heavenly. Oddly, in this present blog's substance made purpose of actuality and reasonable. The substance of data is informative
Oracle Fusion Financials Online Training
Oracle Fusion HCM Online Training
Oracle Fusion SCM Online Training

24. A bewildering web journal I visit this blog, it's unfathomably heavenly. Oddly, in this present blog's substance made purpose of actuality and reasonable. The substance of data is informative
Oracle Fusion Financials Online Training
Oracle Fusion HCM Online Training
Oracle Fusion SCM Online Training

25. An overwhelming web journal I visit this blog, it's unfathomably amazing. Unusually, in this present blog's substance made inspiration driving truth and reasonable. The substance of data is enlightening
Oracle Fusion Financials Online Training
Oracle Fusion HCM Online Training
Oracle Fusion SCM Online Training

26. Nice Information
"Sanjary Academy provides excellent training for Piping design course. Best Piping Design Training Institute in Hyderabad,
Telangana. We have offer professional Engineering Course like Piping Design Course,QA / QC Course,document Controller
course,pressure Vessel Design Course, Welding Inspector Course, Quality Management Course, #Safety officer course."
Piping Design Course in India­
Piping Design Course in Hyderabad
QA / QC Course
QA / QC Course in india
QA / QC Course in Hyderabad
Document Controller course
Pressure Vessel Design Course
Welding Inspector Course
Quality Management Course
Quality Management Course in india
Safety officer course

27. thanks for sharing
Yaaron Studios is one of the rapidly growing editing studios in Hyderabad. We are the best Video Editing services in Hyderabad. We provides best graphic works like logo reveals, corporate presentation Etc. And also we gives the best Outdoor/Indoor shoots and Ad Making services.
video editing studios in hyderabad
short film editors in hyderabad
corporate video editing studio in hyderabad

28. Hi,
Best article, very useful and well explanation. Your post is extremely incredible.Good job & thank you very much for the new information, i learned something new. Very well written. It was sooo good to read and usefull to improve knowledge. Who want to learn this information most helpful. One who wanted to learn this technology IT employees will always suggest you take Data Scientist Certification In Bangalore.

29. Great post very useful info thanks for this post ....
Devops trainign in chennai

30. In recent years, Ubisoft has somewhat modified the direction of development of a number of its franchises, that specialize inmassive open-world games. and also the different day, the top of Ubisoft, Yves Guillot, aforementioned that the corporate wasabout to continue within the same spirit, and explained why.

You may read more on — Blockcrux
If you recall the recently proclaimed and last free games from Ubisoft, you'll be able to simply notice a very importantsimilarity in them: Assassin’s Creed Odyssey, The Division two, so much Cry New Dawn, Ghost Recon Breakpoint, Watch Dogs Legion and even the cartoon Gods & Monsters – of these ar games with an oversized open world. Of course, this is often no accident.
In a spoken communication with Games trade, the top of Ubisoft, Yves Guillot, explained that this was no accident: the corporate is deliberately increasing the dimensions of games, and it's not about to come back to a lot of chamber stories like Assassin’s Creed Unity. a lot of exactly, Ubisoft can tell similar stories, however not as separate games. consistent with Yves Guillot, why unharness a game for fifteen hours if you'll be able to unharness a bigger project for sixty hours, which canembrace identical 15-hour story?

31. Thanks for your effects that you put in for giving very informative and clear post.I always like your posts.Thanks for sharing.Python Programming is king for a majority of ranking metrics.If you need any help in python programming visit our site.
Python Training Institute In Bangalore

32. Great post very useful info thanks for this post ....
Aws training chennai | AWS course in chennai

33. This comment has been removed by the author.

34. This comment has been removed by the author.

35. Nice post. It is really interesting. Thanks for sharing the post!
Buy AC Online | Smart LED TV
Laptops for Sale | Best Inverter AC