Streaming Manhattan Traffic with Spark

9 minute read

Github link to notebook

Introduction

In this notebook, we will be creating a real time visualization of Manhattan traffic using sensor data from this URL at the NYC DOT website. The URL updates “several times per minute” according to the documentation. Because of this, we’ll plug it into Spark Streaming and Geoff Boeing’s osmnx package for street networks.

This side project evolved from a Spark Streaming project I had in grad school after stumbling across the release of osmnx during a winter break. This package allows for far more in-depth applications than what I’m using it for, and I highly recommend checking it out if you have any projects involving street networks.

While this could be accomplished in regular Python (and this application isn’t particularly useful), I’m hoping this will aid those looking to use PySpark for Spark Streaming. Aside from the documentation and official programming guide, there aren’t a lot of examples of Spark Streaming using PySpark available, as most are in Scala.

Before we go into the details, I wanted to give a brief overview of Spark Streaming. Like regular Spark, it is used for processing the data - not generating or storing it. It is very similar to regular Spark, but is designed to be able to handle constantly updating data. Here is an abstract graphical representation of the process:

Here is a more realistic representation that further showcases its role in an ecosystem with other tools. In this example, we will be using the Spark SQL component in order to convert our DStreams into a data frame in order to utilize operations from pandas:

Spark Streaming is arguably not even real streaming when you get into the details. Specifically, it ingests the data in a series of micro batches (that update at specified intervals) referred to as DStream objects. Because the DStream is updated at set intervals, it is not technically “real time”. DStream objects are a series of RDDs at their core, and there are a lot of similar operations between them:

Spark Streaming ingests data in three ways:

1) From another source like Kafka, Flume, etc.

2) From a web socket

3) From text files

I’m running Spark on my local Windows machine, so I’m going to go with the text file option. It sounds easy enough at first, but has one hangup: Spark Streaming will not pick up the text files unless they are newly generated after the Spark streaming context starts. Therefore, we have to run it in combination with this script that uses the requests library to grab data from the web.

From there, we will read the data into a DStream with a Streaming context, and then register it as a temporary table in the SQL context in order to extract it to a pandas data frame. There is a fair amount of manipulation required for the data in order to get it to the exact format we want before plotting it, so the pandas operations makes this easier. Lastly, we will plot the speed at the individual sensors on the city map.

Set Up

Before getting to the fun stuff, we have to set up a few things first:

  • Set up Spark & Spark contexts
  • Import other libraries
  • Set up the road network
  • Map the latitude/longitude points to nodes on the road network

We’ll start with assigning a couple of options to variables in order to make it easier to change in the future.

# Where the web scraper script is outputting files
streamFilePath = "file:/C:/Users/JeffM/Documents/Projects/Spark Streaming/Data/"

# Number of seconds for Spark to ingest the data files
microBatchSeconds = 30

Set up Spark & Spark contexts

Since I’m running Spark on my local Windows machine, I have to do a few additional steps to make it play nicely with Jupyter Notebooks:

import os
import sys

# Set the path for the Spark folder
sparkPath = "C:/Users/JeffM/Documents/Spark/spark-2.0.2-bin-hadoop2.7"

os.environ['SPARK_HOME'] = sparkPath
sys.path.append(sparkPath + '/bin')
sys.path.append(sparkPath + '/python')
sys.path.append(sparkPath + '/python/pyspark' "C:/Spark/python/pyspark")
sys.path.append(sparkPath + '/python/pyspark/lib')
sys.path.append(sparkPath + '/python/pyspark/lib/pyspark.zip')
sys.path.append(sparkPath + '/python/pyspark/lib/py4j-0.10.3-src.zip')
sys.path.append("C:/Program Files/Java/jre1.8.0_111/bin")

This next part sets up the actual contexts for Spark. We have three separate contexts here:

1) SparkContext

2) StreamingContext

3) SQLContext

import pyspark
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext("local[2]", "trafficData")  # Local[#] sets # of cores to use
ssc = StreamingContext(sc, microBatchSeconds)  # Seconds per micro-batch
sqlContext = pyspark.SQLContext(sc)

Import other libraries

# Data manipulation
import pandas as pd
import numpy as np

# Timestamps
from datetime import datetime, timedelta

# City network and plots
import osmnx as ox, geopandas as gdp
import matplotlib.pyplot as plt

# To replace output plots for streaming
from IPython import display

