Running Spark with QFS on a Docker Swarm

In my last post, we put together a quick build that runs Apache Spark on the Personal Compute Cluster. The goal of that build was simply to demonstrate how to run Spark on a Docker Swarm. What it did not address well was how Spark would access data. I did set up a GlusterFS shared volume that allowed Spark to read and write data files. However, this was not fast, because the setup did not allow Spark to take advantage of data locality when scheduling tasks. As a result, almost all data reads and writes went across the ethernet backbone. The goal in this post is to set up a Spark build on Docker Swarm paired with a distributed file system arranged so that data locality can be leveraged to speed up processing. There are several choices for distributed file systems, the most notable being Hadoop's HDFS and the Quantcast File System (QFS). For Read More …

Running Spark on a Docker Swarm

My project to create a Personal Compute Cluster has finally come to the point where I can create the first usable deployment of Apache Spark. The goal here is simply to get a usable deployment of Spark, not necessarily a robust one. What this means is that I will set up Spark to run without HDFS or QFS as the distributed file system holding the data. Instead, this setup will use the GlusterFS volume I created in my last post. The reason GlusterFS is not ideal for holding data that Spark will analyze is that, with the simple GlusterFS configuration I used, Spark cannot take advantage of data locality. Given my setup, Spark sees the GlusterFS volume as a local file system mount on each of the nodes. Because the GlusterFS volume presents the same files on each of the nodes, it behaves like a distributed file system. Spark just Read More …
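For a sense of what that looks like in practice, reading from the shared volume is just a local-path read on every node. Here is a minimal sketch, assuming a hypothetical mount point of /mnt/gfs rather than the exact path used in my setup:

    # Sketch: reading and writing a GlusterFS volume that is mounted locally
    # on every node. The mount point /mnt/gfs and the file names are
    # illustrative assumptions, not values from this post.
    df = spark.read.csv('file:///mnt/gfs/data/example.csv', header=True)
    df.write.parquet('file:///mnt/gfs/data/example_parquet')

Because every node sees the same files at the same local path, Spark can read and write through ordinary file:// URLs, just without any locality-aware scheduling.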

ARM7 CPUs, Double Alignment, and Apache Spark

I haven't posted an update to my data analysis projects in a while, partly because my day job has been busy lately, and partly because what time I do have for recreational coding has been taken up by a problem I was experiencing with Apache Spark. I started having stability problems on my ODROID XU4 cluster. I didn't fully understand the cause at first, thinking for the longest time it was my own code. In the end, it proved to be a bug in Spark, or more specifically, an incompatibility between Spark's memory management and the ARMv7 platform of my ODROID XU4 cluster. The issue has to do with how some CPUs operate on double floating point values. These CPUs, including the 32-bit ARMv7 CPU found in the ODROID XU4, require that when the CPU operates on a double floating point value, the 8 bytes of memory containing the value be aligned to an 8 byte Read More …

Quantcast File System 1.2 for ARM71

NOTE – This article has been updated. It now assumes you have set up the cluster with Ubuntu 16.04, and it has the latest builds of QFS v1.2.1 and Spark v2.2.0. I have been using the Quantcast File System (QFS) as my primary distributed file system on my ODROID XU4 cluster. Due to QFS's low memory footprint, it works well with Spark, allowing me to assign as much as possible of the ODROID XU4's limited 2 GB of RAM to the Spark executor running on each node. Recently, QFS 1.2 was released. This version brings many features and updates, many of which are not relevant to my ODROID cluster use case. However, the most notable updates relevant to the ODROID XU4 cluster include: fixing Spark's ability to create a Hive metastore on a new QFS instance (QFS-332), improved error reporting in the QFS/HDFS shim, and an HDFS shim for the Hadoop 2.7.2 API, which the latest versions of Spark use. In this post, I will update the ODROID XU4 Read More …

Using Custom Hive UDFs With PySpark

Using Python to develop on Apache Spark is easy and familiar for many developers. However, because Spark runs in a JVM, when your Python code interacts with the underlying Spark system, there can be an expensive process of data serialization and deserialization between the JVM and the Python interpreter. If you do most of your data manipulation using data frames in PySpark, you generally avoid this serialization cost, because the Python code ends up being a high-level coordinator of the data frame operations rather than doing low-level operations on the data itself. This changes if you ever write a UDF in Python. To avoid the JVM-to-Python data serialization costs, you can use a Hive UDF written in Java. Creating a Hive UDF and then using it within PySpark can be a bit circuitous, but it does speed up your PySpark data frame flows if they would otherwise use Python UDFs. To illustrate this, I will rework the Read More …
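For a quick flavor of the approach, registering a Java Hive UDF from PySpark generally follows the pattern sketched below. The JAR path and the com.example.udf.SquareUDF class are placeholders, not the actual UDF developed in this post:

    # Sketch: registering and using a Java Hive UDF from PySpark.
    # Assumes a SparkSession built with Hive support and a hypothetical
    # JAR containing the class com.example.udf.SquareUDF.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("hive-udf-example") \
        .enableHiveSupport() \
        .getOrCreate()

    # Make the UDF's JAR available to the session, then register it as a SQL function.
    spark.sql("ADD JAR /path/to/example-udfs.jar")
    spark.sql("CREATE TEMPORARY FUNCTION square AS 'com.example.udf.SquareUDF'")

    # The registered function can now be used in SQL or selectExpr without
    # serializing rows out to the Python interpreter.
    df = spark.range(5)
    df.selectExpr("id", "square(id) AS id_squared").show()

The key point is that the rows stay inside the JVM; Python only orchestrates the query.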

Airline Flight Data Analysis – Part 2 – Analyzing On-Time Performance

In my last post on this topic, we loaded the Airline On-Time Performance data set collected by the United States Department of Transportation into a Parquet file to greatly improve the speed at which the data can be analyzed. Now, let's take a first look at the data by graphing the average airline-caused flight delay by airline. This is a rather straightforward analysis, but it is a good one to get started with on this data set. Open a Jupyter Python notebook on the cluster and, in the first cell, indicate that we will be using Matplotlib to do the graphing:

    %matplotlib inline

Then, in the next cell, load data frames for the airline on-time activity and the airline metadata from the Parquet files built in the last post. Note that I am using QFS as my distributed file system. If you are using HDFS, simply update the file URLs as needed.

    air_data = spark.read.parquet('qfs://master:20000/user/michael/data/airline_data')
    airlines = spark.read.parquet('qfs://master:20000/user/michael/data/airline_id_table')

Now we are ready to process Read More …
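The aggregation itself is the familiar group-by-and-average pattern. The sketch below shows the general shape; the column names (AirlineID, CarrierDelay, Description) are assumptions about the Parquet schema from the last post, so adjust them to match your own files:

    from pyspark.sql import functions as F

    # Average airline-caused delay per carrier, joined to the airline
    # metadata so the result carries a readable airline name.
    avg_delay = (
        air_data
        .groupBy('AirlineID')
        .agg(F.avg('CarrierDelay').alias('avg_carrier_delay'))
        .join(airlines, on='AirlineID')
        .select('Description', 'avg_carrier_delay')
        .orderBy('avg_carrier_delay', ascending=False)
    )

    # Pull the small aggregated result to the driver and plot it with Matplotlib.
    avg_delay.toPandas().plot.bar(x='Description', y='avg_carrier_delay', figsize=(12, 6))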

Airline Flight Data Analysis – Part 1 – Data Preparation

UPDATE – I have a more modern version of this post with larger data sets available here. This data analysis project explores what insights can be derived from the Airline On-Time Performance data set collected by the United States Department of Transportation. The data can be downloaded in one-month chunks from the Bureau of Transportation Statistics website. The data comes down as raw CSV files, which is something Spark can easily load. However, if you download 10+ years of data from the Bureau of Transportation Statistics (meaning you downloaded 120+ one-month CSV files from the site), that would collectively represent 30+ GB of data. For commercial-scale Spark clusters, 30 GB of text data is a trivial task. However, if you are running Spark on the ODROID XU4 cluster or in local mode on your Mac laptop, 30+ GB of text data is substantial. So, before we can do any analysis of the dataset, we need to Read More …
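At its core, the preparation step boils down to reading the monthly CSV files and rewriting them as partitioned Parquet. Here is a minimal sketch of that conversion, using illustrative paths and an assumed Year partition column rather than the exact values from this post:

    # Sketch: load the monthly CSV files and rewrite them as Parquet.
    # The input/output paths and the partition column ('Year') are
    # illustrative assumptions.
    raw = (
        spark.read
        .option('header', 'true')        # BTS CSV files include a header row
        .option('inferSchema', 'true')   # let Spark work out column types
        .csv('qfs://master:20000/user/michael/data/csv/*.csv')
    )

    (
        raw.write
        .mode('overwrite')
        .partitionBy('Year')
        .parquet('qfs://master:20000/user/michael/data/airline_data')
    )

Partitioning the Parquet output lets later queries skip whole chunks of the data set instead of scanning all 30+ GB of text.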

Adding a New Node to the ODROID XU4 Cluster

I recently acquired another ODROID XU4 device (and a MicroSD card for bulk storage) to add to my XU4 cluster. This new node brings my node count to five. Adding a new node to the cluster is relatively straightforward, but there are a lot of details. In a modern datacenter, this operation would be handled by automated provisioning, which would build the new node from an image. However, I haven't set up that sort of automation on my cluster, so I will need to set up the new node manually. In the original cluster configuration, I used a 5-port ethernet switch for the internal network. Given that there was an open port, no additional hardware beyond the new node is needed. However, if I were to add a sixth node (or beyond), I would need to upgrade my ethernet switch to something such as an 8-port switch. I will also note that using the 40 mm PCB spacers I originally ordered makes Read More …

Converting Apache Access Logs to Parquet Backed Data Frames on Spark

One of the analyses I am looking to do with my ODROID XU4 cluster is to take a look at various access patterns on my website by analyzing the Apache HTTP access logs. Analyzing Apache access logs directly in Spark can be slow because they are unstructured text logs. Converting the logs to a data frame backed by partitioned Parquet files can make subsequent analysis much faster. The first task is to create a mapper that can be used in Spark to convert a row in the access log to a Spark Row object. A Python 3 mapper would look like:

    # Parse an Apache access log. Assumes Python 3
    import re
    from pyspark.sql import Row
    from datetime import datetime

    APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+) "((?:[^"]|")+)" "((?:[^"]|")+)"$'
    DATETIME_PARSE_PATTERN = '%d/%b/%Y:%H:%M:%S %z'

    # Returns a Row containing the Apache Access Log info
    def parse_apache_log_line(logline):
        match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
        if match is None:
            return None

Read More …
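Once the mapper is finished, wiring it into Spark follows the usual RDD-to-data-frame pattern. Here is a minimal sketch, assuming a completed parse_apache_log_line that returns Row objects and using illustrative QFS paths rather than the ones from this post:

    # Sketch: parse raw access logs into Rows and persist them as Parquet.
    # The input and output paths are illustrative assumptions.
    access_logs = (
        spark.sparkContext
        .textFile('qfs://master:20000/user/michael/logs/apache_access.log')
        .map(parse_apache_log_line)
        .filter(lambda row: row is not None)   # drop lines the regex could not parse
    )

    log_df = spark.createDataFrame(access_logs)
    log_df.write.mode('overwrite').parquet('qfs://master:20000/user/michael/data/apache_access_logs')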

Using the Quantcast File System with Spark on the ODROID XU4 Cluster

I used to work at a company named Quantcast, which is known for processing petabyte-scale data sets. When operating at that scale, any inefficiency in your data infrastructure gets multiplied many times over. Early on, Quantcast found that the inefficiencies in Hadoop literally cost them by requiring more hardware to reach a given level of performance. So Quantcast set out to write a highly efficient big data stack. At the bottom of that stack was Quantcast's distributed file system, QFS. Quantcast open-sourced QFS and performed several studies to show the efficiency gains QFS had over HDFS. Given the first-hand experience I have had with QFS performance, I thought it would be interesting to get it running on the ODROID XU4 cluster in combination with Spark to see how much QFS would improve processing speeds in our setup. Spoiler alert: there was no time improvement, but QFS required less memory than HDFS, allowing me to assign more RAM to Spark. Installing Read More …