In my last post, we put together a quick build that runs Apache Spark on the Personal Compute Cluster. The goal of that build was simply to demonstrate how to run Spark on a Docker Swarm. What it did not address well was how Spark would access data. I did set up a GlusterFS shared volume that allowed Spark to read and write data files. However, this was not fast, because the setup did not allow Spark to take advantage of data locality when scheduling tasks. As a result, almost all data reads and writes were done across the ethernet backbone.
The goal in this post is to set up a Spark build on Docker Swarm that is paired with a distributed file system set up in a manner where data locality can be leveraged to speed up processing. There are choices for distributed file systems, the most notable being Hadoop’s HDFS and the Quantcast File System (QFS). For this build, I am going to use QFS. It will be my eventual goal to create a build with HDFS too, and then do a performance comparison between the two builds.
Spark Swarm Design
In order to design the Spark deployment, we first need to take an inventory of all the services required to provide a usable deployment:
- Apache Spark – Of course, Spark is the central feature of this build. To run it, two services are needed in the swarm:
- Spark Workers – The Spark worker is the process where Spark actually does its (distributed) work. One Spark worker process should be running on each physical node of the cluster. Each worker will be configured to launch two executors, splitting the node’s cores so that each executor has 6 cores.
- Spark Master – This process manages the coordination of work amongst the Spark workers, and then performs any consolidation or reduction work needed to produce the summarized results. You do not directly interact with the Spark master; instead you use a client that tells the Spark master what computations you would like the cluster to do.
- Quantcast File System – Two services are needed for QFS as well:
- Meta Server – QFS, like most distributed file systems, has a master service that keeps track of where file parts, or chunks, are located in the cluster. The meta server performs this function. For it to function, especially between restarts, the meta server will need a persistent store. To accomplish this, the meta server service will be affined to the master node and have access to a volume on the master node’s data partition.
- Chunk Servers – The chunk server is what manages the file parts stored on any given node. When the meta server gives a client access to a file or file part, it does so by pointing to a specific chunk server, which then serves up the file to the client. In order to work, the chunk server on each node needs access to a persistent volume on that node, where it will store the file parts it is responsible for. There will be one chunk server on each physical node.
- QFS Status Web Server – QFS comes with a stand-alone web app that is used to monitor the status of the cluster as a whole.
- Jupyter Notebook Server – The Spark client we will use to perform work on the Spark cluster will be a Jupyter notebook, set up to use PySpark, the Python interface to Spark. We will also set up this Docker image to provide a command line prompt, either through Jupyter’s shell feature or by running a `bash` shell in the running image, that will enable easy command line access to QFS. This will enable loading of data into QFS.
One important detail to understand is how Spark identifies data locality. When the Spark master is scheduling tasks against file parts, it considers where the QFS meta server reports the file parts are, as indicated by the chunk server’s IP address. If the Spark master knows it has a worker at the same IP address as the chunk server containing the file part, the Spark master will prefer assigning that file part’s work to the Spark worker with the same IP address. This is not a guarantee, as the Spark master has a few other considerations in assigning tasks to Spark workers, but a Spark worker having the same IP address as a chunk server is necessary for data locality to be leveraged.
In a Docker Swarm, there are two ways to ensure two services share the same IP address. The first is to not use private networking in the swarm and instead only use the physical node’s IP address across all services running on that node. The second is to consolidate services that need to share the same IP address into the same Docker image, thus effectively making them the same service. There are pros and cons to each approach. For my private Personal Compute Cluster (meaning, it’s mine, no one else is using it, and it is not exposed to the public), the difference between the two approaches becomes more academic, and so in this build, I went with the approach that consolidates services into one Docker image. This means that a Docker image will be built that runs both the Spark worker and the QFS chunk server, and together they represent a single “worker node”. We will then run one worker node container based on this image on each physical node.
Given all that, the Docker image design is as follows:
- `worker-node` – This image is the base Docker image for this entire build. In it I install both the Spark and QFS software, and configure it to run the Spark worker and QFS chunk server processes. It is also configured to provide some convenient Python packages to Spark, specifically matplotlib and pandas.
- `qfs-master` – This image builds on the `worker-node` image. In it I install the QFS status web server, and configure the image to run both the QFS meta server and the status web server. Furthermore, I create a `bash` shell configuration that provides some aliases to easily interact with QFS from a `bash` prompt running in this container.
- `spark-master` – This image also builds on the `worker-node` image. It configures and runs the Spark master process.
- `jupyter-server` – This image builds on the `qfs-master` image, installing the Jupyter notebook server and configuring it to run with a connection to the Spark master running in the `spark-master` container.
The complete configuration and design can be reviewed in my GitHub repository.
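As a rough sketch of how the `worker-node` image consolidates the two processes behind a single IP address, its Dockerfile has roughly this shape. Everything here — the base image, paths, and entrypoint script name — is an illustrative assumption, not the exact contents of my repository:

```dockerfile
# Sketch only: base image, versions, paths, and the script name are assumptions.
FROM openjdk:8-jdk-slim

ENV SPARK_HOME=/opt/spark
ENV QFS_HOME=/opt/qfs

# Spark and QFS would be downloaded and unpacked under /opt here
# (steps omitted; see the repository for the real Dockerfile).

COPY start-worker-node.sh /start-worker-node.sh

# The entrypoint starts the QFS chunk server in the background and then
# runs the Spark worker in the foreground, so both processes share the
# container's one IP address — which is what lets Spark see data locality.
ENTRYPOINT ["/start-worker-node.sh"]
```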
All of the Docker containers in this build will be launched together as a Docker stack on the swarm running in the Personal Compute Cluster. There are some key details in how this stack is configured:
- All services in the stack are configured to use the same network, called `cluster_network`. While the subnet is specified for this network, there is no guarantee that the `worker-node` container on any given physical node will be launched with the same IP address across restarts. If a `worker-node` is launched with a different IP address from what it previously had, the QFS system will launch into recovery mode. In recovery mode, the meta server reconfigures its map of file part locations based on the new IP addresses of the chunk servers. This takes a few moments at start up, and then the QFS file system is ready to be used.
- The `worker-node` service is set to have a `deploy` mode of `global`. This causes Docker to instantiate one container based on this service’s image on each node in the swarm.
- Each `worker-node` container has two volumes attached. One is the data directory for the QFS chunk server; the other is the work directory for the Spark worker.
- The `qfs-master` service is affined to run on the master node in the swarm, and has its own volume attached, which is where the meta server will save the file system’s metadata. The reason this service is set to run on the master node is so that the metadata is saved to the same place and is retained between cluster launches. One could configure the meta server to use the GlusterFS volume on the Personal Compute Cluster instead, but using GlusterFS would be much slower than using the node’s file system directly.
- The volume that the `spark-master` container mounts is primarily a work directory, and thus the Spark master can run on any physical node.
- The `jupyter-server` container needs a persistent store to save notebooks to. To accomplish that, this container mounts the GlusterFS volume. This is not a performance concern because reading and writing notebooks is not in the critical path of any computation. Also, since the Jupyter server is also a Spark client, some Spark processes will run in this container, and thus a Spark working directory volume is also mounted.
- At the time of this post’s writing, I still haven’t addressed the CPU overheating problem I have been having with my nodes when the CPU is at 100% across all cores. Given that, the worker nodes are configured to not use 100% of the CPU. This doesn’t affect the thread count that a Spark worker node has access to; it just makes those threads run at a reduced throughput.
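To make these settings concrete, here is a trimmed sketch of what the key parts of the `deploy-spark-qfs-swarm.yml` stack file could look like. The subnet, CPU limit, and mount paths are illustrative assumptions; the real file is in my GitHub repository.

```yaml
version: '3.4'
services:
  worker-node:
    image: master:5000/worker-node:latest
    networks:
      - cluster_network
    volumes:
      - /mnt/data/qfs:/data/qfs      # QFS chunk server data directory
      - /mnt/data/spark:/data/spark  # Spark worker work directory
    deploy:
      mode: global                   # one worker-node container per swarm node
      resources:
        limits:
          cpus: '10.0'               # illustrative: stay below 100% of cores to limit heat
  qfs-master:
    image: master:5000/qfs-master:latest
    networks:
      - cluster_network
    volumes:
      - /mnt/data/qfs:/data/qfs      # meta server checkpoint and log data
    deploy:
      placement:
        constraints:
          - node.role == manager     # affine the meta server to the master node

networks:
  cluster_network:
    ipam:
      config:
        - subnet: 10.20.30.0/24      # illustrative subnet
```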
Initializing the Spark QFS Swarm
Prior to the first time the Spark QFS swarm is run, we need to configure the Personal Compute Cluster. The first task is to build the Docker images. Do that by cloning my GitHub repository and running the shell script that builds the images. On the master node (if you cloned this repository from my last post, then all you should do here is a `git pull`):

```
cd repositories
git clone firstname.lastname@example.org:DIYBigData/personal-compute-cluster.git
cd personal-compute-cluster/spark-qfs-swarm/
./build-images.sh
```
Next, we need to create the directories on the physical nodes that the various services will use. From the master node, run the following:

```
parallel-ssh -i -h ~/cluster/all.txt -l root "mkdir -p /mnt/data/qfs/logs"
sudo mkdir -p /mnt/data/qfs/checkpoint
parallel-ssh -i -h ~/cluster/all.txt -l root "mkdir -p /mnt/data/qfs/chunk"
parallel-ssh -i -h ~/cluster/all.txt -l root "mkdir -p /mnt/data/spark"
parallel-ssh -i -h ~/cluster/all.txt -l root "chgrp -R docker /mnt/data/qfs"
parallel-ssh -i -h ~/cluster/all.txt -l root "chgrp docker /mnt/data/spark"
```
The next step is to initialize the QFS cluster. The initialization step creates a blank file system on the QFS cluster, and is effectively the same thing as erasing a hard drive. To initialize QFS, we will directly run the `qfs-master` container on the master node:

```
docker run -it --mount type=bind,source=/mnt/data/qfs,target=/data/qfs master:5000/qfs-master:latest /bin/bash
```
Then, at the Docker container’s `bash` prompt, initialize the file system and exit:

```
$QFS_HOME/bin/metaserver -c $QFS_HOME/conf/Metaserver.prp
exit
```
Running the Spark QFS Swarm
Once the Personal Compute Cluster has been initialized as described above, launching the Spark QFS stack is pretty straightforward. From within the `spark-qfs-swarm` directory of the master node’s clone of my git repository, simply run:

```
docker stack deploy -c deploy-spark-qfs-swarm.yml spark
```
Once everything is up and running on the swarm, simply point your development computer’s browser at `http://your-cluster-IP-address:7777` to get the Jupyter notebook UI. While your cluster is running a Spark job or Jupyter notebook, you can point your browser at `http://your-cluster-IP-address:4040` to watch the progress of the Spark cluster.
This example notebook can be used to quickly test your deployment. It simply calculates all of the prime numbers up to a configurable maximum value, saves the resulting data frame to QFS, loads it back into another data frame, and then calculates some aggregate statistics on the found prime numbers. You can try adjusting the `MAX_VALUE` variable to calculate larger ranges of prime numbers.
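For reference, the per-element check that the notebook distributes across the Spark workers is just ordinary trial-division primality testing. Here is a minimal plain-Python sketch of that logic, with the Spark machinery stripped away (the actual notebook’s function and variable names may differ):

```python
import math

def is_prime(n: int) -> bool:
    """Trial-division primality test — the kind of per-element check
    Spark can map over a large range of numbers in parallel."""
    if n < 2:
        return False
    if n < 4:
        return True  # 2 and 3 are prime
    if n % 2 == 0:
        return False
    # only odd divisors up to sqrt(n) need checking
    for d in range(3, math.isqrt(n) + 1, 2):
        if n % d == 0:
            return False
    return True

MAX_VALUE = 1000  # the notebook uses much larger values, e.g. 1000000000
primes = [n for n in range(2, MAX_VALUE) if is_prime(n)]
print(len(primes))  # 168 primes below 1000
```

In the notebook, the same check runs inside a Spark transformation over a parallelized range, which is what lets the cluster split the work across executors.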
It’s interesting to note that when I first ran this notebook, I decided to verify my results with Wolfram Alpha. I used a `MAX_VALUE` of 1000000000, and then asked Wolfram Alpha for the count of prime numbers less than 1000000000. My number and their number differed. After some digging, I became convinced it was a bug in Wolfram Alpha, reported it, and they concurred. So at least in this case, my Personal Compute Cluster was able to outperform Wolfram Alpha.
In a future post I will more aggressively put this Spark deployment through some stress tests.
Shutting Down the Spark QFS Swarm
If you want to delete the Spark QFS stack running on the Docker swarm, the following command will remove it:

```
docker stack rm spark
```
To relaunch the stack later, simply launch it with the `docker stack deploy` command provided in the section above.