# Sets intervals for updating the plot
import time

%matplotlib inline

Set up the road network

If you are familiar with graph theory, the road networks in the osmnx package are a series of nodes and edges. We will map our sensors to nodes in the network using their longitude/latitude locations, but we first have to create the actual network.

Osmnx fortunately has a variety of methods to create a road network (in addition to other networks like bike networks and walking networks). Here, we’ll use the “graph_from_place” method.

We are then going to project our network in order to get a top-down view of Manhattan. I’ll include plots of the projected and non-projected versions to clarify this concept.

# Retrieving the network for Manhattan
manhattan = ox.graph_from_place('Manhattan, New York, USA', network_type='drive')

# Projecting the network for a top-down view
manhattan_projected = ox.project_graph(manhattan)
print('Non-projected:')
fig1 = ox.plot_graph(manhattan, fig_height=4.5, node_size=0, 
              node_alpha=1,edge_linewidth=0.2, dpi=100)

print('Projected:')
fig2 = ox.plot_graph(manhattan_projected, fig_height=4.5, node_size=0, 
              node_alpha=1,edge_linewidth=0.2, dpi=100)
Non-projected:

Projected:

Map the latitude/longitude points to nodes on the road network

In order to match up our streaming data to the city network, we’re going to have to find the corresponding network node for each sensor using its latitude/longitude points. We’re going to do this before we begin streaming by using a template (any of the files generated from the web scraping script - the date or time don’t matter).

We’re doing this here so we can just join this data frame to the streaming data in order to avoid re-calculating which node belongs to each sensor every time we receive new streaming data. This is possible due to having a set amount of sensors with static locations and ids.

# Importing only the fields we're interested in
dfNodes = pd.read_csv('trafficDataTemplate.txt',
            usecols=[0, 6, 11],
            names=['id', 'linkPoints', 'borough'])

# Filtering to Manhattan and dropping the borough column
dfNodes = dfNodes[dfNodes['borough'] == 'Manhattan']
dfNodes = dfNodes[['id', 'linkPoints']]

# Splitting the column containing all Lat/Long combinations
dfNodes['splitPoints'] = dfNodes['linkPoints'].apply(lambda x: x.split(' '))

# Reshaping our data frame to have a row for each Lat/Long point
idNodes = []
for x,y in zip(dfNodes['splitPoints'], dfNodes['id']):
    for a in np.arange(len(x)):
        idNodes.append((y, x[a]))
dfNodes = pd.DataFrame(idNodes, columns=['id', 'LatLong'])

dfNodes = dfNodes.replace('', np.NaN).dropna()

# Parsing Lat/Long into individual columns
# Longitude has an if statement since some records contain '-' for longitude
dfNodes['Latitude'] = dfNodes['LatLong'].apply(lambda x: x.split(';')[0])
dfNodes['Longitude'] = dfNodes['LatLong'] \
    .apply(lambda x: x.split(';')[1] if len(x.split(';')) > 1 else None)

# Dropping incorrect longitude records and converting everything to floats
dfNodes = dfNodes[dfNodes['Longitude'] != '-'][['id', 'Latitude', 'Longitude']].astype(float)

# Obtaining the nearest nodes in the network from the Lat/Long points
nodes = []
for row in dfNodes.iterrows():
    nearest_node = ox.get_nearest_node(manhattan, (dfNodes['Latitude'].ix[row], 
                                                   dfNodes['Longitude'].ix[row]))
    nodes.append(nearest_node)
dfNodes['Node'] = nodes

# Removing duplicate nodes
dfNodes.drop_duplicates(subset='Node', inplace=True)

dfNodes.head()
id Latitude Longitude Node
0 2.0 40.73933 -74.010040 593816275
1 2.0 40.73895 -74.010120 246649429
2 2.0 40.73760 -74.010021 42439099
3 2.0 40.73460 -74.010260 42453468
4 2.0 40.72912 -74.010781 246857084

Streaming

This section will do the actual streaming. It needs to be run in parallel with the web scraping script in order to continuously update the data folder with new files. As a reminder, Spark Streaming will not continuously re-read a file, and will only stream new files placed into a location.

Our process here is to read the files using the streaming context and register them in the SQL context as a temporary table. Once it’s in a temporary table, we can pull it out into a pandas data frame, perform additional manipulation, and finally plot it.

Indexes

Taken directly from the documentation

0) Id: TRANSCOM Link ID

