Tuesday, April 26, 2011

A Lucene Text Tokenization UDF for Apache Pig

As much as I loathe to admit it, sometimes java is called for. One of those times is tokenizing raw text. You'll notice in the post about tfidf how I used a Wukong script, written in ruby, to accomplish the task of tokenizing a large text corpus with Hadoop and Pig. There are a couple of problems with this:

1. Ruby is slow at this.

2. All the gem dependencies (wukong itself, extlib, etc) must exist on all the machines in the cluster and be available in the RUBYLIB (yet another environment variable to manage).

There is a better way.

A Pig UDF

Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc whose function 'exec()' essentially acts as the Wukong 'process()' method or the java hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a pig DataBag. Each Tuple in the DataBag will be a single token. Here's what it looks like as a whole:

import java.io.IOException;import java.io.StringReader;import java.util.Iterator;import org.apache.pig.EvalFunc;import org.apache.pig.data.Tuple;import org.apache.pig.data.TupleFactory;import org.apache.pig.data.DataBag;import org.apache.pig.data.BagFactory;import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;import org.apache.lucene.util.Version;import org.apache.lucene.analysis.Token;import org.apache.lucene.analysis.TokenStream;import org.apache.lucene.analysis.standard.StandardAnalyzer;import org.apache.lucene.analysis.standard.StandardTokenizer;public class TokenizeText extends EvalFunc {    private static TupleFactory tupleFactory = TupleFactory.getInstance();    private static BagFactory bagFactory = BagFactory.getInstance();    private static String NOFIELD = "";    private static StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);    public DataBag exec(Tuple input) throws IOException {        if (input == null || input.size() < 1 || input.isNull(0))            return null;        // Output bag        DataBag bagOfTokens = bagFactory.newDefaultBag();                        StringReader textInput = new StringReader(input.get(0).toString());        TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);        CharTermAttribute termAttribute = stream.getAttribute(CharTermAttribute.class);        while (stream.incrementToken()) {            Tuple termText = tupleFactory.newTuple(termAttribute.toString());            bagOfTokens.add(termText);            termAttribute.setEmpty();        }        return bagOfTokens;    }}

There's absolutely nothing special going on here. Remember, the 'exec' function gets called on every Pig Tuple of input. bagOfTokens will be the Pig DataBag returned. First, the lucene library tokenizes the input string. Then all the tokens in the resulting stream are turned into Pig Tuples and added to the result DataBag. Finally the resulting DataBag is returned. A document is truly a bag of words.

Example Pig Script

And here's an example script to use that UDF:

documents    = LOAD 'documents' AS (doc_id:chararray, text:chararray);tokenized    = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);

And that's it. It's blazing fast text tokenization for Apache Pig.

Hurray.

Saturday, April 23, 2011

TF-IDF With Apache Pig

Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).

Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.

One of the cool things about tf-idf is how damn simple it is. Take a look at the wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time, and start reading a different blog. Like always, I'm going to assume you've got a hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.

Get Data

What would an example be without some data? Here's a link to the canonical 20 newsgroups data set that I've restructured slightly for your analysis pleasure.

Tokenization

Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there but I've provided one in Wukong for the hell of it that you can find here. We're going to use that script next in the pig script. If a java UDF is more to your liking feel free to write one and substitute in the relevant bits.

Pig, Step By Step

The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:
DEFINE tokenize_docs ruby tokenize_documents.rb --id_field=0 --text_field=1 --map SHIP('tokenize_documents.rb');

This statement can be interpreted to mean:

"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."

Next up is to load and tokenize the data:

raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray); So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs. Then, we need to count the number of times each unique (doc_id, token) pair appears: doc_tokens = GROUP tokenized BY (doc_id, token);doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages; And, since we're after the token frequencies and not just the counts, we need to attach the document sizes: doc_usage_bag = GROUP doc_token_counts BY doc_id;doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE group AS doc_id, FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages), SUM(doc_token_counts.num_doc_tok_usages) AS doc_size ; Then the term frequencies are just: term_freqs = FOREACH doc_usage_bag_fg GENERATE doc_id AS doc_id, token AS token, ((double)num_doc_tok_usages / (double)doc_size) AS term_freq; ; For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term: term_usage_bag = GROUP term_freqs BY token;token_usages = FOREACH term_usage_bag GENERATE FLATTEN(term_freqs) AS (doc_id, token, term_freq), COUNT(term_freqs) AS num_docs_with_token ; Finally, we can compute the tf-idf weight: tfidf_all = FOREACH token_usages { idf = LOG((double)$NDOCS/(double)num_docs_with_token);              tf_idf = (double)term_freq*idf;                GENERATE                  doc_id AS doc_id,                  token  AS token,                  tf_idf AS tf_idf                ;             };

Where NDOCS is the total number of documents in the corpus.

Pig, All Together Now

And here's the final script, all put together:

DEFINE tokenize_docs ruby tokenize_documents.rb --id_field=0 --text_field=1 --map SHIP('tokenize_documents.rb');raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);doc_tokens = GROUP tokenized BY (doc_id, token);doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;doc_usage_bag = GROUP doc_token_counts BY doc_id;doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE group AS doc_id, FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages), SUM(doc_token_counts.num_doc_tok_usages) AS doc_size ;term_freqs = FOREACH doc_usage_bag_fg GENERATE doc_id AS doc_id, token AS token, ((double)num_doc_tok_usages / (double)doc_size) AS term_freq; ; term_usage_bag = GROUP term_freqs BY token;token_usages = FOREACH term_usage_bag GENERATE FLATTEN(term_freqs) AS (doc_id, token, term_freq), COUNT(term_freqs) AS num_docs_with_token ;tfidf_all = FOREACH token_usages { idf = LOG((double)$NDOCS/(double)num_docs_with_token);              tf_idf = (double)term_freq*idf;                GENERATE                  doc_id AS doc_id,                  token  AS token,                  tf_idf AS tf_idf                ;             };STORE tfidf_all INTO '\$OUT';

Run It

Assuming your data is on the hdfs at "/data/corpus/20_newsgroups/data", and you've named the pig script "tfidf.pig", then you can run with the following:

pig -p DOCS=/data/corpus/20_newsgroups/data -p NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig

Here's what the output of that looks like:

sci.crypt.15405                student  0.0187808247467920464sci.crypt.16005                student  0.0541184292045718135sci.med.58809                  student  0.0839387881540297476sci.med.59021                  student  0.0027903667703849783sci.space.60850                student  0.0377339506380500803sci.crypt.15178                student  0.0010815147566519744soc.religion.christian.21414   student  0.0587571517078208302comp.sys.ibm.pc.hardware.60725 student  0.0685500103257909721soc.religion.christian.21485   student  0.0232372916358613464soc.religion.christian.21556   student  0.0790961657605280533

Hurray.

Next time let's look at how to take that output and recover the topics using clustering!