Navigating “Big Data” with Spark and Hive

In this tutorial, you’ll be performing data manipulation and querying using both Spark and Hive.

Getting Started

If you’re on campus and on Wi-Fi, make sure you’re not on the CS-Research network. Downloading data of this size over that network violates its policy, and you’ll be asked to explain yourself 🙂

Download the taxi cab data linked above to your local machine, since the CS servers currently lack a utility that can extract 7zip archives. Uncompress each of the files, recompress them with bzip2, and use rsync to upload them to /common/clusterdata/<username>. Then uncompress them once more and load the CSVs into HDFS, e.g. hdfs dfs -put *.csv /user/<username>/nyc_taxi_2013_data.

Understanding the Data Set

In New York City, a car must have a medallion in order to operate as an official yellow cab. Medallion IDs appear in the first column of our data. However, to drive a taxi at all, medallioned or otherwise (the green “boro” cabs, for example, are not medallioned), a driver must hold a license known as a “hack license”; column 2 of our data holds the driver’s hack license ID. A single taxi may be driven by many drivers over the course of its lifetime. For example, the taxi driven most frequently in this entire data set, medallion 20BA941F62CC07F1FA3EF3E122B1E9B2 with 22,534 trips, was driven by 122 different drivers.

Note

Unfortunately, the driver license IDs were improperly anonymized, so it will look as though one driver made an absurd amount of money in 2013. Nonetheless, we can use this data set to draw some interesting conclusions and to illustrate some Spark and Hive concepts.

Using Spark to Explore the Data

The script below produces the most frequent combinations of taxi and driver, i.e. (medallion, hack_license) pairs. The input to this script should be hdfs:///user/<username>/nyc_taxi_2013_data/*.csv; remember to run it with spark-submit --master yarn:

import csv
import sys

from pyspark import SparkContext


def split_line(line):
    # Parse the line as a one-row CSV so that quoting and embedded
    # commas are handled correctly. We only need the first two columns:
    # the taxi's "medallion" and the driver's "hack license".
    # The per-file header rows get counted too, but there are only 12
    # of them, so they never come close to the top of the results.
    for row in csv.reader([line]):
        return ((row[0], row[1]), 1)


def main(input_file):
    sc = SparkContext(appName="FindBusiestNYC2013Taxi")
    data = sc.textFile(input_file)
    medallion_license_counts = (data.map(split_line)
                                .reduceByKey(lambda a, b : a + b)
                                .sortBy(lambda x: x[1], ascending=False))

    for (pair, count) in medallion_license_counts.take(5):
        print(pair, count)

    sc.stop()


if __name__ == "__main__":
    main(sys.argv[1])

The result will be the five busiest (medallion, hack_license) combinations, printed to stdout:

('E2D92DC49D2E3A841FFCE27ADA3921F0', '51C1BE97280A80EBFA8DAD34E1956CF6') 17603
('552CCF061B871F7179CC0357086CB42C', 'D85749E8852FCC66A990E40605607B2F') 16467
('24F08861B2158ACD82BE9AD23EE3B06E', '3D757E111C78F5CAC83D44A92885D490') 16139
('7C656D41E55C63899BFEED59030159B3', '23DF80C977D15141F11DD713C523C311') 16073
('3C6281F8AA19CBF6FD30411FE2CA8F9F', '3AAB94CA53FE93A64811F65690654649') 15363

While this is running, check the ApplicationMaster (via the YARN web UI) to see how Spark has divided this job into stages and tasks.
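
As a follow-up, the “122 different drivers” figure quoted earlier can be recovered with a small variation on the same approach. Below is a minimal sketch (this script is not part of the tutorial’s downloads; the medallion ID is the one quoted above), run the same way with spark-submit --master yarn:

import csv
import sys

from pyspark import SparkContext

# Medallion quoted in "Understanding the Data Set" above.
BUSIEST_MEDALLION = "20BA941F62CC07F1FA3EF3E122B1E9B2"


def medallion_and_driver(line):
    # Parse the line as a one-row CSV; columns 1 and 2 are the
    # medallion and the hack license.
    for row in csv.reader([line]):
        return (row[0], row[1])


def main(input_file):
    sc = SparkContext(appName="CountDriversForBusiestTaxi")
    data = sc.textFile(input_file)

    drivers = (data.map(medallion_and_driver)
               .filter(lambda pair: pair[0] == BUSIEST_MEDALLION)
               .map(lambda pair: pair[1])
               .distinct())

    # Should print 122 for this medallion.
    print(drivers.count())

    sc.stop()


if __name__ == "__main__":
    main(sys.argv[1])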

Moving to Hive

We can extract information like this from the data set more quickly and easily if we move away from parsing CSV files in every job and instead query the data directly. For that, we’re going to use Apache Hive.

Advantages and Disadvantages of Hive

The advantage of using Hive over a traditional DBMS is that queries are run using MapReduce, so they can sometimes be faster than an untuned traditional DBMS on large data sets. However, Hive lacks many features of traditional DBMSs, specifically anything to do with data integrity (primary keys, auto-incrementing fields, foreign keys, constraints, triggers, automatically-populated fields), and its tables are intended to be written once and never (or rarely) updated.

Another disadvantage of using Hive is that its query language is “SQL-inspired”; it does not adhere to the official SQL specification, but rather, it is just close enough to real SQL to be confusing to folks who are comfortable with SQL.

Nonetheless, Hive is super useful for keeping our large data sets structured, organized, and quickly accessible.

Data Preparation

Currently, this data set lacks a way to uniquely identify an individual trip; again, this is where a primary key would come in handy. We can also cut down on some of the data duplication by splitting the medallion and hack_license fields out into a taxis table and keeping the rest in a trips table. In the end, what we’ll have looks something like this:

taxis
-----
taxi_id,medallion,hack_license
1,E2D92DC49D2E3A841FFCE27ADA3921F0,51C1BE97280A80EBFA8DAD34E1956CF6
.
.
.
trips
-----
trip_id,taxi_id,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
1,1,VTS,1,,2013-01-13 11:06:00,2013-01-13 11:21:00,6,900,3.44,-73.958046,40.769424,-73.998337,40.75008
2,1,VTS,1,,2013-01-13 10:36:00,2013-01-13 10:44:00,6,480,2.13,-73.985039,40.727901,-73.99176,40.749218
.
.
.

With a traditional DBMS, the foreign key relationship between taxis and trips would be supplied and maintained automatically by the DBMS, but with Hive it is up to us to impose and enforce this relationship ourselves. This is a key reason why Hive tables are intended to be written once and never modified.
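
The scripts you’ll download in the next step take care of this, but to make the idea concrete, here is a minimal PySpark sketch of how such a relationship can be imposed by hand. It only assumes the column layout described above; it is an illustration, not the contents of those scripts:

import csv

from pyspark import SparkContext

sc = SparkContext(appName="SketchImposeTaxiIds")


def parse(line):
    # Treat each line as a one-row CSV, as in the earlier script.
    for row in csv.reader([line]):
        return row


rows = sc.textFile("hdfs:///user/<username>/nyc_taxi_2013_data/trip_data*.csv").map(parse)

# Assign a synthetic taxi_id (starting at 1) to each distinct
# (medallion, hack_license) pair; this becomes the taxis table.
# (The 12 per-file header rows sneak in as one bogus pair; ignored here.)
taxi_ids = (rows.map(lambda r: (r[0], r[1]))
            .distinct()
            .zipWithIndex()
            .mapValues(lambda i: i + 1))

# Replace the two text columns of each trip with the taxi_id its
# (medallion, hack_license) pair maps to; this becomes the trips table.
trips_rows = (rows.map(lambda r: ((r[0], r[1]), r[2:]))
              .join(taxi_ids)
              .map(lambda kv: [kv[1][1]] + kv[1][0]))

# From here, taxi_ids and trips_rows could be written out with saveAsTextFile().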

  1. Download the scripts create_taxis_reference_csv.py and create_trip_table_csvs.py to your /common/clusterdata/<username> directory.
  2. Start a screen session, since these scripts can take a while to run and you might want to log out before they’ve finished.
    1. You can start a screen session with screen -U -S <name of screen session> and, once you’re there, you can exit from it without killing the session with Ctrl-A d.
    2. You can return to the screen with screen -r <name of screen session> and when you’re done, simply type exit to kill the screen.
  3. Run spark-submit --master yarn create_taxis_reference_csv.py hdfs:///user/<username>/nyc_taxi_2013_data/trip_data*.csv, which should create a 43 MB file on your disk called taxis_table_data.csv containing the 610,132 unique pairs of medallions and hack licenses. Don’t worry about the lack of headers on this or any other file we’ll create.
  4. Run spark-submit --master yarn create_trip_table_csvs.py hdfs:///user/<username>/nyc_taxi_2013_data/trip_data*.csv. Once it’s done, check hdfs:///user/<username>/nyc_taxi_2013_data/output/ where you should see this data spread across multiple part- files.
  5. Then run hdfs dfs -cat /user/<username>/nyc_taxi_2013_data/output/part-* | nl -s ',' | hdfs dfs -put - /user/<username>/nyc_taxi_2013_data/trips_table_data.csv. The nl command numbers each line, giving every trip a unique trip_id. You can close the screen once this has finished.
  6. Do hdfs dfs -get /user/<username>/nyc_taxi_2013_data/trips_table_data.csv to download the giant file we just created.
  7. Clean up this file a bit (nl pads its line numbers with leading spaces, which we strip here); a quick sanity check on both files appears after this list:
    1. perl -p -e 's/^\s*//' trips_table_data.csv > temp
    2. mv temp trips_table_data.csv.
  8. Make sure your /common/clusterdata/<username> directory is world-readable with chmod 755 /common/clusterdata/<username>.
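
Before loading anything into Hive, a few lines of plain Python can confirm that the files above look right. This check is optional; the expected pair count comes from step 3, and the file names are the ones created above:

# Optional sanity check, run from /common/clusterdata/<username>.

# taxis_table_data.csv should have one line per (medallion, hack_license) pair.
with open("taxis_table_data.csv") as f:
    print(sum(1 for _ in f))   # should be 610132

# After the perl cleanup, trips_table_data.csv should start with "1," and
# have no leading whitespace left over from nl.
with open("trips_table_data.csv") as f:
    print(repr(f.readline()[:40]))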

Loading the Prepared Data into Hive Tables

We’ll use the beeline client to create Hive tables and run some example queries. On our servers, you can start beeline with:

beeline -u "jdbc:hive2://data-services2.cs.rutgers.edu:2181,data-services3.cs.rutgers.edu:2181,data1.cs.rutgers.edu:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"

See this page for more information regarding our installation of Hive.

If you don’t already have a Hive database, you can create one with create database if not exists <name>;, where <name> should simply be your username unless you have a specific purpose in mind for the database. There’s no limit to the number of databases you can create. If you don’t create a database, your tables will be created in the default database, where they are accessible by other users.

You can switch to any database with use <name>; and get a list of all of the databases with show databases;.

We’ll now begin to load the tables, starting with the taxis table since the trips table references it. Run the following Hive SQL command to create the table:

create table if not exists taxis 
(taxi_id BIGINT, medallion VARCHAR(35), hack_license VARCHAR(35)) 
row format delimited fields terminated by ',';

Then populate the table with

load data local inpath 
'/common/clusterdata/<username>/taxis_table_data.csv' 
into table taxis;

To verify that the load succeeded, you can take a look at the first 10 rows of the table with select * from taxis limit 10;, or check that select count(*) from taxis; reports the 610,132 rows we created earlier.

Next, we’ll load the trips table:

create table if not exists trips 
(trip_id BIGINT, 
taxi_id BIGINT, 
vendor_id CHAR(3), 
rate_code INT, 
store_and_fwd_flag VARCHAR(1), 
pickup_datetime TIMESTAMP, 
dropoff_datetime TIMESTAMP, 
passenger_count INT, 
trip_time_in_secs BIGINT, 
trip_distance FLOAT, 
pickup_longitude DOUBLE, 
pickup_latitude DOUBLE, 
dropoff_longitude DOUBLE, 
dropoff_latitude DOUBLE) 
row format delimited fields terminated by ',';

Then,

load data local inpath 
'/common/clusterdata/<username>/trips_table_data.csv' 
into table trips;

Example Queries

Note that Hive queries don’t seem to allow count(<table name>.*), which is why the queries below count a specific column or use count(*).

To see the five busiest taxi and driver combinations, as we found earlier using our script and the raw CSV files:

select taxis.taxi_id, taxis.medallion, taxis.hack_license, 
count(trips.trip_id) as count 
from taxis, trips where trips.taxi_id = taxis.taxi_id 
group by taxis.taxi_id, taxis.medallion, taxis.hack_license 
order by count desc limit 5;

To find out the average number of passengers per trip (Hive’s built-in avg(passenger_count) would give the same result):

select sum(passenger_count)/count(passenger_count) from trips;

To count the number of trips made per month:

select pickup_month, count(*) from 
(select month(pickup_datetime) as pickup_month from trips) p 
group by pickup_month order by pickup_month;

Using Hive with Spark

We can use Hive tables in any Spark-based application. The pyspark shell creates a SparkSession for us automatically, accessible through the spark variable, so we can immediately start sending queries to our tables:

spark.sql("select * from taxis limit 5").collect()

The same spark variable would be available if we started a PySpark Jupyter notebook on https://jupyter.cs.rutgers.edu. However, it is recommended that you use only the Python3 kernel, since visualizing data from Hive queries is currently broken in PySpark notebooks. From a Python3 notebook, you can visualize our “trips per month” query from above with the following:

from pyspark import SparkContext
from pyspark.sql import HiveContext

# The Python3 kernel doesn't create a SparkSession for us, so build a
# SparkContext and a Hive-aware SQL context by hand.
sc = SparkContext()
hc = HiveContext(sc)

# The same "trips per month" query as before, pulled into a pandas DataFrame.
df = hc.sql(("select pickup_month, count(*) from " +
             "(select month(pickup_datetime) as pickup_month from trips) p " +
             "group by pickup_month order by pickup_month")).toPandas()

# Spark names the count(*) column "count(1)", hence the y value below.
%matplotlib inline
df.plot(x="pickup_month", y="count(1)")

The visualization should roughly resemble:

trips per month
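
As an aside, HiveContext still works but has been deprecated since Spark 2.0; on newer versions of Spark the equivalent entry point is a SparkSession with Hive support enabled. A minimal sketch, assuming the same trips table and the same notebook setup (including the %matplotlib inline from above):

from pyspark.sql import SparkSession

# Build (or reuse) a session that can talk to the Hive metastore.
spark = (SparkSession.builder
         .appName("TripsPerMonthPlot")
         .enableHiveSupport()
         .getOrCreate())

# Alias the count so the pandas column gets a friendlier name.
df = spark.sql("select pickup_month, count(*) as trips from "
               "(select month(pickup_datetime) as pickup_month from trips) p "
               "group by pickup_month order by pickup_month").toPandas()

df.plot(x="pickup_month", y="trips")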