1) Speed: Average speed a vehicle traveled between end points on the link in the most recent interval

2) TravelTime: Time the average vehicle took to traverse the link

3) Status: Artifact - not useful

4) DataAsOf: Last time data was received from link

5) linkId: TRANSCOM Link ID (same as ID field)

6) linkPoints: Sequence of Lat/Long points, describes locations of the senosr links. Google compatible polyline form.

7) EncodedPolyLine: http://code.google.com/apis/maps/documentation/polylineutility.html Google compatible poly levels form.

8) EncodedPolyLineLvls: http://code.google.com/apis/maps/documentation/polylineutility.html

9) Owner: Owner of the Detector

10) Transcom_id: Artifact - Not Useful

11) Borough: NYC borough Brooklyn, Bronx, Manhattan, Queens, Staten Island

12) linkName: Description of the link location and end points

# Reads files from the specified path
trafficStream = ssc.textFileStream(streamFilePath).map(lambda line: line.split(',')) \
                   .filter(lambda line: line[11] == 'Manhattan')  # Removes other boroughs

# Puts specified columns into a temporary table
trafficStream.map(lambda line: (line[0], line[1], line[4], line[6], line[11])) \
    .foreachRDD(lambda rdd: rdd.toDF().registerTempTable("traffic"))

# Begins the streaming session
# Any other operations on the streaming data must come before this line
ssc.start()

Since we have to wait for the streaming context to ingest the data and register it to a temporary table, you will have to wait ~30 seconds before running the next block.

This section will pull the data into a data frame, perform manipulations to ultimately assign colors to the nodes corresponding to the sensors, and plot the data. It is an infinite loop that refreshes at the same interval as our micro batches for our streaming context, and will cease operation upon interruption. Additionally, it utilizes IPython’s display component to re-write the output upon each refresh.

while True:
    try:
        # Putting our streaming data from the temporary table into a data frame
        df = sqlContext.sql("select * from traffic").toPandas()
        df.columns = ['id', 'Speed', 'Date', 'linkPoints', 'Borough']

        # Removing records not in current year/month/day
        # Traffic data sometimes incorrectly includes old records signifying a sensor error
        df['Date'] = pd.to_datetime(df['Date'])  # Converting time to datetime
        df = df[df['Date'].dt.year == datetime.now().year]
        df = df[df['Date'].dt.month == datetime.now().month]
        df = df[df['Date'].dt.day == datetime.now().day]

        # Merging in the nodes
        df['id'] = df['id'].astype(float)  # Converting for the merge
        df = df.merge(dfNodes, on ='id')

        # Mapping the nodes to speed categories
        df['Speed'] = df['Speed'].astype(float)
        slow = []
        medium = []
        fast = []
        for row in df.iterrows():
            speed = df['Speed'].ix[row]
            node = df['Node'].ix[row]
    
            # Speeds in MPH
            if speed >= 40:
                fast.append(node)
            if speed < 40 and speed >= 20:
                medium.append(node)
            if speed < 20:
                slow.append(node)
        
        # Setting the node color based on the speed categories
        nc = ['g' if node in fast
              else ('y' if node in medium
              else ('r' if node in slow 
              else 'None'))
              for node in manhattan_projected.nodes()]

        # Timestamp
        EST = datetime.utcnow() - timedelta(hours=5)  # Calculating Eastern Standard Time
        print('Plot Generated:\n {:%Y-%m-%d \n %H:%M:%S} EST'.format(EST))

        # Plotting the map
        fig, ax = ox.plot_graph(manhattan_projected, fig_height=10, node_size=8, 
                                node_alpha=1,edge_linewidth=0.2, dpi=100, node_color=nc)
        # Sets the output
        display.display(ax)
        display.clear_output(wait=True)
        
        time.sleep(microBatchSeconds)  # Number of seconds between refreshes
    
    except KeyboardInterrupt:
        break
Plot Generated:
 2017-01-20 
 16:10:15 EST

<matplotlib.axes._subplots.AxesSubplot at 0x207658a6a90>

Lastly, this final piece will terminate the streaming context

ssc.stop()

Next Steps

No project is ever truly finished, so here are a few suggestions to improve either the visualization or this project for the future:

  • Color edges instead of nodes
  • Dynamically adjust node colors to deviation from mean of sensor speed
  • Use a web socket instead of the web scraping script
  • Expand to other NYC boroughs or other cities

Categories:

Updated: