In this tutorial, you’ll be performing data manipulation and querying using both Spark and Hive.
- The source for the data used in this example
- Exploration of the data similar to what we’re going to delve into here
- NYC Taxis: A Day in the Life, which uses the above-linked data
- Big Data Computation of Taxi Movement in New York City, a paper showing what is possible with data of this scale (and beyond!)
If you’re on campus and on wifi, make sure you’re not on the
CS-Research network. Downloading data of this size while on this wifi network violates the network’s policy and you’ll be asked to explain yourself 🙂
Download the taxi cab data linked above to your local machine, as the CS servers currently lack a utility that can extract 7zip files. Uncompress each of the files, then recompress them with `bzip2` before using `rsync` to upload them to `/common/clusterdata/<username>`. Then uncompress them again, and load the CSVs into HDFS, say `hdfs dfs -put *.csv /user/<username>/nyc_taxi_2013_data`.
Understanding the Data Set
In the city, in order to be an official yellow cab, the car must have a medallion. Medallion IDs can be found in the first column of our data. However, in order for one to drive a taxi, medallioned or otherwise (e.g. the green “boro” cabs are not medallioned), one must have a license, known as a “hack license”. Column 2 of our data reflects the taxi driver’s hack license ID. A single taxi may be driven by multiple drivers over the course of its lifetime; for example, the taxi that has been driven the most frequently in this entire data set (`20BA941F62CC07F1FA3EF3E122B1E9B2`, which was driven 22,534 times) was driven by 122 different drivers.
Unfortunately, the driver license IDs were improperly anonymized, so it will look as though one driver made an absurd amount of money in 2013. Nonetheless, we can use this data set to draw some interesting conclusions and to illustrate some Spark and Hive concepts.
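The many-to-many relationship between medallions and hack licenses can be illustrated with a minimal sketch on made-up data (the medallion and driver IDs below are invented, not from the data set):

```python
from collections import defaultdict

# Toy (medallion, hack_license) pairs; in the real data these come from
# the first two columns of every trip record.
trips = [
    ("MED_A", "DRIVER_1"),
    ("MED_A", "DRIVER_2"),
    ("MED_A", "DRIVER_1"),
    ("MED_B", "DRIVER_3"),
]

# Collect the distinct drivers seen for each taxi.
drivers_per_taxi = defaultdict(set)
for medallion, hack_license in trips:
    drivers_per_taxi[medallion].add(hack_license)

# MED_A was driven 3 times by 2 distinct drivers.
print({m: len(d) for m, d in drivers_per_taxi.items()})
```

The real data set just does this at scale: one medallion, many hack licenses over the year.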
Using Spark to Explore the Data
The script below will produce the most frequent combinations of taxi and driver, i.e. `(medallion, hack_license)` pairs. The input to this script should be `hdfs:///user/<username>/nyc_taxi_2013_data/*.csv`, and remember to run it with `spark-submit --master yarn`:
```python
import csv
import sys

from pyspark import SparkContext


def split_line(line):
    # Treat each line as a one-row "csv" file so each row is read in
    # properly, since it comes from a CSV. We're interested in the first
    # two columns: the taxi's "medallion" and the driver's "hack license".
    # Yes, the file headers are counted, but they only occur 12 times,
    # so they're washed out.
    for row in csv.reader([line]):
        return ((row[0], row[1]), 1)


def main(input_file):
    sc = SparkContext(appName="FindBusiestNYC2013Taxi")
    data = sc.textFile(input_file)
    medallion_license_counts = (data.map(split_line)
                                    .reduceByKey(lambda a, b: a + b)
                                    .sortBy(lambda x: x[1], ascending=False))
    for (pair, count) in medallion_license_counts.take(5):
        print(pair, count)
    sc.stop()


if __name__ == "__main__":
    main(sys.argv[1])
```
The result will be the five busiest taxis by medallion ID and hack license, printed to the console:
```
('E2D92DC49D2E3A841FFCE27ADA3921F0', '51C1BE97280A80EBFA8DAD34E1956CF6') 17603
('552CCF061B871F7179CC0357086CB42C', 'D85749E8852FCC66A990E40605607B2F') 16467
('24F08861B2158ACD82BE9AD23EE3B06E', '3D757E111C78F5CAC83D44A92885D490') 16139
('7C656D41E55C63899BFEED59030159B3', '23DF80C977D15141F11DD713C523C311') 16073
('3C6281F8AA19CBF6FD30411FE2CA8F9F', '3AAB94CA53FE93A64811F65690654649') 15363
```
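The reduce-by-key logic of the script can be sanity-checked locally without a cluster. Here is a sketch using `collections.Counter` on a few made-up lines (the IDs are invented; this is not one of the tutorial's scripts):

```python
import csv
from collections import Counter

# Made-up trip lines; only the first two columns matter here.
lines = [
    "MED_A,DRIVER_1,VTS,1",
    "MED_A,DRIVER_1,VTS,1",
    "MED_B,DRIVER_2,VTS,1",
]

counts = Counter()
for line in lines:
    row = next(csv.reader([line]))       # parse each line as a one-row CSV
    counts[(row[0], row[1])] += 1        # same key as split_line() above

# most_common plays the role of reduceByKey + sortBy(descending) + take.
for pair, count in counts.most_common(5):
    print(pair, count)
```

Spark does exactly this, except the counting is distributed across executors before the final sort.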
While this is running, check the ApplicationMaster to see how Spark has divided this job into stages and tasks.
Moving to Hive
We can extract lots of information like this from this data set more quickly and easily if we move away from reading CSV files and instead query the data directly. For that, we’re going to use Apache Hive.
Advantages and Disadvantages of Hive
The advantage of using Hive over a traditional DBMS is that queries run as MapReduce jobs and thus can sometimes be faster than an untuned traditional DBMS on large data sets. However, Hive lacks many features of traditional DBMSs, specifically anything to do with data integrity (primary keys, auto-incrementing fields, foreign keys, constraints, triggers, automatically-populated fields), and its tables are intended to be written once and never (or rarely) updated.
Another disadvantage of using Hive is that its query language is “SQL-inspired”; it does not adhere to the official SQL specification, but rather, it is just close enough to real SQL to be confusing to folks who are comfortable with SQL.
Nonetheless, Hive is super useful for keeping our large data sets structured, organized, and quickly-accessible.
Currently, this data set lacks a way to uniquely identify an individual trip–again, this is where a primary key would come in handy. We can cut down on some of the data duplication by splitting the medallion and hack-license fields into a `taxis` table and keeping the rest in a `trips` table. In the end, what we’ll have looks something like this:
```
taxis
-----
taxi_id,medallion,hack_license
1,E2D92DC49D2E3A841FFCE27ADA3921F0,51C1BE97280A80EBFA8DAD34E1956CF6
.
.
.
```
```
trips
-----
trip_id,taxi_id,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
1,1,VTS,1,,2013-01-13 11:06:00,2013-01-13 11:21:00,6,900,3.44,-73.958046,40.769424,-73.998337,40.75008
2,1,VTS,1,,2013-01-13 10:36:00,2013-01-13 10:44:00,6,480,2.13,-73.985039,40.727901,-73.99176,40.749218
.
.
.
```
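The split above can be sketched in plain Python: assign each unique `(medallion, hack_license)` pair a surrogate `taxi_id`, and have each trip row reference that id. This is only an illustration on made-up rows, not how the tutorial's `create_*` scripts are necessarily written:

```python
# Raw rows: (medallion, hack_license, pickup_datetime) -- made-up data.
trips_raw = [
    ("MED_A", "DRIVER_1", "2013-01-13 11:06:00"),
    ("MED_A", "DRIVER_1", "2013-01-13 10:36:00"),
    ("MED_B", "DRIVER_2", "2013-01-14 09:00:00"),
]

taxi_ids = {}          # (medallion, hack_license) -> surrogate taxi_id
taxis, trips = [], []
for trip_id, (medallion, hack, pickup) in enumerate(trips_raw, start=1):
    key = (medallion, hack)
    if key not in taxi_ids:
        # First time we see this pair: mint a new taxi_id.
        taxi_ids[key] = len(taxi_ids) + 1
        taxis.append((taxi_ids[key], medallion, hack))
    # The trip row keeps only the taxi_id, not the two long hex strings.
    trips.append((trip_id, taxi_ids[key], pickup))

print(taxis)   # two unique taxis
print(trips)   # three trips, each referencing a taxi_id
```

The duplication savings come from storing each 32-character medallion/hack-license pair once instead of on every trip row.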
With a traditional DBMS, the foreign key relationship between `taxis` and `trips` would be automatically supplied and maintained by the DBMS, but with Hive, it is up to us to impose and enforce this relationship. This is a key reason why Hive tables are intended to be written once and never modified.
- Download the scripts create_taxis_reference_csv.py and create_trip_table_csvs.py to your `/common/clusterdata/<username>` directory.
- Start a `screen` session, since these scripts can take a while to run and you might want to log out before they’ve finished.
  - You can start a screen with `screen -U -S <name of screen session>` and, once you’re there, you can detach from it without killing the session by pressing `Ctrl-a d`.
  - You can return to the screen with `screen -r <name of screen session>`, and when you’re done, simply type `exit` to kill the screen.
- Run `spark-submit --master yarn create_taxis_reference_csv.py hdfs:///user/<username>/nyc_taxi_2013_data/trip_data*.csv`, which should create a 43 MB file on your disk called `taxis_table_data.csv`. There should be 610,132 unique pairs of medallions and hack licenses. Don’t worry about the lack of headers on this or any other file we’ll create.
- Run `spark-submit --master yarn create_trip_table_csvs.py hdfs:///user/<username>/nyc_taxi_2013_data/trip_data*.csv`. Once it’s done, check `hdfs:///user/<username>/nyc_taxi_2013_data/output/`, where you should see this data spread across multiple part files.
- Then run `hdfs dfs -cat /user/<username>/nyc_taxi_2013_data/output/part-* | nl -s ',' | hdfs dfs -put - /user/<username>/nyc_taxi_2013_data/trips_table_data.csv`. You can close the screen once this has finished.
- Run `hdfs dfs -get /user/<username>/nyc_taxi_2013_data/trips_table_data.csv` to download the giant file we just created.
- Clean up this file a bit: `perl -p -e 's/^\s*//' trips_table_data.csv > temp` and then `mv temp trips_table_data.csv`.
- Make sure your `/common/clusterdata` directory is world-readable with `chmod 755 /common/clusterdata/<username>`.
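What the `nl` and `perl` steps accomplish can be mimicked in a few lines of Python: `nl -s ','` prefixes each line with a left-padded line number (our synthetic `trip_id`), and the `perl` pass strips that padding. A sketch with made-up rows:

```python
# Made-up trip rows, in the shape produced by create_trip_table_csvs.py
# (taxi_id first, then the remaining trip columns).
rows = ["1,VTS,2013-01-13 11:06:00", "1,VTS,2013-01-13 10:36:00"]

# Simulate `nl -s ','`: number each line, left-padded to a fixed width,
# with ',' as the separator between the number and the line.
numbered = ["%6d,%s" % (i, row) for i, row in enumerate(rows, start=1)]

# Simulate `perl -p -e 's/^\s*//'`: strip the leading whitespace padding.
cleaned = [line.lstrip() for line in numbered]
print(cleaned)
```

The result is each row prefixed with a clean, unique `trip_id`, ready to load into the `trips` table.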
Loading the Prepared Data into Hive Tables
We’ll use the beeline client to create Hive tables and run some example queries. On our servers, you can start beeline with:

```shell
beeline -u "jdbc:hive2://data-services2.cs.rutgers.edu:2181,data-services3.cs.rutgers.edu:2181,data1.cs.rutgers.edu:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"
```
See this page for more information regarding our installation of Hive.
If you don’t already have a Hive database, you can create one with `create database if not exists <name>;`, where `<name>` is recommended to be your username, unless you have a specific purpose in mind for this database. There’s no limit to the number of databases you can create. If you don’t create a database, your tables will be created in the `default` database and will be accessible by other users.
You can switch to any database with `use <name>;` and get a list of all of the databases with `show databases;`.
We’ll now begin to load the tables, starting with the `taxis` table since the `trips` table references it. Run the following Hive SQL command to create the table:
```sql
create table if not exists taxis (
  taxi_id BIGINT,
  medallion VARCHAR(35),
  hack_license VARCHAR(35)
)
row format delimited fields terminated by ',';
```
Then populate the table with:

```sql
load data local inpath '/common/clusterdata/<username>/nyc_taxi_2013/taxis_table_data.csv' into table taxis;
```
To verify that the load succeeded, you can take a look at the first 10 rows of the table with `select * from taxis limit 10;`.
Next, we’ll load the `trips` table. Create it with:

```sql
create table if not exists trips (
  trip_id BIGINT,
  taxi_id BIGINT,
  vendor_id CHAR(3),
  rate_code INT,
  store_and_fwd_flag VARCHAR(1),
  pickup_datetime TIMESTAMP,
  dropoff_datetime TIMESTAMP,
  passenger_count INT,
  trip_time_in_secs BIGINT,
  trip_distance FLOAT,
  pickup_longitude DOUBLE,
  pickup_latitude DOUBLE,
  dropoff_longitude DOUBLE,
  dropoff_latitude DOUBLE
)
row format delimited fields terminated by ',';
```

and populate it with:

```sql
load data local inpath '/common/clusterdata/<username>/nyc_taxi_2013/trips_table_data.csv' into table trips;
```
Note that Hive queries don’t seem to allow
To see the top five busiest taxis, like we did earlier using our script and the CSV files:
```sql
select taxis.taxi_id, taxis.medallion, taxis.hack_license,
       count(trips.trip_id) as count
from taxis, trips
where trips.taxi_id = taxis.taxi_id
group by taxis.taxi_id, taxis.medallion, taxis.hack_license
order by count desc
limit 5;
```
To find out the average number of passengers per trip:
```sql
select sum(passenger_count)/count(passenger_count) from trips;
```
To count the number of trips made per month:
```sql
select pickup_month, count(*)
from (select month(pickup_datetime) as pickup_month from trips) p
group by pickup_month
order by pickup_month;
```
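The `month()` / group-by logic of this query can be mirrored in plain Python; here is a sketch on a handful of made-up pickup timestamps (not results from the real data):

```python
from collections import Counter
from datetime import datetime

# Made-up pickup timestamps in the data set's format.
pickups = [
    "2013-01-13 11:06:00",
    "2013-01-13 10:36:00",
    "2013-02-01 08:00:00",
]

# month(pickup_datetime) ... group by pickup_month, in plain Python:
per_month = Counter(
    datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").month for ts in pickups
)
for month in sorted(per_month):
    print(month, per_month[month])
```

Hive does the same aggregation, just distributed over the full 170-million-row table.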
Using Hive with Spark
We can use Hive tables in any Spark-based application. `pyspark` will launch us into a `SparkSession` automatically, which we can access from the `spark` variable, and we can immediately start sending queries to our tables:

```python
spark.sql("select * from taxis limit 5").collect()
```
The `spark` variable will also be available if we start a jupyter notebook on https://jupyter.cs.rutgers.edu. However, it is recommended that only the Python 3 kernel be used, as the ability to visualize data from Hive queries is currently broken when using a PySpark notebook. You can visualize our “trips per month” query, above, with the following:
```python
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hc = HiveContext(sc)
df = hc.sql(("select pickup_month, count(*) from " +
             "(select month(pickup_datetime) as pickup_month from trips) p " +
             "group by pickup_month order by pickup_month")).toPandas()

%matplotlib inline
df.plot(x="pickup_month", y="count(1)")
```
The visualization should roughly resemble: