Using Hadoop with Python (and not Spark)

We can use Hadoop to count the total number of words in a sizable, sentence-tokenized data set such as the MASC corpus.

Download the “MASC Sentence Corpus” to /common/clusterdata/<username> and prepare the data for “Hadooping” with the two steps below (a Python alternative to step 1 follows the list):

  1. cut -f 7 masc_sentences.tsv > masc_sentences.lst
  2. hdfs dfs -put masc_sentences.lst /user/<username>
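
If you would rather stay in Python, step 1 can also be done with a short script. This is only a sketch, and it assumes (as the cut command does) that the sentence text lives in the seventh tab-separated column of masc_sentences.tsv:

# Pure-Python equivalent of "cut -f 7 masc_sentences.tsv > masc_sentences.lst".
with open("masc_sentences.tsv") as tsv, open("masc_sentences.lst", "w") as out:
    for line in tsv:
        fields = line.rstrip("\n").split("\t")
        if len(fields) >= 7:
            out.write(fields[6] + "\n")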

The “Mapper” Script

This first script serves as the “map” portion of our Hadoop MapReduce job. It tokenizes each sentence provided as input (one sentence per line) and emits the sentence number along with the number of tokens in that sentence.

import sys
import re

from nltk.tokenize import TreebankWordTokenizer


tokenizer = TreebankWordTokenizer()
sentences = sys.stdin

for (sentence_no, sentence) in enumerate(sentences):
    # Keep only tokens containing at least one letter or digit, so that
    # bare punctuation is not counted as a word.
    tokenized = [t for t in tokenizer.tokenize(sentence)
                 if re.search(r'[a-zA-Z0-9]+', t)]
    # Emit "<sentence number> <token count>", one pair per line.
    print(sentence_no, len(tokenized))
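
To see why the regular-expression filter is there, it helps to look at what the Treebank tokenizer does with punctuation. A minimal sketch (the example sentence is ours, not from the corpus):

import re

from nltk.tokenize import TreebankWordTokenizer

tokenizer = TreebankWordTokenizer()

tokens = tokenizer.tokenize("Hello, world!")
# tokens == ['Hello', ',', 'world', '!'] -- punctuation becomes its own token.

words = [t for t in tokens if re.search(r'[a-zA-Z0-9]+', t)]
# words == ['Hello', 'world'], so this sentence counts as two words.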

The “Reducer” Script

This second script is the “reduce” portion of our Hadoop MapReduce job. It reads the output of the “mapper” script and prints the total number of words across all of the sentences that were fed to the “mapper”.

import sys


total = 0

for line in sys.stdin:
    # Each input line is "<sentence number> <token count>", as produced
    # by the mapper; only the count matters here.
    (sentence_no, word_count) = line.strip().split(" ")
    total += int(word_count)

# Whatever the reducer prints to stdout becomes the job's final output.
print(total)
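
As a quick sanity check of the summing logic, you can replay a few mapper-style lines in plain Python (the numbers below are made up purely for illustration):

import io

# Three pretend mapper output lines: "<sentence number> <token count>".
fake_mapper_output = io.StringIO("0 12\n1 7\n2 9\n")

total = sum(int(line.strip().split(" ")[1]) for line in fake_mapper_output)
print(total)  # prints 28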

How to Run

If you have a copy of masc_sentences.lst saved locally, you can test your mapper and reducer scripts with:

python3 mapper.py < masc_sentences.lst | python3 reducer.py
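
Note that this local pipe skips the sort that Hadoop performs between the map and reduce phases; that doesn’t matter here, because the reducer simply sums the counts regardless of their order.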

When you’re ready to run on the cluster, submit the job with:

hadoop jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.3.0-235.jar \
-files mapper.py,reducer.py \
-cmdenv PATH=$PATH \
-mapper "python3 mapper.py" \
-reducer "python3 reducer.py" \
-input /user/<username>/masc_sentences.lst \
-output /user/<username>/output
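
Once the job completes, the reducer’s output lands in the -output directory on HDFS (for a streaming job like this, typically as one or more part-* files). You can inspect it with:

hdfs dfs -cat /user/<username>/output/part-*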

Understanding the Hadoop Command

hadoop is a program that submits our MapReduce jobs to our cluster via the YARN scheduler. The program yarn can also be used, with all other arguments remaining the same.

A Hadoop Streaming job like this one is a pair of programs: a “mapper” and a “reducer”. Both read their input from stdin and write their output to stdout: Hadoop sorts the mapper’s output by key and pipes it into the reducer, and whatever the reducer writes to stdout is collected into the -output directory on HDFS. (The reducer can also write side output elsewhere, such as local files or a database, but its stdout is what ends up under -output.)
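
If it helps to see that data flow concretely, here is a rough local simulation of what Hadoop Streaming does with these two scripts, assuming mapper.py, reducer.py, and masc_sentences.lst sit in the current directory. This is only a sketch: real Hadoop also splits the input, runs many mappers in parallel, and partitions the sorted output across reducers.

import subprocess

# Run the mapper over the local copy of the data, feeding it line by line
# on stdin, just as Hadoop Streaming does.
with open("masc_sentences.lst") as f:
    map_out = subprocess.run(["python3", "mapper.py"], stdin=f,
                             capture_output=True, text=True).stdout

# Hadoop sorts the mapper output by key before handing it to the reducer.
shuffled = "".join(sorted(map_out.splitlines(keepends=True)))

# The reducer's stdout is what ends up in the -output directory.
reduce_out = subprocess.run(["python3", "reducer.py"], input=shuffled,
                            capture_output=True, text=True).stdout
print(reduce_out.strip())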

Note that:

  • The order of the first three arguments after hadoop is fixed; that is, the command must always begin with jar <path to jar> -files.
  • The -output location must not exist prior to running this command, otherwise the job will fail (see the cleanup command below).
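
If a previous run left the output directory behind, remove it before resubmitting:

hdfs dfs -rm -r /user/<username>/output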