Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).
Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.
One of the cool things about tf-idf is how damn simple it is. Take a look at the Wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time and start reading a different blog. Like always, I'm going to assume you've got a Hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.
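For the impatient, here's the flavor of it we'll be computing below, spelled out in plain terms:

tf(term, doc)    = (number of times term appears in doc) / (total number of tokens in doc)
idf(term)        = ln(N / number of documents containing term)
tfidf(term, doc) = tf(term, doc) * idf(term)

where N is the total number of documents in the corpus. That's it. The rest of this post is just convincing Pig to churn those three numbers out for every (document, term) pair.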
Get Data
What would an example be without some data? Here's a link to the canonical 20 Newsgroups data set that I've restructured slightly for your analysis pleasure.
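Once you've downloaded and unpacked it, toss it onto the HDFS. The Pig script below expects tab-separated (doc_id, text) records, one document per line, so something along these lines should do the trick (the local file name here is just a placeholder for whatever you saved it as):

hadoop fs -mkdir /data/corpus/20_newsgroups
hadoop fs -put 20_newsgroups.tsv /data/corpus/20_newsgroups/data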
Tokenization
Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there, but I've provided one in Wukong for the hell of it that you can find here. We're going to use that script from within the Pig script next. If a Java UDF is more to your liking, feel free to write one and substitute it in the relevant bits.
Pig, Step By Step
The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:
DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');
This statement can be interpreted to mean:
"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."
Next up is to load and tokenize the data:
raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);
So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs.
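To make that concrete, if the tokenizer were fed a record like (sci.space.60850, "the space shuttle program") it would emit something along these lines (tokens invented for illustration; the real output depends on the stopword and tokenization choices baked into the Wukong script):

(sci.space.60850, space)
(sci.space.60850, shuttle)
(sci.space.60850, program)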
Then, we need to count the number of times each unique (doc_id, token) pair appears:
doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;
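If you like to sanity check the shape of things as you go, DESCRIBE is your friend. At this point the relation should look something like:

DESCRIBE doc_token_counts;
-- doc_token_counts: {doc_id: chararray, token: chararray, num_doc_tok_usages: long}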
And, since we're after the token frequencies and not just the counts, we need to attach the document sizes:
doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;
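To see what that buys us, imagine a toy document 'doc1' whose token counts came out as (doc1, cat, 2) and (doc1, dog, 1). Grouping by doc_id, summing the counts to get doc_size = 3, and flattening back out yields (document and counts invented purely for illustration):

(doc1, cat, 2, 3)
(doc1, dog, 1, 3)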
Then the term frequencies are just:
term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;
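Carrying the toy example through, those two rows become roughly:

(doc1, cat, 0.667)
(doc1, dog, 0.333)

that is, each count divided by the size of the document it came from.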
For the 'inverse document frequency' part of tf-idf, we need to find, for each term, the number of documents that contain at least one occurrence of it:
term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token
;
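Since the whole term_freqs bag gets flattened back out, every (doc_id, token, term_freq) row simply picks up one extra field. If 'cat' showed up in, say, three documents total, its rows would come out looking like (numbers invented for illustration):

(doc1, cat, 0.667, 3)
(doc2, cat, 0.25, 3)
(doc3, cat, 0.1, 3)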
Finally, we can compute the tf-idf weight:
tfidf_all = FOREACH token_usages {
idf = LOG((double)$NDOCS/(double)num_docs_with_token);
tf_idf = (double)term_freq*idf;
GENERATE
doc_id AS doc_id,
token AS token,
tf_idf AS tf_idf
;
};
Here $NDOCS is the total number of documents in the corpus; we'll pass it in as a parameter when we run the script.
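A couple of things worth noting: Pig's LOG is the natural logarithm, and the idf factor is what does the damping. With NDOCS = 18828, a token that appears in 100 documents gets idf = ln(18828/100) ≈ 5.24, while a token that appears in every single document gets ln(1) = 0 and is wiped out entirely, which is exactly what you want to happen to stopword-ish terms that slipped past the tokenizer.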
Pig, All Together Now
And here's the final script, all put together:
DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');
raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);
doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;
doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
group AS doc_id,
FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;
term_freqs = FOREACH doc_usage_bag_fg GENERATE
doc_id AS doc_id,
token AS token,
((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;
term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
FLATTEN(term_freqs) AS (doc_id, token, term_freq),
COUNT(term_freqs) AS num_docs_with_token
;
tfidf_all = FOREACH token_usages {
idf = LOG((double)$NDOCS/(double)num_docs_with_token);
tf_idf = (double)term_freq*idf;
GENERATE
doc_id AS doc_id,
token AS token,
tf_idf AS tf_idf
;
};
STORE tfidf_all INTO '$OUT';
Run It
Assuming your data is on the HDFS at "/data/corpus/20_newsgroups/data" and you've named the Pig script "tfidf.pig", you can run it with the following:
pig -p DOCS=/data/corpus/20_newsgroups/data -p NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig
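Once it finishes, you can peek at the result with something like:

hadoop fs -cat /data/corpus/20_newsgroups/tfidf/part-* | head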
Here's what the output of that looks like:
sci.crypt.15405 student 0.0187808247467920464
sci.crypt.16005 student 0.0541184292045718135
sci.med.58809 student 0.0839387881540297476
sci.med.59021 student 0.0027903667703849783
sci.space.60850 student 0.0377339506380500803
sci.crypt.15178 student 0.0010815147566519744
soc.religion.christian.21414 student 0.0587571517078208302
comp.sys.ibm.pc.hardware.60725 student 0.0685500103257909721
soc.religion.christian.21485 student 0.0232372916358613464
soc.religion.christian.21556 student 0.0790961657605280533
Hurray.
Next time let's look at how to take that output and recover the topics using clustering!