Airline Flight Data Analysis – Part 1 – Data Preparation (Reprise)

As some of you know, I previously explored building a Spark cluster using ODROID XU-4 single board computers. I was able to demonstrate some utility with this cluster, but it was limited. One analysis I attempted looking at was the Airline On-Time Performance data set collected by the United States Department of Transportation. The XU-4 cluster allowed me to make summarization graphs of the data, but not much more. The primary constraint was the RAM pool the cluster had, which was 8 GB total RAM across the four nodes, and the 10 year data set was greater than 30 GB uncompressed.

Now that I have built the Personal Compute Cluster, I decided to revise this data set to see if I could do more sophisticated analysis. The short answer is I can. But before I do that we need to prepare the raw data that we download from the Department of Transportation’s website into a format. Specifically we need to convert it from a collection of CSV files to a single parquet file. The last time I did this I went into detail as to the benefit of converting the files to parquet, so I will not repeat that here.

Loading Raw Data into QFS

Since I deployed the cluster’s software stack as a Docker Swarm stack, we are going to create a Docker container to interact with the QFS file system on the cluster. Furthermore, the container we create needs to attach to the same overlay network that the QFS file system is using. To do that, use the following command on the master node. Note that this assumes hat you have built the docker images as described in my post on running Spark with QFS.

docker run -it -u spark --network spark_cluster_network master:5000/qfs-master /bin/bash

You should get a shell prompt from within the container we just created. To unpack this command a little, notice the following:

  • The qfs-master image is used. The primary reason for this is that command aliases in this image to make it easier to interact with the QFS file system.
  • The user spark is is identified as the user the shell should be launched with.
  • The contained is attached to the swarm network named spark_cluster_network. You can validate that this network exists with the docker network ls command.

Once you are at the shell prompt for the docker container, you can easily peruse the QFS file system with a qfs -ls / command. The qfs command is the main entry point to all operations the file system. We do need to create a directory on the file system into which we will place the data. Do that with this command at the docker container’s command prompt:

qfs -mkdir /data/airline/raw/

Once you do that, it’s time to download the data. Since I last wrote about this data, I’ve spent more time downloading the data files from the Department of Transportation’s website. I have been able to put together a few data sets: one with 11 years of data, another with 16, and a third with 21 years of data. The files can be downloaded here:

However, we will not click on the above weblink, instead use it in our docker container (change the wget URL to the data set you want):

wget -O airline-data.2003-2018.tar.gz "https://diybigdata.net/?smd_process_download=1&download_id=821"
tar -xvf airline-data.2003-2018.tar.gz 
cptoqfs -d ./airline-data/ -k /data/airline/raw/ -r 2

At this point the airline data will have been added to the QFS filesystem on the cluster. If you want to see the detailed activity of loading the files into AFS, add the -v option to the cptoqfs command. You can verify that with a qfs -ls command. When finished, you may exit the container with the exit command.

Note that extracting the compressed tar file for the 15-year data set results in a directory with over 40GB of content, and the 20-year data set is 56 GB uncompressed. Since I set up Docker on this computer to place it’s work directory on the largest partition mounted at /mnt/data, that should be OK. If you did not set up the server running this container the same way I did, you might want to double check to ensure you will not run out of disk space when inflating the tar file.

After you exit the container, you might want to run the docker system prune -f command to delete the image of the container we just exited. This will also delete the download tar file and the CSVs that were extract from the tar file (because you downloaded it in the container).

Converting Data to a Partitioned Parquet File

Now that the uncompressed CSVs are in the cluster’s distributed file system, it is time to convert them to the parquet file format. Since the cluster we are using today is much more powerful than the CU-4 cluster I previously did this work on, the code to generate the parquet files is much more straightforward.

In order to get things started, the first cell in the notebook will simply import libraries and create the Spark instance:

import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession\
        .builder\
        .appName("AirlineDataLoad")\
        .getOrCreate()

Then we define the schema by which the airline on-time performance data will be loaded into:

