Verbose TFxIDF (Weighting) Example with Dumbo, The Begining

Recently, i ventured into the world of information retrieval and data mining because its cool to learn something new and it is the future of the “InterWebbs”. Over the few weeks, i have buried my head into research papers, books, source code with my trusty Netbook as my side kick. One of the concepts i have picked up is the infamous TFxIDF. The super smart weighting algorithm that everyone seems to rant about. Its a nifty algorithm which adequately weighs a term in a text according to its relevance. I will not go on about what exaclty it is because google does a good job of explaining things.

I also came across this method called MapReduce developed by my hero’s at Google. As a fan of structured programming and methodologies, i like adopting tested well structured ways of solving problems. Map Reduce helps you break large problems into simple tasks which can then be split between clusters. Hadoop seems to be the best framework for executing MapReduce jobs.

Ok, so i know what TFxIDF and Hadoop are used for, how can i implement this in my own project written in python [Enter Dumbo]. In lame man’s terms, Dumbo translates python map reduce code to a hadoop cluster.

Great, lets start coding.  I went through the  Dumbo TFxIDF Example and also the short tutorial and also the IRBook (A great introduction information retrieval book). I could not really follow the example because it seemed like the Dumbo creator writes he’s example code for Experts and not Dumbies (Pun? Intended?). There is mapperA, mapperB, reducerC, reducerD etc. So i give a more “Verbose” example. It also calculates Euclidean Length (To be used in calculating euclidean distance between two points, a query string and a text/docuement for example) for each document/text.
Over the comming weeks, i will explain parts of the code and also clean it up a little.

from dumbo import *
from math import pow, log, sqrt


@opt(“addpath”, “yes”)
class TokenCountMapper:
    def call(self, doc, line):
        “””
        Should generate a tokenezed list which may have repeated words.
        Tokens would be grouped and counted in the reducer
        “””
        tokens = tokenize(line.lower()) 
        for token in tokens:
            yield (doc[1], token), 1 


class TokenCountReducer:
    #This is skipped and the Dumbo sumreducer helper is used instead
    pass
class DocumentTokenCountMapper:
    “””
    I receive the total count of a token in each document 
    “””
    def call(self, key, tokenCount):
        doc, token = key
        yield doc, (token, tokenCount)


class DocumentTokenCountReducer:
    “””
    Sum the amount of tokens in a doc
    “””
    def call(self, doc, value):
        values = list(value)
        #total number of tokens in doc n
        totalNumberOfTokens = sum(tokenCount for token, tokenCount in values)  
        #yield token info for current doc
        for token, tokenCount in values:
            yield (token, doc), (tokenCount, totalNumberOfTokens)


class TokenCountDocumentCountMapper:
    def call(self, key, value):
        token, doc = key
        tokenCount, totalNumberOfTokens = value
        #this token has this info and is in this document 
        yield token, (doc, tokenCount, totalNumberOfTokens, 1)


class TokenCountDocumentCountReducer:
    def call(self, token, value):
        values = list(value)
        #count the number of docs for this token
        df = sum(docCount for doc, tokenCount, docTokCount, docCount in values)
        for doc, tokenCount, docTokCount in (value[:3] for value in values):
            yield (doc, token), (tokenCount, docTokCount, df, float(tokenCount)/docTokCount)


class EucledianLengthMapper:
    def call(self, key, value): 
        doc, token = key
        tokenCount, docTokCount, df, tf = value
        yield token, (doc, tokenCount, docTokCount, df, tf)
        
class EucledianLengthReducer:
    def call(self, token, value): 
        values = list(value)
        for doc, tokenCount, docTokCount, df, tf in values:
            yield (doc, token), (tokenCount, docTokCount, df, tf, pow(float(tokenCount), 2))  


class EucledianLengthSummerMapper:
    def call(self, key, value):
        doc, token = key
        tokenCount, docTokCount, df, tf, poww = value
        yield doc, (token, tokenCount, docTokCount, df, tf, poww)


class EucledianLengthSummerReducer:
    def call(self, doc, value):
        values = list(value)
        totalDistances = sum(v[5] for v in values)
        for token, tokenCount, docTokCount, df, tf, poww in (v for v in values):
            yield (doc, token), (docTokCount, df, tf, poww, sqrt(totalDistances))


if name == “main“:
    import dumbo
    job = dumbo.Job()
    job.additer(TokenCountMapper, sumreducer, sumreducer)
    job.additer(DocumentTokenCountMapper, DocumentTokenCountReducer)
    job.additer(TokenCountDocumentCountMapper, TokenCountDocumentCountReducer)
    job.additer(EucledianLengthMapper, EucledianLengthReducer)
    job.additer(EucledianLengthSummerMapper, EucledianLengthSummerReducer)
    job.run()