Using Hadoop with Python (and not Spark) Again

This tutorial is based on https://medium.com/@rrfd/your-first-map-reduce-using-hadoop-with-python-and-osx-ca3b6f3dfe78, whose example tracks the exchange rate of most of the world’s currencies against the US dollar over an extended period of time. Unfortunately, the original author’s code is (a) improperly formatted, (b) incompatible with Python 3.x, and (c) shipped with a reducer.py script that is identical to the mapper.py script. So, it is recommended you use the scripts provided here.

Download the data to /common/clusterdata/<username> using wget: wget https://raw.githubusercontent.com/datasets/exchange-rates/master/data/daily.csv. Then store daily.csv in HDFS with hdfs dfs -put daily.csv /user/<username>. Keep a copy of the CSV on the local filesystem for testing purposes.
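Put together, the setup looks roughly like this (run from /common/clusterdata/<username>, substituting your own username):

cd /common/clusterdata/<username>
wget https://raw.githubusercontent.com/datasets/exchange-rates/master/data/daily.csv
hdfs dfs -put daily.csv /user/<username>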

Next, here is the code for mapper.py, also to be stored in /common/clusterdata/<username>:

import sys


# State carried across consecutive rows of the CSV
currentCountry = None
previousCountry = None
currentFx = None
previousFx = None
percentChange = None
currentKey = None

# Collected (country, percentChange) tuples, printed at the end
fxMap = []

infile = sys.stdin

next(infile)  # skip the CSV header row

for line in infile:
    line = line.strip().split(',', 2)

    try:
        currentCountry = line[1].rstrip()
        if len(line[2]) == 0:
            continue
        currentFx = float(line[2])

        # New country: remember its first values and move on to the next row
        if currentCountry != previousCountry:
            previousCountry = currentCountry
            previousFx = currentFx
            continue

        # Same country as the previous row: compute the day-over-day % change
        percentChange = round(((currentFx - previousFx) / previousFx) * 100.0, 2)

        currentKey = (currentCountry, percentChange)

        # Collect the (country, percentChange) tuple
        fxMap.append(currentKey)

        # Remember this row's values for the next iteration
        previousCountry = currentCountry
        previousFx = currentFx

    except Exception as e:
        # Report the error on stderr so it doesn't pollute the mapper's stdout
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(e).__name__, e.args)
        sys.stderr.write("currentFx: {!r} previousFx: {!r}\n".format(currentFx, previousFx))
        sys.stderr.write(message + "\n")
        sys.exit(1)

# Emit the sorted (country, percentChange) pairs, one per line, for the reducer
for i in sorted(fxMap):
    print(",".join([str(x) for x in i]))
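Each line the mapper emits is a comma-separated country,percentChange pair; a couple of lines of its output look something like this (values here are illustrative):

Australia,-0.01
Australia,-0.03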

reducer.py is much shorter:

import sys
from collections import defaultdict


counts = defaultdict(int)

# Tally how many times each (country, percent_change) pair appears in the mapper output
for line in sys.stdin:
    (country, percent_change) = line.strip().split(",")
    counts[(country, percent_change)] += 1

# Print one line per (country, percent_change) key with its count
for key in sorted(counts):
    print("{} @ {}% = {}".format(key[0], key[1], counts[key]))

You can test the mapper locally with python mapper.py < daily.csv, and then both scripts together with python mapper.py < daily.csv | python reducer.py, to make sure they work.
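Hadoop Streaming sorts the mapper output before handing it to the reducer; to mimic that shuffle step locally you can insert a sort between the two scripts (not strictly required here, since the reducer aggregates with a dictionary):

python mapper.py < daily.csv | sort | python reducer.py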

Once you’re satisfied, send the job to the cluster with Hadoop Streaming:

hadoop jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-streaming.jar \
-files mapper.py,reducer.py -cmdenv PATH=$PATH \
-mapper "python mapper.py" -reducer "python reducer.py" \
-input /user/<username>/daily.csv \
-output /user/<username>/output/

Note:

* -files must be the first argument (generic options have to come before the streaming-specific ones).
* The output directory cannot already exist, otherwise your job will fail; remove a previous run’s output with hdfs dfs -rm -R /user/<username>/output.

If the command is successful, hdfs dfs -ls /user/<username>/output should now show two files: _SUCCESS and part-00000. Your results are in part-00000, e.g. hdfs dfs -cat /user/<username>/output/part-00000 | head:

Australia @ -0.01% = 116
Australia @ -0.02% = 93
Australia @ -0.03% = 160
Australia @ -0.04% = 190
Australia @ -0.05% = 108
Australia @ -0.06% = 104
Australia @ -0.07% = 129
Australia @ -0.08% = 131
Australia @ -0.09% = 127
Australia @ -0.1% = 85