air_schema = T.StructType([
    T.StructField("Year", T.IntegerType()),
    T.StructField("Quarter", T.IntegerType()),
    T.StructField("Month", T.IntegerType()),
    T.StructField("DayofMonth", T.IntegerType()),
    T.StructField("DayOfWeek", T.IntegerType()),
    T.StructField("FlightDate", T.StringType()),
    T.StructField("UniqueCarrier", T.StringType()),
    T.StructField("AirlineID", T.LongType()),
    T.StructField("Carrier", T.StringType()),
    T.StructField("TailNum", T.StringType()),
    T.StructField("FlightNum", T.IntegerType()),
    T.StructField("OriginAirportID", T.IntegerType()),
    T.StructField("OriginAirportSeqID", T.IntegerType()),
    T.StructField("OriginCityMarketID", T.IntegerType()),
    T.StructField("Origin", T.StringType()),
    T.StructField("OriginCityName", T.StringType()),
    T.StructField("OriginState", T.StringType()),
    T.StructField("OriginStateFips", T.IntegerType()),
    T.StructField("OriginStateName", T.StringType()),
    T.StructField("OriginWac", T.IntegerType()),
    T.StructField("DestAirportID", T.IntegerType()),
    T.StructField("DestAirportSeqID", T.IntegerType()),
    T.StructField("DestCityMarketID", T.IntegerType()),
    T.StructField("Dest", T.StringType()),
    T.StructField("DestCityName", T.StringType()),
    T.StructField("DestState", T.StringType()),
    T.StructField("DestStateFips", T.IntegerType()),
    T.StructField("DestStateName", T.StringType()),
    T.StructField("DestWac", T.IntegerType()),
    T.StructField("CRSDepTime", T.StringType()),
    T.StructField("DepTime", T.StringType()),
    T.StructField("DepDelay", T.DoubleType()),
    T.StructField("DepDelayMinutes", T.DoubleType()),
    T.StructField("DepDel15", T.DoubleType()),
    T.StructField("DepartureDelayGroups", T.IntegerType()),
    T.StructField("DepTimeBlk", T.StringType()),
    T.StructField("TaxiOut", T.DoubleType()),
    T.StructField("WheelsOff", T.StringType()),
    T.StructField("WheelsOn", T.StringType()),
    T.StructField("TaxiIn", T.DoubleType()),
    T.StructField("CRSArrTime", T.StringType()),
    T.StructField("ArrTime", T.StringType()),
    T.StructField("ArrDelay", T.DoubleType()),
    T.StructField("ArrDelayMinutes", T.DoubleType()),
    T.StructField("ArrDel15", T.DoubleType()),
    T.StructField("ArrivalDelayGroups", T.IntegerType()),
    T.StructField("ArrTimeBlk", T.StringType()),
    T.StructField("Cancelled", T.DoubleType()),
    T.StructField("CancellationCode", T.StringType()),
    T.StructField("Diverted", T.DoubleType()),
    T.StructField("CRSElapsedTime", T.DoubleType()),
    T.StructField("ActualElapsedTime", T.DoubleType()),
    T.StructField("AirTime", T.DoubleType()),
    T.StructField("Flights", T.DoubleType()),
    T.StructField("Distance", T.DoubleType()),
    T.StructField("DistanceGroup", T.IntegerType()),
    T.StructField("CarrierDelay", T.DoubleType()),
    T.StructField("WeatherDelay", T.DoubleType()),
    T.StructField("NASDelay", T.DoubleType()),
    T.StructField("SecurityDelay", T.DoubleType()),
    T.StructField("LateAircraftDelay", T.DoubleType()),
    T.StructField("FirstDepTime", T.StringType()),
    T.StructField("TotalAddGTime", T.StringType()),
    T.StructField("LongestAddGTime", T.StringType()),
    T.StructField("DivAirportLandings", T.StringType()),
    T.StructField("DivReachedDest", T.StringType()),
    T.StructField("DivActualElapsedTime", T.StringType()),
    T.StructField("DivArrDelay", T.StringType()),
    T.StructField("DivDistance", T.StringType()),
    T.StructField("Div1Airport", T.StringType()),
    T.StructField("Div1AirportID", T.StringType()),
    T.StructField("Div1AirportSeqID", T.StringType()),
    T.StructField("Div1WheelsOn", T.StringType()),
    T.StructField("Div1TotalGTime", T.StringType()),
    T.StructField("Div1LongestGTime", T.StringType()),
    T.StructField("Div1WheelsOff", T.StringType()),
    T.StructField("Div1TailNum", T.StringType()),
    T.StructField("Div2Airport", T.StringType()),
    T.StructField("Div2AirportID", T.StringType()),
    T.StructField("Div2AirportSeqID", T.StringType()),
    T.StructField("Div2WheelsOn", T.StringType()),
    T.StructField("Div2TotalGTime", T.StringType()),
    T.StructField("Div2LongestGTime", T.StringType()),
    T.StructField("Div2WheelsOff", T.StringType()),
    T.StructField("Div2TailNum", T.StringType()),
    T.StructField("Div3Airport", T.StringType()),
    T.StructField("Div3AirportID", T.StringType()),
    T.StructField("Div3AirportSeqID", T.StringType()),
    T.StructField("Div3WheelsOn", T.StringType()),
    T.StructField("Div3TotalGTime", T.StringType()),
    T.StructField("Div3LongestGTime", T.StringType()),
    T.StructField("Div3WheelsOff", T.StringType()),
    T.StructField("Div3TailNum", T.StringType()),
    T.StructField("Div4Airport", T.StringType()),
    T.StructField("Div4AirportID", T.StringType()),
    T.StructField("Div4AirportSeqID", T.StringType()),
    T.StructField("Div4WheelsOn", T.StringType()),
    T.StructField("Div4TotalGTime", T.StringType()),
    T.StructField("Div4LongestGTime", T.StringType()),
    T.StructField("Div4WheelsOff", T.StringType()),
    T.StructField("Div4TailNum", T.StringType()),
    T.StructField("Div5Airport", T.StringType()),
    T.StructField("Div5AirportID", T.StringType()),
    T.StructField("Div5AirportSeqID", T.StringType()),
    T.StructField("Div5WheelsOn", T.StringType()),
    T.StructField("Div5TotalGTime", T.StringType()),
    T.StructField("Div5LongestGTime", T.StringType()),
    T.StructField("Div5WheelsOff", T.StringType()),
    T.StructField("Div5TailNum", T.StringType())
])

