Now that my Personal Compute Cluster is uninhibited by CPU overheating, I want to turn my configuration to work as efficiently as possible for the type of workloads I place on it. I searched around for Apache Spark benchmarking software, however most of what I found was either too older (circa Spark 1.x) or too arcane. I was able to get Ewan Higgs’s implementation of TeraSort working on my cluster, but it was written in Scala and not necessarily representative of the type of operations I would use in PySpark. So I set out to write my own.
The primary goal of my benchmarking approach is to have a standard set of data and operations that I can compare the performance of before and after some change I make to my Spark deployment and be confident that any change in performance was due to the change in the Spark deployment and not due to variability in the benchmark. This attribute is important is it allows me to make informed decisions about whether my cluster’s updated configuration is benefiting performance. The second goal was that running the benchmark should be easy and the insights it provides be readily accessible.
The code I put together for this benchmark can be viewed in my GitHub repository. You will note that the code is designed to be run through
spark-submit rather as a Jupyter notebook. I will discuss below how to run this sort of job with the Docker Swarm based cluster I describe in a previous post.
Test Data Generation
The first step in running a benchmark is to create a sufficiently large data set to be used the benchmarking runs. While there are certainly large datasets that could be used, a desired quality of a test data set is that the distributions of the values in the data set be uniformly random. The test data generation PySpark job I wrote uses randomly generated UUIDs to create a data set of the desired qualities. The schema of the data file produced is:
root |-- value: string (nullable = true) |-- prefix2: string (nullable = true) |-- prefix4: string (nullable = true) |-- prefix8: string (nullable = true) |-- float_val: double (nullable = true) |-- integer_val: long (nullable = true)
value columns is where the UUID is placed, while the prefix fields are various sized substrings from the first N characters of the UUID. The
integer_val fields are floating point and integer representations of the same random number. The data files produced will be CSV files, so the following data frame schema will need to be used if you ever want to read the file in your code:
test_data_schema = T.StructType([ T.StructField("value", T.StringType()), T.StructField("prefix2", T.StringType()), T.StructField("prefix4", T.StringType()), T.StructField("prefix8", T.StringType()), T.StructField("float_val", T.DoubleType()), T.StructField("integer_val", T.LongType()) ])
The generation job has options to control the number of rows and partitions in the data file.
Shuffling in Apache Spark is when data needs to be moved in between nodes via the inter-node networking in order to transform the data in some way. The most common transformations that trigger shuffles are group-by operations, repartitioning, and joins. The shuffle benchmark job exercises each of these operations one by one.
The CPU benchmark is designed to perform tasks that are heavy on the CPU and do not require much data input and output. The first task is to calculate the SHA-512 hash on the value field on each row of the input test data file. In this task this is some file activity caused by reading the input file from disk, but if all the Spark tasks get schedule as
NODE_LOCAL priority, then the network activity should be minimal.
There are also two versions of an algorithm for calculating Pi. The first version is your traditional PySpark demonstration code that uses a Python function to test whether randomly generated points are within a circle or not. It is in fact based on the example code that is given by the Apache Spark project itself:
def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 count = ( spark.sparkContext\ .parallelize(range(0, NUM_SAMPLES), NUM_PARTITIONS)\ .filter(inside)\ .count() print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
The issue with this implementation of calculating Pi is that the data computations and comparisons are done in a Python function. This means they are done in the Python interpreter rather than the JVM where Spark’s core engine runs. The overhead of doing computations in Python and serializing the results back to Spark’s Enginer in the JVM can cause many operations in PySpark to be slow.
To emphasize that slowness, this benchmark also calculates Pi with the same general mathematical approach but in such a way where the computations are done completely in Spark’s engine where spark can optimize its operations. This is done by converting the generation and testing of data points to their equivalent data frame manipulations. That code looks like this:
pi_df = ( spark.range(0, NUM_SAMPLES, numPartitions=NUM_PARTITIONS) .withColumn('x', F.rand(seed=8675309)) .withColumn('y', F.rand(seed=17760704)) .withColumn('within_circle', F.when( (F.pow(F.col('x'),F.lit(2)) + F.pow(F.col('y'),F.lit(2)) <= 1.0), F.lit(1).cast(T.LongType()) ).otherwise( F.lit(0).cast(T.LongType()) ) ) .agg( F.sum('within_circle').alias('count_within_circle'), F.count('*').alias('count_samples') ) ) res = pi_df.collect() pi_val = 4.0*(res.count_within_circle)/(res.count_samples)
Note that this code uses Spark’s native random number generator, and that each of the
y column has a different seed to ensure different values in each column. Also note that every other aspect of th Pi calculation is a Spark native data frame function, with the exception of the final formula. This approach to calculating Pi will be much faster than using the Python function. In fact, on my cluster, this approach is over 50 times faster with using 5 billion samples and a parallelism of 1000 tasks.
Running PySpark Benchmark via Docker
In oder to run the benchmark jobs on my cluster where I used Docker Swarm to deploy Apache Spark, we need to create a docker container that has access to the the benchmark code and is attached to the swarm network that the Spark cluster runs in. Here we can re-use the QFS master image, making this all a rather straightforward task.
Th first set would be to fetch the benchmark code from the Github repository. I did this on the cluster’s master node:
cd ~/repositories git clone https://github.com/DIYBigData/pyspark-benchmark.git
The next step is to start a docker container that has both the Spark and QFS software installed, is attached to the right swarm network, mounts the benchmark repository directory, and launches into a shell prompt. The docker command to do that is:
docker run -it \ --network spark_cluster_network \ -p 4050:4040 \ --mount 'type=bind,src=/home/michael/repositories/pyspark-benchmark,dst=/home/spark/jobs/' \ qfs-master /bin/bash
Of course, update the local file path in the mount’s
src as needed. Note that here we map the external port 4050 to the container port 4040. This is so the Spark status page of this container’s jobs can be viewed without conflicting with the status page of the Spark stack we already have deployed to the swarm.
Once at the command prompt within the docker container, we can start running the various benchmark jobs with
spark-submit. Note that how I currently have my Spark cluster configured, I can only run one Spark application at a time, This means that I need to terminate any Jupiter notebooks I may also have running on the cluster.
The first job that needs to be run is the test data generator. To create a data set that is about 140 GB in size, I used 2 billion rows and 1000 partitions. The
spark-submit command to generate that data is:
spark-submit \ --master spark://spark-master:7077 \ --name 'generate-benchmark-test-data' \ jobs/generate-data.py qfs:///benchmark/data -r 2000000000 -p 1000
At this point you should see the job log being printed to the to the shell giving you the status of the job execution. Since the Spark client is running in this docker container, you won’t have access to the Spark job status web page at port 4040 since that port has been mapped to the Jupiter server’s container on the cluster’s Swarm. However, if you want to see it, you can map the public port of 4041 to the internal port of 4040 when issuing the
docker run command above. Also, if you are running a cluster that does not use QFS for its file system, you will need to change the output file path in the these
spark-submit commands to use what ever filesystem is used (e.g., HDFS).
Once the test data has been generated, the commands to actually run the benchmarks are:
spark-submit \ --master spark://spark-master:7077 \ --conf "spark.sql.broadcastTimeout=900" \ jobs/benchmark-shuffle.py \ qfs:///benchmark/data -r 250 -n 'benchmark-shuffle' spark-submit \ --master spark://spark-master:7077 \ jobs/benchmark-cpu.py \ qfs:///benchmark/data \ -s 25000000000 -p 1000 -n 'benchmark-cpu'
For the shuffle benchmark, the
-r option sets the number partitions the various tasks will ultimately transform the data into. You generally want to use a smaller number here the the number of partitions in the test data. Also note that the Spark system property
spark.sql.broadcastTimeout is set to a larger that default value. This is only needed for larger test data sets and depends on the network speed of your cluster. You can try running without setting this property, but if your job errors out with a “Could not execute broadcast” error message, try increasing the value of the
spark.sql.broadcastTimeout property when running the job.
For the CPU benchmark, the
-s option sets the number of samples that will be taken when generating Pi, and
-p sets the number of parallel tasks that will be used to do th calculations. When each job is done running, it will print the results of the benchmark to the log output.
When done with running the benchmarks, the Docker container can be exited with the
Benchmark Results and Comparisons
Generating the test data took about 37 minutes on my Personal Compute Cluster. The results for all the tests were
|Benchmark Test||Time (Seconds)|
|Shuffle – Group By||547.5|
|Shuffle – Repartition||916.3|
|Shuffle – Inner Join||1416.1|
|Shuffle – Broadcast Inner Join||1211.0|
|CPU – SHA-512||921.2|
|CPU – Calculate Pi – Python UDF||916.2|
|CPU – Calculate Pi – Dataframe Functions||10.8|
I was also able to run these benchmarks on AWS using
awscli to create an EMR cluster. There are many tutorials on how to do this, so I will not cover that here. Details on how to run these PySpark Benchmark jobs in AWS EMR can be found in my repository.
When I ran these tests in EMR, I tried to make everything similar to what I have in my Personal Computer Cluster. Using six
r5d.2xlarge instance types as prescribed above provides 48 vCPUs to the Spark executors and a similar amount of RAM. Using S3 as the filesystem is a pretty significant difference over my cluster’s
NODE_LOCAL configuration of QFS, so that might have impact on reading in the data. The
r5d.* class of AWS instances used the Intel Xeon Platinum 8175 CPU, which has a lower single-thread speed rating than my cluster’s Intel Xeon 2176M CPU. However, the node have a faster-than-gigabit networking connection, which I would expect to help with the shuffle cases.
The results I got with this AWS EMR configuration were:
|Benchmark Test||Time (Seconds)|
|Shuffle – Group By||373.2|
|Shuffle – Repartition||946.9|
|Shuffle – Inner Join||1179.4|
|Shuffle – Broadcast Inner Join||1050.5|
|CPU – SHA-512||1021.0|
|CPU – Calculate Pi – Python UDF||636.8|
|CPU – Calculate Pi – Dataframe Functions||9.3|
The first thing that stands out here is that my Personal Computer Cluster‘s performance is on par with a cluster in AWS composed of six
r5d.2xlarge instances not including the master. I theorize that tests that were faster in the AWS EMR cluster were due to the faster inter-node networking that AWS has. The tests that were dominated by CPU, like the SHA-512 test, are likely slower because of the slower core in the AWS instance types I chose. I found it odd that the “Calculate Pi – Python UDF” test was faster in the AWS EMR cluster given the relative speed changes on every other test. But then I realized that this test can be network bound due to the initial parallelization and broadcast of the programmatically generated source data. I may need to revisit whether this test is a good benchmark for the CPU.
I have created a lightweight PySpark benchmark I can use to test out the impact of configuration and hardware changes to my Personal Compute Cluster. I did write it in such a manner that it can be used against any Spark cluster that is set up to run PySpark with Python 3. The PySpark Benchmark code is freely available in my repository here.