
Wednesday, March 21, 2012

Topic Discovery With Apache Pig and Mallet

A common desire when working with natural language is topic discovery. That is, given a set of documents (e.g. tweets, blog posts, emails) you would like to discover the topics inherent in those documents. Often topic discovery is used to summarize a large corpus of text so you can quickly understand what that text is 'about'. You can go further and use topic discovery as a way to classify new documents, or to group and organize the documents you've already analyzed.

Latent Dirichlet Allocation



One popular method for topic discovery in a corpus is Latent Dirichlet Allocation (LDA). I won't pretend to be an expert on LDA, but the main assumption is as follows. Each document is assumed to be a 'mixture' of topics. Going further, each topic is then assumed to be a distribution over terms. For example, say there is a topic in my corpus labeled 'apache hadoop'. It could be represented as a multinomial probability distribution with high probability of generating terms such as 'hadoop', 'data', 'apache', and 'map-reduce'. See the Wikipedia article on LDA for more detail.
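
Slightly more formally, the standard LDA generative story (this is the textbook formulation, not anything specific to Mallet) is:

$$\phi_k \sim \mathrm{Dirichlet}(\beta) \quad \text{for each topic } k$$
$$\theta_d \sim \mathrm{Dirichlet}(\alpha) \quad \text{for each document } d$$
$$z_{d,n} \sim \mathrm{Multinomial}(\theta_d), \qquad w_{d,n} \sim \mathrm{Multinomial}(\phi_{z_{d,n}})$$

That is, each document d mixes topics according to its own $\theta_d$, and each topic k is itself a distribution $\phi_k$ over the vocabulary.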

Problem


I'm going to use Apache Pig and Mallet, a Java-based machine learning and natural language processing library, to discover topics in the 20 Newsgroups data set. This corpus is nice since each document already belongs to a newsgroup (a topic), which gives us a way of checking how well our topic discovery is doing.

The Data


The 20 Newsgroups data set can be found on Infochimps here. Once you've got the data, go ahead and place it somewhere on your HDFS. I put mine in my home directory under '20newsgroups/data'.

Here's what a head of that data looks like:


alt.atheism.53536 From: kmr4@po.CWRU.edu (Keith M. Ryan) Subject: Re: Smullyanism for the day..... In article ... *snip*
alt.atheism.51164 From: mccullou@snake2.cs.wisc.edu (Mark McCullough) Subject: Re: Idle questions for fellow atheists In article ... *snip*
alt.atheism.53448 From: sandvik@newton.apple.com (Kent Sandvik) Subject: Age of Reason Was: ... *snip*
alt.atheism.53753 Subject: Re: The Inimitable Rushdie From: kmagnacca@eagle.wesleyan.edu In article ... *snip*
alt.atheism.53290 From: Andrew Newell Subject: Re: Christian Morality is In article ... *snip*


It's a little messy but it should get the point across.

So it's tab-separated, where the first field is the document id (a concatenation of the newsgroup the document came from and an integer id) and the second field is the document text itself. Here's the Pig schema for that:


(doc_id:chararray, text:chararray)


Algorithm


LDA operates on a set of documents. Trivially, we could skip the Pig part, write a simple Java program that operates on the entire document set, and be done with it. But that's not the point. Typically, your input documents have metadata attached to them: for example, the region or user they come from, or even just the date they were generated. So we'll use Pig's GROUP BY statement to group the documents by this metadata and cluster the documents within each group independently. If the documents don't have this kind of metadata, we're stuck doing a GROUP ALL and dealing with all the documents at once. There are clever ways of parallelizing LDA in that case that I'm not going to go into. See here and here.

Here's a sketch of the algorithm:


  • (1) Load documents

  • (2) Group the documents by appropriate metadata (or by all)

  • (3) Run LDA on each group of documents

  • (4) Profit!!!



Implementation


So it's clear that we're going to need a Java UDF to do the actual topic clustering. Right? This UDF will operate on a DataBag of documents and return a DataBag containing the discovered topics. Each topic will be represented by a Tuple with the following schema:


(topic:tuple(topic_num:int, terms:bag{t:tuple(term:chararray, weight:double)}))


Each topic has a DataBag of top terms associated with it as a way of characterizing the topic discovered.
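
For instance, a single element of the output bag might look something like this (the terms and weights here are made up, purely to illustrate the shape of the schema):

(3, {(hadoop,1250.0),(data,981.0),(map-reduce,407.0)})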

Here's the simplest implementation of such a UDF I could come up with using Mallet (it's a mouthful):


package varaha.topic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.regex.Pattern;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.backend.executionengine.ExecException;

import cc.mallet.pipe.Pipe;
import cc.mallet.pipe.CharSequenceLowercase;
import cc.mallet.pipe.CharSequence2TokenSequence;
import cc.mallet.pipe.CharSequence2CharNGrams;
import cc.mallet.pipe.TokenSequenceNGrams;
import cc.mallet.pipe.TokenSequenceRemoveStopwords;
import cc.mallet.pipe.TokenSequence2FeatureSequence;
import cc.mallet.pipe.TokenSequence2FeatureSequenceWithBigrams;

import cc.mallet.types.TokenSequence;
import cc.mallet.types.Token;

import cc.mallet.pipe.SerialPipes;
import cc.mallet.types.InstanceList;
import cc.mallet.types.Instance;
import cc.mallet.types.Alphabet;
import cc.mallet.types.IDSorter;
import cc.mallet.types.LabelSequence;
import cc.mallet.topics.TopicAssignment;
import cc.mallet.topics.ParallelTopicModel;

public class LDATopics extends EvalFunc<DataBag> {

private Pipe pipe;
private static Long numKeywords = 50l; // Maximum number of keywords to use to describe a topic

public LDATopics() {
pipe = buildPipe();
}

public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
return null;

Integer numTopics = (Integer)input.get(0); // Number of topics to discover
DataBag documents = (DataBag)input.get(1); // Documents, {(doc_id, text)}
DataBag result = BagFactory.getInstance().newDefaultBag();

InstanceList instances = new InstanceList(pipe);

// Add the input databag as source data and run it through the pipe built
// by the constructor.
instances.addThruPipe(new DataBagSourceIterator(documents));

// Create a model with numTopics, alpha_t = 0.01, beta_w = 0.01
// Note that the first parameter is passed as the sum over topics, while
// the second is the parameter for a single dimension of the Dirichlet prior.
ParallelTopicModel model = new ParallelTopicModel(numTopics, 1.0, 0.01);
model.addInstances(instances);
model.setNumThreads(1); // Important, since this is being run in the reduce, just use one thread
model.setTopicDisplay(0,0);
model.setNumIterations(2000);
model.estimate();

// Get the results
Alphabet dataAlphabet = instances.getDataAlphabet();
ArrayList<TopicAssignment> assignments = model.getData();

// Convert the results into comprehensible topics
for (int topicNum = 0; topicNum < model.getNumTopics(); topicNum++) {
TreeSet<IDSorter> sortedWords = model.getSortedWords().get(topicNum);
Iterator<IDSorter> iterator = sortedWords.iterator();

DataBag topic = BagFactory.getInstance().newDefaultBag();

// Get the set of keywords with weights for this topic and add them as tuples
// to the databag used to represent this topic
while (iterator.hasNext() && topic.size() < numKeywords) {
IDSorter info = iterator.next();
Tuple weightedWord = TupleFactory.getInstance().newTuple(2);
String wordToken = model.alphabet.lookupObject(info.getID()).toString(); // get the actual term text
weightedWord.set(0, wordToken);
weightedWord.set(1, info.getWeight()); // the raw weight of the term
topic.add(weightedWord);
}
Tuple topicTuple = TupleFactory.getInstance().newTuple(2);
topicTuple.set(0, topicNum);
topicTuple.set(1, topic);
result.add(topicTuple);
}

return result;
}

// Instantiates a new pipe object for ingesting pig tuples
private Pipe buildPipe() {
Pattern tokenPattern = Pattern.compile("\\S[\\S]+\\S");
int[] sizes = {1,2};
ArrayList pipeList = new ArrayList();

pipeList.add(new CharSequence2TokenSequence(tokenPattern));
pipeList.add(new TokenSequenceRemoveStopwords(false, false)); // we should use a real stop word list
pipeList.add(new TokenSequenceNGramsDelim(sizes, " "));
pipeList.add(new TokenSequence2FeatureSequence());
return new SerialPipes(pipeList);
}

/**
A few minor updates to TokenSequenceNGrams:

(1) use delimiter that's passed in to delineate ngrams
*/
private class TokenSequenceNGramsDelim extends TokenSequenceNGrams {
int [] gramSizes = null;
String delim = null;

public TokenSequenceNGramsDelim(int [] sizes, String delim) {
super(sizes);
this.gramSizes = sizes;
this.delim = delim;
}

@Override
public Instance pipe (Instance carrier) {
String newTerm = null;
TokenSequence tmpTS = new TokenSequence();
TokenSequence ts = (TokenSequence) carrier.getData();

for (int i = 0; i < ts.size(); i++) {
Token t = ts.get(i);
for(int j = 0; j < gramSizes.length; j++) {
int len = gramSizes[j];
if (len <= 0 || len > (i+1)) continue;
if (len == 1) { tmpTS.add(t); continue; }
newTerm = new String(t.getText());
for(int k = 1; k < len; k++)
newTerm = ts.get(i-k).getText() + delim + newTerm;
tmpTS.add(newTerm);
}
}
carrier.setData(tmpTS);
return carrier;
}
}

/**
Allow for a databag to be source data for mallet's clustering
*/
private class DataBagSourceIterator implements Iterator<Instance> {

private Iterator<Tuple> tupleItr;
private String currentId;
private String currentText;

public DataBagSourceIterator(DataBag bag) {
tupleItr = bag.iterator();
}

public boolean hasNext() {
if (tupleItr.hasNext()) {
Tuple t = tupleItr.next();
try {
if (!t.isNull(0) && !t.isNull(1)) {
currentId = t.get(0).toString();
currentText = t.get(1).toString();
if (currentId.isEmpty() || currentText.isEmpty()) {
return false;
} else {
return true;
}
}
} catch (ExecException e) {
throw new RuntimeException(e);
}
}
return false;
}

public Instance next() {
// Get the next tuple and pull out its fields
Instance i = new Instance(currentText, "X", currentId, null);
return i;
}

public void remove() {
tupleItr.remove();
}
}
}


There are a few key things going on here. First, the UDF operates on a bag that contains tuples with exactly two fields, doc_id and text. Mallet has the notion of pipes, where your input data flows through a series of 'pipes' as a way of preparing the data. The class DataBagSourceIterator is simply a convenient way of plugging a DataBag object into this flow.

One of the pipes our documents flow through actually tokenizes the text; TokenSequenceNGramsDelim does this work. Mallet has a built-in TokenSequenceNGrams that works nicely; unfortunately, when tokenizing n-grams with n > 1, it uses an '_' by default to separate the terms in the n-gram. TokenSequenceNGramsDelim allows us to use our own delimiter, namely a ' ', instead.

Ultimately, all this UDF does is read the input documents, prepare them for clustering, run Mallet's built-in LDA implementation, and construct the output DataBag in the shape we'd like.

See varaha for the code itself plus a pom.xml to compile it.

Pig



Now that we have our UDF, let's write a Pig script to use it. Since the documents I've chosen don't have any obvious (or at least easy-to-get-at) additional metadata, we're going to use a GROUP ALL. Here's the Pig script:


define TokenizeText varaha.text.TokenizeText();
define LDATopics varaha.topic.LDATopics();
define RangeConcat org.pygmalion.udf.RangeBasedStringConcat('0', ' ');

--
-- Load the docs
--
raw_documents = load '$DOCS' as (doc_id:chararray, text:chararray);

--
-- Tokenize text to remove stopwords
--
tokenized = foreach raw_documents generate doc_id AS doc_id, flatten(TokenizeText(text)) as (token:chararray);

--
-- Concat the text for a given doc with spaces
--
documents = foreach (group tokenized by doc_id) generate group as doc_id, RangeConcat(tokenized.token) as text;

--
-- Ensure all our documents are sane
--
for_lda = filter documents by SIZE(doc_id) > 0 and SIZE(text) > 0;

--
-- Group the docs by all and find topics
--
-- WARNING: This is, in general, not appropriate in a production environment.
-- Instead it is best to group by some piece of metadata which partitions
-- the documents into smaller groups.
--
topics = foreach (group for_lda all) generate
FLATTEN(LDATopics(20, for_lda)) as (
topic_num:int,
keywords:bag {t:tuple(keyword:chararray, weight:double)}
);


store topics into '$OUT';


There are a few things worth pointing out here. First, we load our text as normal. Then there's a step to tokenize the text, which might seem spurious: it uses the Lucene tokenization UDF from here as a way to remove stopwords, and you could skip it if that isn't important for you. Next, the tokens are grouped back together by document id and concatenated to form cleaned documents. I'm using the nice UDF from pygmalion to do the concatenation. Finally, the documents are grouped together and topics are discovered.

Run it!


At this point we're ready to run our script. I named this script 'discover_topics_example.pig'. And here's how I ran it:


pig -p DOCS=20newsgroups/data -p OUT=20newsgroups/topics discover_topics_example.pig


And here's what the output looks like:


0 {(max,4523.0),(max max,3266.0),(g9v,1161.0),(b8f,1109.0),(a86,913.0),(g9v g9v,834.0),(145,740.0),(1d9,656.0),(a86 a86,643.0),(b8f b8f,599.0),(34u,512.0),(145 145,449.0),(75u,446.0),(bhj,445.0),(giz,430.0),(2di,414.0),(1d9 1d9,322.0),(2tm,300.0),(7ey,292.0),(2di 2di,247.0),(bxn,240.0),(6ei,215.0),(6um,189.0),(34u 34u,168.0),(75u 75u,162.0),(bhj bhj,150.0),(giz giz,142.0),(air,129.0),(qax,127.0),(b4q,120.0),(okz,116.0),(6um 6um,112.0),(nrhj,112.0),(b8e,109.0),(7kn,104.0),(1eq,102.0),(bxn bxn,99.0),(c8v,99.0),(rlk,99.0),(fyn,97.0),(2tm 2tm,96.0),(b9r,96.0),(3dy,96.0),(7ez,95.0),(1d9l,93.0),(b8g,92.0),(biz,91.0),(7ex,86.0),(7ey 7ey,84.0),(r8f,82.0)}
1 {(medical,480.0),(subject,473.0),(disease,431.0),(health,373.0),(cancer,349.0),(msg,344.0),(patients,331.0),(article,329.0),(food,285.0),(writes,281.0),(treatment,253.0),(hiv,249.0),(doctor,237.0),(gordon,229.0),(medicine,226.0),(aids,222.0),(candida,212.0),(diet,208.0),(yeast,204.0),(banks,198.0),(infection,188.0),(geb,185.0),(research,181.0),(drug,180.0),(pain,177.0),(effects,171.0),(study,170.0),(information,170.0),(patient,162.0),(1993,158.0),(clinical,152.0),(vitamin,148.0),(dyer,148.0),(studies,145.0),(chronic,138.0),(risk,135.0),(symptoms,135.0),(april,132.0),(doctors,131.0),(diseases,129.0),(body,127.0),(newsletter,124.0),(blood,124.0),(weight,123.0),(page,122.0),(syndrome,120.0),(foods,119.0),(cs.pitt.edu,115.0),(volume,114.0),(hicnet,109.0)}
2 {(game,1010.0),(subject,831.0),(team,799.0),(hockey,725.0),(play,544.0),(games,466.0),(nhl,465.0),(writes,453.0),(season,390.0),(article,382.0),(win,372.0),(players,372.0),(period,303.0),(player,298.0),(goal,286.0),(teams,274.0),(cup,251.0),(league,248.0),(playoff,247.0),(pit,244.0),(detroit,236.0),(det,233.0),(espn,232.0),(leafs,228.0),(pittsburgh,228.0),(wings,223.0),(playoffs,220.0),(fans,214.0),(boston,207.0),(series,202.0),(toronto,200.0),(bos,198.0),(pens,198.0),(played,192.0),(blues,191.0),(chi,185.0),(montreal,184.0),(puck,181.0),(goals,178.0),(buffalo,176.0),(bruins,175.0),(devils,172.0),(maynard,171.0),(penguins,170.0),(april,169.0),(division,169.0),(power,168.0),(ice,165.0),(tor,161.0),(flyers,160.0)}
3 {(subject,1528.0),(drive,1442.0),(scsi,1181.0),(card,1178.0),(disk,682.0),(windows,644.0),(system,603.0),(ide,571.0),(bus,542.0),(dos,512.0),(hard,490.0),(modem,481.0),(software,460.0),(mac,450.0),(writes,450.0),(drives,447.0),(controller,420.0),(drivers,420.0),(article,394.0),(video,393.0),(bit,382.0),(memory,373.0),(board,365.0),(computer,356.0),(cards,349.0),(port,345.0),(ram,326.0),(driver,317.0),(disks,317.0),(data,313.0),(sale,311.0),(floppy,301.0),(i'm,294.0),(motherboard,289.0),(speed,286.0),(isa,277.0),(mode,252.0),(chip,250.0),(machine,240.0),(486,237.0),(bios,234.0),(ibm,226.0),(serial,221.0),(gateway,215.0),(run,214.0),(hardware,213.0),(set,211.0),(cache,209.0),(diamond,207.0),(mhz,200.0)}
4 {(gun,1095.0),(writes,942.0),(article,838.0),(subject,756.0),(fbi,647.0),(government,545.0),(fire,510.0),(guns,505.0),(batf,464.0),(waco,456.0),(koresh,437.0),(children,400.0),(atf,371.0),(weapons,345.0),(compound,288.0),(people,277.0),(cdt,251.0),(police,248.0),(clinton,248.0),(firearms,240.0),(control,234.0),(gas,233.0),(crime,228.0),(law,219.0),(federal,212.0),(killed,201.0),(david,193.0),(constitution,187.0),(survivors,180.0),(house,162.0),(hallam,158.0),(assault,155.0),(ranch,154.0),(agents,153.0),(criminals,150.0),(deaths,150.0),(arms,150.0),(davidians,145.0),(warrant,143.0),(started,143.0),(amendment,141.0),(roby,138.0),(burns,138.0),(tanks,137.0),(texas,136.0),(veal,131.0),(armed,131.0),(murder,130.0),(cult,129.0),(rights,129.0)}
5 {(andy,110.0),(kratz,91.0),(semi,84.0),(water,77.0),(uicvm.uic.edu,74.0),(revolver,73.0),(gun,72.0),(auto,70.0),(safety,64.0),(freeman,62.0),(jason,59.0),(ndet_loop.c,54.0),(u28037,50.0),(weapon,50.0),(gang,50.0),(cops,49.0),(ole.cdac.com,49.0),(02p,48.0),(expose,48.0),(mydisplay,43.0),(glock,42.0),(mahan,41.0),(sail.stanford.edu andy,39.0),(sail.stanford.edu,39.0),(andy sail.stanford.edu,37.0),(section,37.0),(firearm,36.0),(mwra,36.0),(ssave,35.0),(phil,35.0),(military,35.0),(trigger,34.0),(cement,33.0),(auto semi,31.0),(shooting,31.0),(jason kratz,30.0),(garrett,29.0),(silence,29.0),(autos,28.0),(dominance,28.0),(semi auto,27.0),(water dept,27.0),(concealed,26.0),(revolvers,25.0),(u28037 uicvm.uic.edu,25.0),(moment silence,25.0),(ordnance,25.0),(atlantic,25.0),(ingres.com,25.0),(item,25.0)}
6 {(god,2686.0),(jesus,1457.0),(bible,909.0),(christian,823.0),(christ,809.0),(church,781.0),(christians,739.0),(subject,719.0),(sin,589.0),(lord,504.0),(god's,494.0),(faith,478.0),(people,464.0),(sandvik,425.0),(life,411.0),(christianity,405.0),(writes,405.0),(paul,367.0),(love,358.0),(law,357.0),(hell,344.0),(heaven,323.0),(truth,313.0),(word,311.0),(athos.rutgers.edu,295.0),(article,295.0),(john,283.0),(catholic,280.0),(scripture,278.0),(father,257.0),(holy,254.0),(spirit,253.0),(son,248.0),(kent,234.0),(true,233.0),(eternal,231.0),(doctrine,231.0),(death,226.0),(brian,223.0),(day,214.0),(apr,211.0),(newton.apple.com,210.0),(jehovah,204.0),(children,204.0),(biblical,203.0),(words,200.0),(book,198.0),(earth,191.0),(jews,190.0),(matthew,190.0)}
7 {(turkish,827.0),(armenian,826.0),(armenians,694.0),(armenia,452.0),(turkey,449.0),(turks,414.0),(war,383.0),(muslim,360.0),(genocide,346.0),(muslims,343.0),(soviet,332.0),(people,310.0),(greek,294.0),(government,268.0),(argic,263.0),(serdar,262.0),(russian,254.0),(jews,248.0),(history,243.0),(azerbaijan,239.0),(world,202.0),(university,201.0),(greece,199.0),(istanbul,193.0),(killed,185.0),(population,184.0),(article,176.0),(ottoman,170.0),(bosnia,169.0),(children,169.0),(soldiers,167.0),(serbs,160.0),(greeks,158.0),(europe,156.0),(1920,148.0),(extermination,147.0),(army,144.0),(zuma.uucp,141.0),(subject,141.0),(1919,134.0),(sera,132.0),(dead,132.0),(troops,130.0),(bosnian,130.0),(mountain,129.0),(closed,128.0),(serve,128.0),(roads,127.0),(empire,127.0),(escape,126.0)}
8 {(writes,525.0),(cramer,460.0),(article,459.0),(homosexual,407.0),(gay,402.0),(homosexuality,384.0),(subject,351.0),(sex,324.0),(sexual,281.0),(clayton,274.0),(health,271.0),(homosexuals,256.0),(optilink.com,248.0),(drugs,240.0),(insurance,216.0),(study,214.0),(drug,202.0),(government,195.0),(male,168.0),(tax,146.0),(kaldis,144.0),(private,134.0),(people,131.0),(percentage,129.0),(child,126.0),(care,120.0),(children,118.0),(clayton cramer,116.0),(cramer optilink.com,108.0),(optilink.com cramer,108.0),(state.edu,108.0),(women,101.0),(kinsey,95.0),(cramer clayton,95.0),(rights,95.0),(population,95.0),(magnus.acs.ohio,94.0),(writes article,88.0),(abortion,84.0),(american,84.0),(partners,83.0),(political,80.0),(gays,78.0),(heterosexual,78.0),(national,78.0),(church,76.0),(evidence,76.0),(issues,75.0),(laws,75.0),(optilink,74.0)}
9 {(subject,898.0),(list,647.0),(mail,581.0),(university,394.0),(address,317.0),(email,278.0),(information,276.0),(mailing,246.0),(send,219.0),(computer,216.0),(1993,210.0),(internet,199.0),(conference,198.0),(research,197.0),(fax,189.0),(systems,159.0),(newsgroup,155.0),(contact,141.0),(message,141.0),(government,130.0),(date,130.0),(apr,130.0),(steve,130.0),(request,125.0),(info,122.0),(phone,121.0),(news,120.0),(canada,118.0),(article,118.0),(br.com,111.0),(science,110.0),(dept,110.0),(writes,110.0),(newsgroups,109.0),(april,107.0),(national,105.0),(graphics,103.0),(book,102.0),(john,102.0),(mailing list,98.0),(institute,98.0),(books,97.0),(addresses,96.0),(organization,96.0),(usa,95.0),(software,95.0),(james,93.0),(reply,92.0),(paul,89.0),(center,88.0)}
10 {(subject,1355.0),(sale,583.0),(writes,375.0),(power,362.0),(battery,316.0),(article,290.0),(radio,242.0),(sound,224.0),(radar,209.0),(phone,208.0),(circuit,199.0),(ground,183.0),(offer,182.0),(shipping,178.0),(condition,178.0),(mail,176.0),(audio,174.0),(tape,173.0),(sell,171.0),(signal,168.0),(current,167.0),(amp,166.0),(price,164.0),(detector,162.0),(system,160.0),(box,155.0),(heat,154.0),(batteries,150.0),(line,148.0),(blue,144.0),(noise,143.0),(output,142.0),(voltage,142.0),(chip,141.0),(stereo,137.0),(games,131.0),(supply,131.0),(equipment,129.0),(cpu,127.0),(john,126.0),(light,122.0),(wire,121.0),(low,121.0),(air,117.0),(input,115.0),(channel,114.0),(car,112.0),(original,109.0),(lead,109.0),(computer,108.0)}
11 {(subject,1177.0),(monitor,490.0),(writes,481.0),(apple,473.0),(mac,388.0),(article,335.0),(printer,296.0),(mouse,279.0),(computer,269.0),(xterm,243.0),(video,240.0),(software,233.0),(centris,226.0),(screen,208.0),(mail,205.0),(i've,203.0),(keyboard,200.0),(system,198.0),(monitors,191.0),(i'm,190.0),(color,181.0),(price,168.0),(running,168.0),(duo,166.0),(quadra,160.0),(bit,158.0),(print,152.0),(machine,151.0),(university,143.0),(internet,139.0),(info,139.0),(email,133.0),(ram,130.0),(vga,126.0),(laser,125.0),(key,122.0),(x11r5,120.0),(speed,120.0),(610,119.0),(box,119.0),(fpu,117.0),(display,116.0),(fax,116.0),(lib,114.0),(powerbook,113.0),(buy,110.0),(deskjet,109.0),(simms,109.0),(lciii,107.0),(hardware,104.0)}
12 {(windows,1782.0),(subject,1779.0),(file,1209.0),(program,863.0),(files,838.0),(window,809.0),(dos,715.0),(image,676.0),(writes,586.0),(graphics,541.0),(article,497.0),(software,441.0),(run,425.0),(display,424.0),(version,422.0),(server,404.0),(code,386.0),(data,371.0),(ftp,369.0),(images,366.0),(unix,364.0),(color,349.0),(bit,347.0),(application,339.0),(i'm,324.0),(manager,323.0),(motif,321.0),(directory,313.0),(format,312.0),(user,302.0),(mail,302.0),(system,296.0),(gif,292.0),(running,288.0),(microsoft,287.0),(screen,284.0),(package,271.0),(faq,264.0),(jpeg,259.0),(i've,251.0),(line,250.0),(memory,247.0),(programs,244.0),(3.1,236.0),(applications,226.0),(set,218.0),(support,201.0),(information,199.0),(source,198.0),(text,195.0)}
13 {(space,1339.0),(subject,597.0),(writes,567.0),(henry,484.0),(launch,443.0),(article,421.0),(moon,376.0),(earth,373.0),(orbit,366.0),(nasa,365.0),(shuttle,323.0),(spacecraft,268.0),(system,265.0),(solar,264.0),(mission,264.0),(satellite,261.0),(pat,254.0),(sky,253.0),(zoo.toronto.edu,222.0),(data,215.0),(science,214.0),(spencer,205.0),(station,203.0),(lunar,195.0),(energy,174.0),(flight,166.0),(cost,165.0),(project,162.0),(hst,161.0),(mars,161.0),(program,161.0),(zoo.toronto.edu henry,159.0),(ray,155.0),(prb,152.0),(henry zoo.toronto.edu,152.0),(billion,151.0),(atmosphere,149.0),(technology,145.0),(baalke,143.0),(april,140.0),(nuclear,139.0),(gamma,138.0),(universe,137.0),(astronomy,133.0),(theory,133.0),(planet,132.0),(sun,131.0),(commercial,130.0),(aurora.alaska.edu,127.0),(nsmca,124.0)}
14 {(subject,1478.0),(car,1469.0),(writes,1381.0),(article,1227.0),(bike,741.0),(cars,481.0),(dod,471.0),(engine,422.0),(ride,306.0),(bmw,292.0),(speed,280.0),(miles,276.0),(oil,267.0),(front,259.0),(drive,254.0),(honda,249.0),(riding,244.0),(road,244.0),(ford,242.0),(i've,240.0),(i'm,228.0),(driving,217.0),(dealer,210.0),(buy,206.0),(insurance,205.0),(bikes,204.0),(dog,198.0),(motorcycle,196.0),(rear,179.0),(price,171.0),(wheel,168.0),(left,164.0),(clutch,162.0),(helmet,157.0),(article writes,155.0),(writes article,154.0),(tires,147.0),(brake,146.0),(mike,143.0),(shaft,140.0),(john,138.0),(don't,138.0),(andrew,135.0),(mustang,133.0),(gas,131.0),(manual,131.0),(bought,129.0),(fast,128.0),(buying,127.0),(bnr.ca,124.0)}
15 {(key,1377.0),(clipper,1002.0),(encryption,857.0),(chip,851.0),(subject,667.0),(government,646.0),(keys,557.0),(writes,507.0),(security,492.0),(public,453.0),(privacy,419.0),(escrow,416.0),(article,410.0),(des,390.0),(system,385.0),(algorithm,375.0),(law,358.0),(phone,339.0),(nsa,332.0),(netcom.com,327.0),(crypto,308.0),(information,307.0),(secure,304.0),(pgp,300.0),(message,285.0),(secret,282.0),(data,276.0),(bit,265.0),(david,258.0),(cryptography,245.0),(enforcement,245.0),(anonymous,240.0),(code,235.0),(technology,229.0),(encrypted,219.0),(wiretap,217.0),(sternlight,209.0),(internet,204.0),(chip clipper,199.0),(clipper chip,199.0),(access,189.0),(agencies,188.0),(communications,188.0),(chips,184.0),(private,182.0),(computer,173.0),(clinton,167.0),(rsa,165.0),(mail,162.0),(administration,155.0)}
16 {(god,963.0),(writes,892.0),(subject,674.0),(article,625.0),(atheists,525.0),(morality,509.0),(religion,497.0),(moral,449.0),(evidence,432.0),(keith,415.0),(science,380.0),(atheism,365.0),(objective,365.0),(belief,319.0),(christian,317.0),(livesey,312.0),(islam,305.0),(argument,288.0),(atheist,287.0),(exist,287.0),(faith,277.0),(existence,274.0),(religious,272.0),(islamic,258.0),(frank,247.0),(mathew,242.0),(jon,235.0),(beliefs,198.0),(reason,191.0),(claim,188.0),(true,187.0),(exists,179.0),(statement,173.0),(christianity,169.0),(bible,169.0),(wrong,166.0),(values,162.0),(theism,160.0),(system,158.0),(universe,157.0),(truth,155.0),(solntze.wpd.sgi.com,152.0),(cobb,149.0),(jaeger,148.0),(scientific,145.0),(rushdie,142.0),(muslim,142.0),(question,140.0),(agree,139.0),(wrote,133.0)}
17 {(don't,6736.0),(people,6088.0),(subject,5058.0),(time,4513.0),(i'm,3901.0),(writes,3872.0),(article,3356.0),(read,1874.0),(can't,1861.0),(question,1855.0),(doesn't,1769.0),(didn't,1683.0),(i've,1672.0),(that's,1653.0),(lot,1401.0),(real,1388.0),(day,1386.0),(post,1360.0),(world,1311.0),(person,1239.0),(you're,1239.0),(true,1214.0),(wrong,1211.0),(isn't,1155.0),(heard,1136.0),(reason,1125.0),(bad,1125.0),(times,1098.0),(called,1096.0),(i'd,1082.0),(idea,1043.0),(found,1043.0),(remember,1032.0),(life,1023.0),(system,1003.0),(makes,983.0),(means,974.0),(call,969.0),(money,965.0),(power,942.0),(free,926.0),(ago,925.0),(based,909.0),(feel,907.0),(hope,887.0),(hard,887.0),(i'll,885.0),(questions,876.0),(matter,875.0),(david,865.0)}
18 {(israel,1036.0),(israeli,740.0),(jews,599.0),(writes,561.0),(arab,515.0),(subject,497.0),(jewish,482.0),(article,461.0),(war,318.0),(arabs,288.0),(policy,242.0),(jake,235.0),(peace,234.0),(palestinian,224.0),(adl,215.0),(center,187.0),(israelis,185.0),(palestinians,177.0),(kuwait,175.0),(anti,169.0),(killed,164.0),(research,164.0),(palestine,158.0),(gaza,152.0),(igc.apc.org,147.0),(civilians,144.0),(soldiers,144.0),(occupied,144.0),(american,142.0),(cpr,141.0),(iran,141.0),(lebanese,138.0),(land,138.0),(virginia.edu,136.0),(bony.com,131.0),(bony1,131.0),(jew,130.0),(government,129.0),(jerusalem,127.0),(lebanon,126.0),(rights,123.0),(clinton,121.0),(israel's,119.0),(adam,119.0),(zionism,118.0),(press,118.0),(country,117.0),(president,116.0),(opinions,114.0),(yassin,113.0)}
19 {(subject,866.0),(writes,741.0),(article,596.0),(game,578.0),(baseball,516.0),(team,462.0),(players,402.0),(games,385.0),(hit,286.0),(runs,264.0),(season,241.0),(morris,227.0),(win,223.0),(league,223.0),(braves,214.0),(michael,201.0),(ball,192.0),(pitching,181.0),(he's,179.0),(player,176.0),(pitcher,171.0),(hitter,170.0),(play,162.0),(georgia,160.0),(average,158.0),(run,155.0),(roger,150.0),(jays,149.0),(hitting,147.0),(sox,146.0),(david,146.0),(cubs,137.0),(home,135.0),(stats,134.0),(fans,133.0),(mike,131.0),(giants,128.0),(fan,127.0),(base,126.0),(batting,125.0),(pitch,125.0),(ai.uga.edu,124.0),(bonds,123.0),(covington,122.0),(jewish,121.0),(time,121.0),(career,120.0),(teams,119.0),(mcovingt,119.0),(smith,117.0)}


Labeling


Now we'd like to see how well we did. Here are the 20 topics we _know_ should exist:


  • alt.atheism

  • comp.graphics

  • comp.os.ms-windows.misc

  • comp.sys.ibm.pc.hardware

  • comp.sys.mac.hardware

  • comp.windows.x

  • misc.forsale

  • rec.autos

  • rec.motorcycles

  • rec.sport.baseball

  • rec.sport.hockey

  • sci.crypt

  • sci.electronics

  • sci.med

  • sci.space

  • soc.religion.christian

  • talk.politics.guns

  • talk.politics.mideast

  • talk.politics.misc

  • talk.religion.misc



There are a number of methods for labeling topics discovered in this way (see here), but in the interest of time I'm going to manually match the topics above to the ones discovered. Obviously, 'eyeballing' it isn't appropriate for a production environment...


  • alt.atheism,16

  • comp.graphics,12

  • comp.os.ms-windows.misc,9

  • comp.sys.ibm.pc.hardware,3

  • comp.sys.mac.hardware,11

  • comp.windows.x,0

  • misc.forsale,10

  • rec.autos,14

  • rec.motorcycles,14

  • rec.sport.baseball,19

  • rec.sport.hockey,2

  • sci.crypt,15

  • sci.electronics

  • sci.med,1

  • sci.space,13

  • soc.religion.christian,6

  • talk.politics.guns,5

  • talk.politics.mideast,18,7

  • talk.politics.misc,4

  • talk.religion.misc,8



So, as far as I can tell, some newsgroups map to more than one discovered topic and some don't seem to map to any discovered topic at all. It's clear there's room for improvement (look at the alpha and beta parameters I'm hardcoding in the topic model, for example), but all in all it's pretty good as a first pass. Now go away and find some topics.

Hurray!

Wednesday, October 26, 2011

Nearest Neighbors With Apache Pig and JRuby

Since it's been such a long time since I last posted, I thought I'd make this one a bit longer. It's really a condensation of a lot of things I've been working with and thinking about over the past few months.

Nearest Neighbors



The nearest neighbors problem (also known as the post-office problem) is this: given a set of points S in some metric space M and a query point X, assign to X the nearest point in S. In other words, given a residence, assign to it the nearest post office. The K-nearest neighbors problem, which this post addresses, is just a slight generalization: instead of one neighbor, we're looking for the K nearest.
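
Written out (standard notation, nothing specific to the implementation below): given a finite set of points $S$ in a metric space $(M, d)$ and a query point $x$,

$$\mathrm{NN}(x) = \operatorname*{argmin}_{s \in S \setminus \{x\}} d(x, s)$$

and the K-nearest neighbors are simply the $K$ points of $S \setminus \{x\}$ with the smallest $d(x, s)$.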

Problem



So, we're going to use the geonames data. This is a set of nearly 8 million geo points from around the world, with names, coordinates, and a bunch of other good stuff. We would like to find, for each point in the geonames set, the 5 points (also in geonames) that are nearest to it. Should be pretty simple, yeah?

Get data


The geonames data set 'allCountries.zip' can be downloaded like so:


$: wget http://download.geonames.org/export/dump/allCountries.zip


Prepare data


Since the geonames data set already comes as a nice tab-separated-values (.tsv) file, it's just a matter of unzipping the package and placing it on your HDFS (you do have one of those, don't you?). Do:


$: unzip allCountries.zip
$: hadoop fs -put allCountries.txt .


to unzip the package and place the tsv file into your home directory on the hadoop distributed file system.

Schema


Oh, and by the way, before we forget, the data from geonames has this pig schema:


geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray


The Algorithm


Now that we have the data we can start to play with it and think about how to solve the problem at hand. Looking at the data (use something like 'head', 'cat', 'cut', etc.) we see that there are really only three fields of interest: geonameid, longitude, and latitude. All the other fields are just nice metadata which we can attach later.

Now, since we're going to be using Apache Pig to solve this problem, we need to think a little bit about parallelism. One constraint is that at no time is any one point going to have access to the locations of all the other points. In other words, we will not be storing the full set of points in memory. Besides, it's 8 million points; that's kind of a lot for my poor little machine to handle.

So it's clear (right?) that we're going to have to partition the space in some way. Then, within a partition of the space, we'll need to apply a local version of the nearest neighbors algorithm. That's it really. Map and reduce. Wait, but there's one problem. What happens if we don't find all 5 neighbors for a point in a single partition? Hmmm. Well, the answer is iteration. We'll choose a small partition size to begin with and gradually increase the partition size until either the partition size is too large or all the neighbors have been found. Got it?
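
To make the partition sizes concrete (this follows from the tile scheme described in the next section): at zoom level $z$ the world is split into $2^z \times 2^z$ tiles, so each tile spans

$$\frac{360^{\circ}}{2^{z}} \text{ degrees of longitude,}$$

and dropping one level of precision merges a tile with its three siblings, quadrupling the area searched on the next iteration.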

Recap:


  • (1) Partition the space

  • (2) Search for nearest neighbors in a single partition

  • (3) If all neighbors have been found, terminate; else increase partition size and repeat (1) and (2)



Implementation


For partitioning the space we're going to use Bing Maps quadkeys (http://msdn.microsoft.com/en-us/library/bb259689.aspx) since they're super easy to implement and they partition the space nicely. This will be a Java UDF for Pig that takes a (longitude, latitude, zoom level) tuple and returns a string quadkey (the partition id).

Here's the actual Java code for that. Let's call it "GetQuadkey":


package sounder.pig.geo;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
See: http://msdn.microsoft.com/en-us/library/bb259689.aspx

A Pig UDF to compute the quadkey string for a given
(longitude, latitude, resolution) tuple.

*/
public class GetQuadkey extends EvalFunc<String> {
private static final int TILE_SIZE = 256;

public String exec(Tuple input) throws IOException {
if (input == null || input.size() < 3 || input.isNull(0) || input.isNull(1) || input.isNull(2))
return null;

Double longitude = (Double)input.get(0);
Double latitude = (Double)input.get(1);
Integer resolution = (Integer)input.get(2);

String quadKey = quadKey(longitude, latitude, resolution);
return quadKey;
}

private static String quadKey(double longitude, double latitude, int resolution) {
int[] pixels = pointToPixels(longitude, latitude, resolution);
int[] tiles = pixelsToTiles(pixels[0], pixels[1]);
return tilesToQuadKey(tiles[0], tiles[1], resolution);
}

/**
Return the pixel X and Y coordinates for the given lat, lng, and resolution.
*/
private static int[] pointToPixels(double longitude, double latitude, int resolution) {
double x = (longitude + 180) / 360;
double sinLatitude = Math.sin(latitude * Math.PI / 180);
double y = 0.5 - Math.log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * Math.PI);

int mapSize = mapSize(resolution);
int[] pixels = {(int) trim(x * mapSize + 0.5, 0, mapSize - 1), (int) trim(y * mapSize + 0.5, 0, mapSize - 1)};
return pixels;
}

/**
Convert from pixel coordinates to tile coordinates.
*/
private static int[] pixelsToTiles(int pixelX, int pixelY) {
int[] tiles = {pixelX / TILE_SIZE, pixelY / TILE_SIZE};
return tiles;
}

/**
Finally, given tile coordinates and a resolution, returns the appropriate quadkey
*/
private static String tilesToQuadKey(int tileX, int tileY, int resolution) {
StringBuilder quadKey = new StringBuilder();
for (int i = resolution; i > 0; i--) {
char digit = '0';
int mask = 1 << (i - 1);
if ((tileX & mask) != 0) {
digit++;
}
if ((tileY & mask) != 0) {
digit++;
digit++;
}
quadKey.append(digit);
}
return quadKey.toString();
}

/**
Ensure input value is within minval and maxval
*/
private static double trim(double n, double minVal, double maxVal) {
return Math.min(Math.max(n, minVal), maxVal);
}

/**
Width of the map, in pixels, at the given resolution
*/
public static int mapSize(int resolution) {
return TILE_SIZE << resolution;
}
}
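
As a quick sanity check, here's a minimal sketch of calling the UDF directly from a plain Java main. This assumes the compiled UDF and the Pig jar are on your classpath, and the class name and coordinates are just arbitrary examples:


import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class QuadkeyCheck {
public static void main(String[] args) throws Exception {
// Field order matches the UDF: (longitude, latitude, resolution)
Tuple t = TupleFactory.getInstance().newTuple(3);
t.set(0, -122.33); // longitude
t.set(1, 47.61);   // latitude
t.set(2, 10);      // resolution (zoom level)
// Prints a 10-character quadkey, one digit per zoom level
System.out.println(new sounder.pig.geo.GetQuadkey().exec(t));
}
}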


Next we need another Java UDF that operates on all the points in a partition. Let's call it "NearestNeighbors". Here's a naive implementation:


package sounder.pig.geo.nearestneighbors;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Iterator;
import java.util.Comparator;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;

public class NearestNeighbors extends EvalFunc<DataBag> {
private static TupleFactory tupleFactory = TupleFactory.getInstance();
private static BagFactory bagFactory = BagFactory.getInstance();

public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
return null;

Long k = (Long)input.get(0);
DataBag points = (DataBag)input.get(1); // {(id,lng,lat,{(n1,n1_dist)...})}
DataBag result = bagFactory.newDefaultBag();

for (Tuple pointA : points) {
DataBag neighborsBag = (DataBag)pointA.get(3);
if (neighborsBag.size() < k) {
PriorityQueue<Tuple> neighbors = toDistanceSortedQueue(k.intValue(), neighborsBag);
Double x1 = Math.toRadians((Double)pointA.get(1));
Double y1 = Math.toRadians((Double)pointA.get(2));

for (Tuple pointB : points) {
if (pointA!=pointB) {
Double x2 = Math.toRadians((Double)pointB.get(1));
Double y2 = Math.toRadians((Double)pointB.get(2));
Double distance = haversineDistance(x1,y1,x2,y2);

// Build a candidate tuple of (neighbor_id, distance) for pointB
Tuple newNeighbor = tupleFactory.newTuple(2);
newNeighbor.set(0, pointB.get(0));
newNeighbor.set(1, distance);

if (neighbors.size() < k) {
// The queue isn't full yet, so keep this candidate regardless of distance
neighbors.add(newNeighbor);
} else {
// The queue is full; replace the current farthest neighbor if this one is closer
Tuple furthestNeighbor = neighbors.peek();
Double neighborDist = (Double)furthestNeighbor.get(1);
if (distance < neighborDist) {
neighbors.poll(); // remove farthest
neighbors.add(newNeighbor);
}
}
}
}
// The priority queue now holds (up to) the k nearest neighbors found so far;
// create a new result tuple and add it to the result bag
Tuple newPointA = tupleFactory.newTuple(4);
newPointA.set(0, pointA.get(0));
newPointA.set(1, pointA.get(1));
newPointA.set(2, pointA.get(2));
newPointA.set(3, fromQueue(neighbors));
result.add(newPointA);
} else {
result.add(pointA);
}
}
return result;
}

// Priority queue ordered by descending distance, so the head is the farthest neighbor
private PriorityQueue<Tuple> toDistanceSortedQueue(int k, DataBag bag) {
PriorityQueue<Tuple> q = new PriorityQueue<Tuple>(k,
new Comparator<Tuple>() {
public int compare(Tuple t1, Tuple t2) {
try {
Double dist1 = (Double)t1.get(1);
Double dist2 = (Double)t2.get(1);
return dist2.compareTo(dist1);
} catch (ExecException e) {
throw new RuntimeException("Error comparing tuples", e);
}
};
});
for (Tuple tuple : bag) q.add(tuple);
return q;
}

private DataBag fromQueue(PriorityQueue<Tuple> q) {
DataBag bag = bagFactory.newDefaultBag();
for (Tuple tuple : q) bag.add(tuple);
return bag;
}

// Haversine formula; x = longitude, y = latitude, all in radians
private Double haversineDistance(Double x1, Double y1, Double x2, Double y2) {
double a = Math.pow(Math.sin((y2-y1)/2), 2)
+ Math.cos(y1) * Math.cos(y2) * Math.pow(Math.sin((x2-x1)/2), 2);

return (2 * Math.asin(Math.min(1, Math.sqrt(a))));
}
}
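
For reference, the distance computed above is the standard haversine central angle: with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ in radians,

$$a = \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right), \qquad c = 2\arcsin\!\left(\min(1, \sqrt{a})\right)$$

The UDF returns the angle $c$ rather than a distance in kilometers (that would mean multiplying by the Earth's radius, roughly 6371 km), but for ranking neighbors the angle alone is enough.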



The details of the NearestNeighbors UDF aren't super important, and it's mostly pretty clear what's going on. Just know that it takes a bag of points as input and returns a bag of points with the same schema as output. This is really important, since we're going to be iterating.

Then we're on to the Pig part, hurray! Since Pig doesn't have any built-in support for iteration, I chose to use JRuby (because it's awesome) and Pig's "PigServer" Java class to do all the work. Here's what the JRuby runner looks like (it's kind of a lot, so don't get scared):


#!/usr/bin/env jruby

require 'java'

#
# You might consider changing this to point to where you have
# pig installed...
#
jar = "/usr/lib/pig/pig-0.8.1-cdh3u1-core.jar"
conf = "/etc/hadoop/conf"

$CLASSPATH << conf
require jar

import org.apache.pig.ExecType
import org.apache.pig.PigServer
import org.apache.pig.FuncSpec

class NearestNeighbors

attr_accessor :points, :k, :min_zl, :runmode

#
# Create a new nearest neighbors instance
# for the given points, k neighbors to find,
# an optional minimum zl (1-21) and optional
# hadoop run mode (local or mapreduce)
#
def initialize points, k, min_zl=20, runmode='mapreduce'
@points = points
@k = k
@min_zl = min_zl
@runmode = runmode
end

#
# Run the nearest neighbors algorithm
#
def run
start_pig_server
register_jars_and_functions
run_algorithm
stop_pig_server
end

#
# Actually runs all the pig queries for
# the algorithm. Stops if all neighbors
# have been found or if min_zl is reached
#
def run_algorithm
start_nearest_neighbors(points, k, 22)
if run_nearest_neighbors(k, 22)
21.downto(min_zl) do |zl|
iterate_nearest_neighbors(k, zl)
break unless run_nearest_neighbors(k,zl)
end
end
end

#
# Registers algorithm initialization queries
#
def start_nearest_neighbors(input, k, zl)
@pig.register_query(PigQueries.load_points(input))
@pig.register_query(PigQueries.generate_initial_quadkeys(zl))
end

#
# Registers algorithm iteration queries
#
def iterate_nearest_neighbors k, zl
@pig.register_query(PigQueries.load_prior_done(zl))
@pig.register_query(PigQueries.load_prior_not_done(zl))
@pig.register_query(PigQueries.union_priors(zl))
@pig.register_query(PigQueries.trim_quadkey(zl))
end

#
# Runs one iteration of the algorithm
#
def run_nearest_neighbors(k, zl)
@pig.register_query(PigQueries.group_by_quadkey(zl))
@pig.register_query(PigQueries.nearest_neighbors(k, zl))
@pig.register_query(PigQueries.split_results(k, zl))

if !@pig.exists_file("done#{zl}")
@pig.store("done#{zl}", "done#{zl}")
not_done = @pig.store("not_done#{zl}", "not_done#{zl}")
not_done.get_results.has_next?
else
true
end
end

#
# Start a new pig server with the specified run mode
#
def start_pig_server
@pig = PigServer.new(runmode)
end

#
# Stop the running pig server
#
def stop_pig_server
@pig.shutdown
end

#
# Register the jar that contains the nearest neighbors
# and quadkeys udfs and define functions for them.
#
def register_jars_and_functions
@pig.register_jar('../../../../udf/target/sounder-1.0-SNAPSHOT.jar')
@pig.register_function('GetQuadkey', FuncSpec.new('sounder.pig.geo.GetQuadkey()'))
@pig.register_function('NearestNeighbors', FuncSpec.new('sounder.pig.geo.nearestneighbors.NearestNeighbors()'))
end

#
# A simple class to contain the pig queries
#
class PigQueries

#
# Load the geonames points. Obviously,
# this should be modified to accept a
# variable schema.
#
def self.load_points geonames
"points = LOAD '#{geonames}' AS (
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
);"
end

#
# Query to generate quadkeys at the specified zoom level
#
def self.generate_initial_quadkeys(zl)
"projected#{zl} = FOREACH points GENERATE GetQuadkey(longitude, latitude, #{zl}) AS quadkey, geonameid, longitude, latitude, {};"
end

#
# Load previous iteration's done points
#
def self.load_prior_done(zl)
"prior_done#{zl+1} = LOAD 'done#{zl+1}/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Load previous iteration's not done points
#
def self.load_prior_not_done(zl)
"prior_not_done#{zl+1} = LOAD 'not_done#{zl+1}/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Union the previous iterations points that are done
# with the points that are not done
#
def self.union_priors zl
"prior_neighbors#{zl+1} = UNION prior_done#{zl+1}, prior_not_done#{zl+1};"
end

#
# Chop off one character of precision from the existing
# quadkey to go one zl down.
#
def self.trim_quadkey zl
"projected#{zl} = FOREACH prior_neighbors#{zl+1} GENERATE
SUBSTRING(quadkey, 0, #{zl}) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;"
end

#
# Group the points by quadkey
#
def self.group_by_quadkey zl
"grouped#{zl} = FOREACH (GROUP projected#{zl} BY quadkey) GENERATE group AS quadkey, projected#{zl}.(geonameid, longitude, latitude, $4) AS points_bag;"
end

#
# Run the nearest neighbors udf on all the points for
# a given quadkey
#
def self.nearest_neighbors(k, zl)
"nearest_neighbors#{zl} = FOREACH grouped#{zl} GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(#{k}l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);"
end

#
# Split the results into done and not_done relations
# The algorithm is done when 'not_done' contains
# no more tuples.
#
def self.split_results(k, zl)
"SPLIT nearest_neighbors#{zl} INTO done#{zl} IF COUNT(neighbors) >= #{k}l, not_done#{zl} IF COUNT(neighbors) < #{k}l;"
end
end

end

NearestNeighbors.new(ARGV[0], ARGV[1], (ARGV[2] || 20).to_i).run



Call this file "nearest_neighbors.rb". The idea here is that we register some basic Pig queries to initialize the algorithm and to perform the iterations. These queries are run over and over until either the "not_done" relation contains no more elements or the minimum zoom level has been reached. Note that a small zoom level means a big partition of space.

Run it!


I think we're finally ready to run it. Let K=5 and the min zoom level (zl) be 10. Then just run:


$: ./nearest_neighbors.rb allCountries.txt 5 10


That kicks it off. The output will live in 'done10' (in your home directory on the HDFS), and all the points that couldn't find their neighbors (poor guys) are left in 'not_done10'. Let's take a look:


$: hadoop fs -cat done10/part* | head
22321231 5874132 -164.73333 67.83333 {(5870713,0.0020072489956274512),(5864687,0.001833343068439346),(5879702,0.0017344751302650937),(5879702,0.0017344751302650937),(5866444,9.775849082653818E-4)}
133223322 2631726 -17.98263 66.52688 {(2631999,8.959503690090641E-4),(2629528,8.491922360779314E-4),(2629528,8.491922360779314E-4),(2630727,3.840018669838177E-4),(2630727,3.840018669838177E-4)}
133223322 2631999 -18.01884 66.56514 {(2631726,8.959503690090641E-4),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5),(2630727,6.687797018366464E-4),(2629528,4.889879974917344E-5)}
200103201 5874186 -165.39611 64.74333 {(5864454,0.001422335354026523),(5864454,0.001422335354026523),(5867287,0.0013175743301195593),(5867186,0.0010114669397588846),(5867287,0.0013175743301195593)}
200103201 5878614 -165.3 64.76667 {(5874186,0.0017231123142588567),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047),(5867287,0.0012670407374788086),(5864454,0.0012205595078534047)}
200103203 5875461 -165.55111 64.53889 {(5865814,0.0028283599772347947),(5874676,0.0025819291222640857),(5876108,0.001901914079309611),(5869354,0.0016504815389672197),(5869180,0.0025319553109125676)}
200103300 5861248 -164.27639 64.69278 {(5880635,9.627541402483858E-4),(5878642,8.535957131129946E-4),(5878642,8.535957131129946E-4),(5876626,6.598180173900259E-4),(5876626,6.598180173900259E-4)}
200103300 5876626 -164.27111 64.65389 {(5880635,8.246219806226404E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4),(5861248,6.598180173900259E-4),(5878642,3.7418928038080964E-4)}
200233011 5870290 -170.3 57.21667 {(5867100,0.00113848360324883),(7117548,0.0011082333731440464),(7117548,0.0011082333731440464),(5865746,0.001071745830095263),(5865746,0.001071745830095263)}
200233123 5873595 -169.48056 56.57778 {(7275749,0.0010608526185899635),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4),(5878477,5.242632532229457E-4),(5875162,3.39969838478673E-4)}


Hurray!

Pig Code


Let's just take a look at the Pig code that actually got executed.


points = LOAD 'allCountries.txt' AS (
geonameid: int,
name: chararray,
asciiname: chararray,
alternatenames: chararray,
latitude: double,
longitude: double,
feature_class: chararray,
feature_code: chararray,
country_code: chararray,
cc2: chararray,
admin1_code: chararray,
admin2_code: chararray,
admin3_code: chararray,
admin4_code: chararray,
population: long,
elevation: int,
gtopo30: int,
timezone: chararray,
modification_date: chararray
);
projected22 = FOREACH points GENERATE GetQuadkey(longitude, latitude, 22) AS quadkey, geonameid, longitude, latitude, {};
grouped22 = FOREACH (GROUP projected22 BY quadkey) GENERATE group AS quadkey, projected22.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors22 = FOREACH grouped22 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors22 INTO done22 IF COUNT(neighbors) >= 5l, not_done22 IF COUNT(neighbors) < 5l;
prior_done22 = LOAD 'done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done22 = LOAD 'not_done22/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors22 = UNION prior_done22, prior_not_done22;
projected21 = FOREACH prior_neighbors22 GENERATE
SUBSTRING(quadkey, 0, 21) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped21 = FOREACH (GROUP projected21 BY quadkey) GENERATE group AS quadkey, projected21.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors21 = FOREACH grouped21 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors21 INTO done21 IF COUNT(neighbors) >= 5l, not_done21 IF COUNT(neighbors) < 5l;
prior_done21 = LOAD 'done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done21 = LOAD 'not_done21/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors21 = UNION prior_done21, prior_not_done21;
projected20 = FOREACH prior_neighbors21 GENERATE
SUBSTRING(quadkey, 0, 20) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped20 = FOREACH (GROUP projected20 BY quadkey) GENERATE group AS quadkey, projected20.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors20 = FOREACH grouped20 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors20 INTO done20 IF COUNT(neighbors) >= 5l, not_done20 IF COUNT(neighbors) < 5l;
prior_done20 = LOAD 'done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done20 = LOAD 'not_done20/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors20 = UNION prior_done20, prior_not_done20;
projected19 = FOREACH prior_neighbors20 GENERATE
SUBSTRING(quadkey, 0, 19) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped19 = FOREACH (GROUP projected19 BY quadkey) GENERATE group AS quadkey, projected19.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors19 = FOREACH grouped19 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors19 INTO done19 IF COUNT(neighbors) >= 5l, not_done19 IF COUNT(neighbors) < 5l;
prior_done19 = LOAD 'done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done19 = LOAD 'not_done19/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors19 = UNION prior_done19, prior_not_done19;
projected18 = FOREACH prior_neighbors19 GENERATE
SUBSTRING(quadkey, 0, 18) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped18 = FOREACH (GROUP projected18 BY quadkey) GENERATE group AS quadkey, projected18.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors18 = FOREACH grouped18 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors18 INTO done18 IF COUNT(neighbors) >= 5l, not_done18 IF COUNT(neighbors) < 5l;
prior_done18 = LOAD 'done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done18 = LOAD 'not_done18/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors18 = UNION prior_done18, prior_not_done18;
projected17 = FOREACH prior_neighbors18 GENERATE
SUBSTRING(quadkey, 0, 17) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped17 = FOREACH (GROUP projected17 BY quadkey) GENERATE group AS quadkey, projected17.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors17 = FOREACH grouped17 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors17 INTO done17 IF COUNT(neighbors) >= 5l, not_done17 IF COUNT(neighbors) < 5l;
prior_done17 = LOAD 'done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done17 = LOAD 'not_done17/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors17 = UNION prior_done17, prior_not_done17;
projected16 = FOREACH prior_neighbors17 GENERATE
SUBSTRING(quadkey, 0, 16) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped16 = FOREACH (GROUP projected16 BY quadkey) GENERATE group AS quadkey, projected16.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors16 = FOREACH grouped16 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors16 INTO done16 IF COUNT(neighbors) >= 5l, not_done16 IF COUNT(neighbors) < 5l;
prior_done16 = LOAD 'done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done16 = LOAD 'not_done16/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors16 = UNION prior_done16, prior_not_done16;
projected15 = FOREACH prior_neighbors16 GENERATE
SUBSTRING(quadkey, 0, 15) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped15 = FOREACH (GROUP projected15 BY quadkey) GENERATE group AS quadkey, projected15.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors15 = FOREACH grouped15 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors15 INTO done15 IF COUNT(neighbors) >= 5l, not_done15 IF COUNT(neighbors) < 5l;
prior_done15 = LOAD 'done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done15 = LOAD 'not_done15/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors15 = UNION prior_done15, prior_not_done15;
projected14 = FOREACH prior_neighbors15 GENERATE
SUBSTRING(quadkey, 0, 14) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped14 = FOREACH (GROUP projected14 BY quadkey) GENERATE group AS quadkey, projected14.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors14 = FOREACH grouped14 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors14 INTO done14 IF COUNT(neighbors) >= 5l, not_done14 IF COUNT(neighbors) < 5l;
prior_done14 = LOAD 'done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done14 = LOAD 'not_done14/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors14 = UNION prior_done14, prior_not_done14;
projected13 = FOREACH prior_neighbors14 GENERATE
SUBSTRING(quadkey, 0, 13) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped13 = FOREACH (GROUP projected13 BY quadkey) GENERATE group AS quadkey, projected13.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors13 = FOREACH grouped13 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors13 INTO done13 IF COUNT(neighbors) >= 5l, not_done13 IF COUNT(neighbors) < 5l;
prior_done13 = LOAD 'done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done13 = LOAD 'not_done13/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors13 = UNION prior_done13, prior_not_done13;
projected12 = FOREACH prior_neighbors13 GENERATE
SUBSTRING(quadkey, 0, 12) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped12 = FOREACH (GROUP projected12 BY quadkey) GENERATE group AS quadkey, projected12.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors12 = FOREACH grouped12 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors12 INTO done12 IF COUNT(neighbors) >= 5l, not_done12 IF COUNT(neighbors) < 5l;
prior_done12 = LOAD 'done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done12 = LOAD 'not_done12/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors12 = UNION prior_done12, prior_not_done12;
projected11 = FOREACH prior_neighbors12 GENERATE
SUBSTRING(quadkey, 0, 11) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped11 = FOREACH (GROUP projected11 BY quadkey) GENERATE group AS quadkey, projected11.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors11 = FOREACH grouped11 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors11 INTO done11 IF COUNT(neighbors) >= 5l, not_done11 IF COUNT(neighbors) < 5l;
prior_done11 = LOAD 'done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_not_done11 = LOAD 'not_done11/part*' AS (
quadkey: chararray,
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
prior_neighbors11 = UNION prior_done11, prior_not_done11;
projected10 = FOREACH prior_neighbors11 GENERATE
SUBSTRING(quadkey, 0, 10) AS quadkey,
geonameid AS geonameid,
longitude AS longitude,
latitude AS latitude,
neighbors AS neighbors;
grouped10 = FOREACH (GROUP projected10 BY quadkey) GENERATE group AS quadkey, projected10.(geonameid, longitude, latitude, $4) AS points_bag;
nearest_neighbors10 = FOREACH grouped10 GENERATE quadkey AS quadkey, FLATTEN(NearestNeighbors(5l, points_bag)) AS (
geonameid: int,
longitude: double,
latitude: double,
neighbors: bag {t:tuple(neighbor_id:int, distance:double)}
);
SPLIT nearest_neighbors10 INTO done10 IF COUNT(neighbors) >= 5l, not_done10 IF COUNT(neighbors) < 5l;


So you can see now why, when a Pig script needs iteration like this, it's a good idea to generate as much of the code as possible rather than write it by hand.

And we're done :)

All the code for this is on github in the Sounder repo (along with tons of other Apache Pig examples).

Tuesday, May 3, 2011

Structural Similarity With Apache Pig

A while ago I posted this about computing the Jaccard similarity (otherwise known as the structural similarity) of nodes in a network graph. Looking back on it over the past few days, I realized there are some areas for serious improvement. Actually, it's just completely wrong; the approach is broken. So I'm going to revisit the algorithm in more detail and write a vastly improved Pig script for computing the structural similarity of vertices in a network graph.

The graph


Of course, before you can do a whole lot of anything, you've got to have a graph to work with. To keep things dead simple (and breaking from what I normally do) I'm going to draw an arbitrary graph (and by draw I mean draw). Here it is:

This can be represented as a tab-separated-values set of adjacency pairs in a file called 'graph.tsv':

$: cat graph.tsv
A C
B C
C D
C E
E F
C H
G H
C G
D E
A G
B G


I don't think it gets any simpler.

The Measure


Now, the idea here is to compute the similarity between nodes, e.g. similarity(A,B). There's already a well-defined measure for this called the Jaccard similarity. The key takeaway is to notice that:
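
$$ J(A,B) \;=\; \frac{|A \cap B|}{|A \cup B|} \;=\; \frac{|A \cap B|}{|A| + |B| - |A \cap B|} $$

In other words, once we have |A|, |B|, and the size of the intersection for a pair of neighbor sets, we have everything the measure needs, and that's exactly what the script below computes.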



The Pig


This can be broken into a set of Pig operations quite easily actually:

load data


Remember, I saved the data into a file called 'graph.tsv'. So:


edges = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);
edges_dup = LOAD 'graph.tsv' AS (v1:chararray, v2:chararray);


It's still a hack, but for now it's necessary to load the data twice: self-joins (which I'll be doing in a moment) don't work in Pig 0.8, so the join needs two separately loaded aliases.

augment with sizes


Now I need the 'size' of each of the sets. In this case the 'sets' are the lists of nodes each node links to, so all I really have to do is calculate the number of outgoing links for each node:


grouped_edges = GROUP edges BY v1;
aug_edges = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

grouped_dups = GROUP edges_dup BY v1;
aug_dups = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;


Again, if self-joins worked, the operation of counting the outgoing links would only have to happen once. Now I've got a handle on the |A| and |B| parts of the equation above.
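
For the toy graph above, that leaves 'aug_edges' (and, identically, 'aug_dups') looking roughly like this: one row per edge, with the out-degree of its source vertex tacked on:


(A,C,2)
(A,G,2)
(B,C,2)
(B,G,2)
(C,D,4)
(C,E,4)
(C,G,4)
(C,H,4)
(D,E,1)
(E,F,1)
(G,H,1)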

intersection


Next I'm going to compute the intersection using a Pig join:


edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
intersection = FOREACH edges_joined {
--
-- results in:
-- (X, Y, |X| + |Y|)
--
added_size = aug_edges::v1_out + aug_dups::v1_out;
GENERATE
aug_edges::v1 AS v1,
aug_dups::v1 AS v2,
added_size AS added_size
;
};



Notice I'm adding the individual set sizes. This gives us the |A| + |B| portion of the denominator in the Jaccard index.

intersection sizes


In order to compute the size of the intersection I've got to use a Pig GROUP and collect all the elements that matched on the JOIN:


intersect_grp = GROUP intersection BY (v1, v2);
intersect_sizes = FOREACH intersect_grp {
--
-- results in:
-- (X, Y, |X /\ Y|, |X| + |Y|)
--
intersection_size = (double)COUNT(intersection);
GENERATE
FLATTEN(group) AS (v1, v2),
intersection_size AS intersection_size,
MAX(intersection.added_size) AS added_size -- hack, we only need this one time
;
};


There's a hack in there. The reason is that 'intersection.added_size' is a Pig bag containing some number of identical tuples (one per matched edge, so the count equals the intersection size), and each of those tuples carries the 'added_size' from the previous step. Using MAX is just a convenient way of pulling out a single copy.
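
To make the hack concrete with the toy graph: A and B both point at C and G, so the pair (A,B) shows up in 'intersection' once per shared neighbor, each time carrying |A| + |B| = 4:


(A,B,4)
(A,B,4)


After the GROUP, COUNT gives the intersection size (2) while MAX(intersection.added_size) just collapses the duplicated 4s back down to a single value, so 'intersect_sizes' ends up with something like (A,B,2.0,4). The similarity in the next step then works out to 2 / (4 - 2) = 1.0, which matches the output below.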

similarity


And finally, I have all the pieces in place to compute the similarities:


similarities = FOREACH intersect_sizes {
--
-- results in:
-- (X, Y, |X /\ Y|/|X U Y|)
--
similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
GENERATE
v1 AS v1,
v2 AS v2,
similarity AS similarity
;
};


That'll do it. Here's the full script for completeness:


edges = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);
edges_dup = LOAD '$GRAPH' AS (v1:chararray, v2:chararray);

--
-- Augment the edges with the sizes of their outgoing adjacency lists. Note that
-- if a self join was possible we would only have to do this once.
--
grouped_edges = GROUP edges BY v1;
aug_edges = FOREACH grouped_edges GENERATE FLATTEN(edges) AS (v1, v2), COUNT(edges) AS v1_out;

grouped_dups = GROUP edges_dup BY v1;
aug_dups = FOREACH grouped_dups GENERATE FLATTEN(edges_dup) AS (v1, v2), COUNT(edges_dup) AS v1_out;

--
-- Compute the sizes of the intersections of outgoing adjacency lists
--
edges_joined = JOIN aug_edges BY v2, aug_dups BY v2;
intersection = FOREACH edges_joined {
--
-- results in:
-- (X, Y, |X| + |Y|)
--
added_size = aug_edges::v1_out + aug_dups::v1_out;
GENERATE
aug_edges::v1 AS v1,
aug_dups::v1 AS v2,
added_size AS added_size
;
};

intersect_grp = GROUP intersection BY (v1, v2);
intersect_sizes = FOREACH intersect_grp {
--
-- results in:
-- (X, Y, |X /\ Y|, |X| + |Y|)
--
intersection_size = (double)COUNT(intersection);
GENERATE
FLATTEN(group) AS (v1, v2),
intersection_size AS intersection_size,
MAX(intersection.added_size) AS added_size -- hack, we only need this one time
;
};

similarities = FOREACH intersect_sizes {
--
-- results in:
-- (X, Y, |X /\ Y|/|X U Y|)
--
similarity = (double)intersection_size/((double)added_size-(double)intersection_size);
GENERATE
v1 AS v1,
v2 AS v2,
similarity AS similarity
;
};

DUMP similarities;


Results



Run it:


$: pig -x local jaccard.pig
...snip...
(A,A,1.0)
(A,B,1.0)
(A,C,0.2)
(B,A,1.0)
(B,B,1.0)
(B,C,0.2)
(C,A,0.2)
(C,B,0.2)
(C,C,1.0)
(C,D,0.25)
(C,G,0.25)
(D,C,0.25)
(D,D,1.0)
(E,E,1.0)
(G,C,0.25)
(G,G,1.0)


And it's done! Hurray. Now, go make a recommender system or something.

Tuesday, April 26, 2011

A Lucene Text Tokenization UDF for Apache Pig

As much as I'm loath to admit it, sometimes Java is called for. One of those times is tokenizing raw text. You'll notice in the post about tf-idf how I used a Wukong script, written in Ruby, to accomplish the task of tokenizing a large text corpus with Hadoop and Pig. There are a couple of problems with this:

1. Ruby is slow at this.

2. All the gem dependencies (wukong itself, extlib, etc) must exist on all the machines in the cluster and be available in the RUBYLIB (yet another environment variable to manage).

There is a better way.

A Pig UDF



Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc, whose 'exec()' method plays essentially the same role as Wukong's 'process()' method or the Java Hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a Pig DataBag, where each Tuple in the bag is a single token. Here's what it looks like as a whole:



import java.io.IOException;
import java.io.StringReader;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;

import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.util.Version;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;

public class TokenizeText extends EvalFunc<DataBag> {

    private static TupleFactory tupleFactory = TupleFactory.getInstance();
    private static BagFactory bagFactory = BagFactory.getInstance();
    private static String NOFIELD = "";
    private static StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 1 || input.isNull(0))
            return null;

        // Output bag
        DataBag bagOfTokens = bagFactory.newDefaultBag();

        // Run the raw text through Lucene's StandardAnalyzer
        StringReader textInput = new StringReader(input.get(0).toString());
        TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);
        CharTermAttribute termAttribute = stream.getAttribute(CharTermAttribute.class);

        // Wrap each token in a single-field tuple and add it to the output bag
        while (stream.incrementToken()) {
            Tuple termText = tupleFactory.newTuple(termAttribute.toString());
            bagOfTokens.add(termText);
            termAttribute.setEmpty();
        }
        return bagOfTokens;
    }
}


There's absolutely nothing special going on here. Remember, the 'exec' method gets called on every Pig Tuple of input, and bagOfTokens is the Pig DataBag it returns. First, the Lucene analyzer tokenizes the input string; then every token in the resulting stream is turned into a Pig Tuple and added to the result DataBag, which is finally returned. A document is truly a bag of words.

Example Pig Script



And here's an example script that uses the UDF. You'll first need to REGISTER the jar you've packaged the class into (the jar name below is just a placeholder for whatever your build produces):


-- the jar name here is a placeholder; use whatever your build actually produces
REGISTER 'tokenize-text.jar';

documents = LOAD 'documents' AS (doc_id:chararray, text:chararray);
tokenized = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);
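
To get a feel for what comes out, take a made-up input row like (doc1, The Quick Brown Fox!). The StandardAnalyzer will lowercase, strip punctuation, and drop common English stop words such as 'the', so after the FLATTEN the 'tokenized' relation should contain something like:


(doc1,quick)
(doc1,brown)
(doc1,fox)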



And that's it. It's blazing fast text tokenization for Apache Pig.

Hurray.

Saturday, April 23, 2011

TF-IDF With Apache Pig

Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).

Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.

One of the cool things about tf-idf is how damn simple it is. Take a look at the Wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time and start reading a different blog. As always, I'm going to assume you've got a Hadoop cluster (at least a pseudo-distributed one) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.

Get Data


What would an example be without some data? Here's a link to the canonical 20 newsgroups data set that I've restructured slightly for your analysis pleasure.

Tokenization


Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there, but I've provided one in Wukong, for the hell of it, that you can find here. We're going to use that script from the Pig script in a moment. If a Java UDF is more to your liking, feel free to write one and substitute it in for the relevant bits.

Pig, Step By Step



The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:

DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

This statement can be interpreted to mean:

"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."

Next up is to load and tokenize the data:


raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);


So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs.

Then, we need to count the number of times each unique (doc_id, token) pair appears:


doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;


And, since we're after the token frequencies and not just the counts, we need to attach the document sizes:


doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;


Then the term frequencies are just:


term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;



For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term:


term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token
;



Finally, we can compute the tf-idf weight:


tfidf_all = FOREACH token_usages {
idf = LOG((double)$NDOCS/(double)num_docs_with_token);
tf_idf = (double)term_freq*idf;
GENERATE
doc_id AS doc_id,
token AS token,
tf_idf AS tf_idf
;
};


Where NDOCS is the total number of documents in the corpus.
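
Spelled out, the weight the script computes for a term t in a document d is just:

$$ \mathrm{tfidf}(t,d) \;=\; \frac{n_{t,d}}{|d|} \times \log\frac{N}{\mathrm{df}_t} $$

where n_{t,d} is num_doc_tok_usages, |d| is doc_size, df_t is num_docs_with_token, and N is the NDOCS parameter. Pig's LOG is the natural logarithm, which is fine here since a different base would only rescale all the weights by a constant.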

Pig, All Together Now



And here's the final script, all put together:


DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);

doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;

doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;

term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;

term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token
;

tfidf_all = FOREACH token_usages {
idf = LOG((double)$NDOCS/(double)num_docs_with_token);
tf_idf = (double)term_freq*idf;
GENERATE
doc_id AS doc_id,
token AS token,
tf_idf AS tf_idf
;
};

STORE tfidf_all INTO '$OUT';



Run It


Assuming your data is in HDFS at "/data/corpus/20_newsgroups/data" and you've named the Pig script "tfidf.pig", you can run it with the following:


pig -p DOCS=/data/corpus/20_newsgroups/data -p NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig


Here's what the output of that looks like:


sci.crypt.15405 student 0.0187808247467920464
sci.crypt.16005 student 0.0541184292045718135
sci.med.58809 student 0.0839387881540297476
sci.med.59021 student 0.0027903667703849783
sci.space.60850 student 0.0377339506380500803
sci.crypt.15178 student 0.0010815147566519744
soc.religion.christian.21414 student 0.0587571517078208302
comp.sys.ibm.pc.hardware.60725 student 0.0685500103257909721
soc.religion.christian.21485 student 0.0232372916358613464
soc.religion.christian.21556 student 0.0790961657605280533


Hurray.

Next time let's look at how to take that output and recover the topics using clustering!