Now, load the raw CSVs, convert them to a data frame, and then rewrite the data as a partitioned parquet file:

raw_df = spark.read.csv( 
        'qfs:///data/airline/raw/On_Time_On_Time_Performance_*.csv', 
        header=True, 
        schema=air_schema,
        escape='"')

airline_data = raw_df.select(
        "Year","Quarter","Month","DayofMonth","DayOfWeek","FlightDate","UniqueCarrier","AirlineID",
        "Carrier","TailNum","FlightNum","OriginAirportID","OriginAirportSeqID","OriginCityMarketID",
        "Origin","OriginCityName","OriginState","OriginStateFips","OriginStateName","OriginWac",
        "DestAirportID","DestAirportSeqID","DestCityMarketID","Dest","DestCityName","DestState",
        "DestStateFips","DestStateName","DestWac","CRSDepTime","DepTime","DepDelay","DepDelayMinutes",
        "DepDel15","DepartureDelayGroups","DepTimeBlk","TaxiOut","WheelsOff","WheelsOn","TaxiIn","CRSArrTime",
        "ArrTime","ArrDelay","ArrDelayMinutes","ArrDel15","ArrivalDelayGroups","ArrTimeBlk","Cancelled",
        "CancellationCode","Diverted","CRSElapsedTime","ActualElapsedTime","AirTime","Flights","Distance",
        "DistanceGroup","CarrierDelay","WeatherDelay","NASDelay","SecurityDelay","LateAircraftDelay"
    ).withColumn(
        'FlightDate', F.to_date(F.col('FlightDate'),'yyyy-MM-dd')
    )

airline_data.repartition('Year').write.partitionBy(
        "Year","Month"
    ).parquet(
        'qfs:///data/airline/processed/airline_data',
        mode='overwrite'
    )

You will note a few details here. First, the column FlightDate is converted from a string representation of a date to a native date format via the call to withColumn(). The second thing to note is that the data is repartitioned by Year before writing to disk partitioned by Year and Month. The reason for this is that I wanted to ensure that there would be sure that each of the Year and Month partitions had exactly one part file. By doing this, each part file ends up being about 20 MB in size, which is on the smaller size. A misnomer in big data is that it is best to have many part files as it will increase parallelism. While you do want parallelism, it needs to be balanced with the fixed overhead each independent parallel task creates. That right balance is dependent on the underlying cluster hardware and networking, but the outcome of that balance should be tasks that result from directly reading these part files should be 10s of seconds to a couple minutes longe each. Any shorter and the task overhead starts to dominate you overall run time, and any longer and you likely do not have good parallelism. On my Personal Compute Cluster, this means having part files in the 100-300 MB range.

The final step is to convert the various look up tables from CSV format to parquet:

from pyspark.sql import Row

def mapAirlineIdRow(r):
    airline_id = int(r.Code)
    airline_name_parts = r.Description.split(':')
    airline_name = airline_name_parts[0].strip()
    iata_carrier = airline_name_parts[1].strip()
    out = Row(
        AirlineID=airline_id,
        AirlineName=airline_name,
        Carrier=iata_carrier
    )
    return out;

airline_id_csv = spark.read.csv(
    'qfs:///data/airline/raw/LUT-DOT_airline_IDs.csv',
    header=True,
    escape='"'
)

airline_id_df = airline_id_csv.rdd.map(mapAirlineIdRow).toDF().coalesce(1)
airline_id_df.write.parquet(
        'qfs:///data/airline/processed/DOT_airline_codes_table',
        mode='overwrite'
    )
    
airline_id_df.take(1)

airport_schema = T.StructType([
    T.StructField("Code", T.StringType()),
    T.StructField("Description", T.StringType()),
])

def mapAirportIdRow(r):
    airport_id = r.Code
    airport_city = ''
    airport_name = ''
    airport_name_parts = r.Description.split(':')
    if len(airport_name_parts) is 2:
        airport_city = airport_name_parts[0].strip()
        airport_name = airport_name_parts[1].strip()
    elif len(airport_name_parts) is 1:
        airport_city = airport_name_parts[0]
        airport_name = r.Code
    
    out = Row(
        
        AirportID=airport_id,
        City=airport_city,
        Name=airport_name
    )
    return out;

airport_codes_csv = spark.read.csv(
    'qfs:///data/airline/raw/LUT-airport_codes.csv',
    header=True,
    escape='"',
    schema=airport_schema
)

airport_codes_df = airport_codes_csv.rdd.map(mapAirportIdRow).toDF().coalesce(1)
airport_codes_df.write.parquet(
        'qfs:///data/airline/processed/airport_codes_table',
        mode='overwrite'
    )

airport_id_csv = spark.read.csv(
    'qfs:///data/airline/raw/LUT-DOT_airport_IDs.csv',
    header=True,
    escape='"',
    schema=airport_schema
)

airport_id_df = (
    airport_id_csv
    .rdd.map(mapAirportIdRow)
    .toDF()
    .withColumn(
        'AirportID',
        F.col('AirportID').cast(T.IntegerType())
    )
    .coalesce(1)
)
airport_id_df.write.parquet(
        'qfs:///data/airline/processed/airport_id_table',
        mode='overwrite'
    )

All the code for the Jupyter notebook can be viewed in my GitHub repository. Next we will start leveraging this data in various ways.

Leave a Reply