Using Custom Hive UDFs With PySpark

Using Python to develop on Apache Spark is easy and familiar for many developers. However, due to the fact that Spark runs in a JVM, when your Python code interacts with the underlying Spark system, there can be an expensive process of data serialization and deserialization between the JVM and the Python interpreter. If you do most of your data manipulation using data frames in PySpark, you generally avoid this serialization cost because the Python code ends up being more of a high-level coordinator of the data frame operations rather than doing low-level operations on the data itself. This changes if you ever write a UDF in Python.

To avoid the JVM-to-Python data serialization costs, you can use a Hive UDF written in Java. Creating a Hive UDF and then using it within PySpark can be a bit circuitous, but it does speed up your PySpark data frame flows if they are using Python UDFs. To illustrate this, I will rework the flow I created in my last post on average airline flight delays to transform a Python UDF to a Hive UDF written in Java.

Setting Up Hive UDF Project Using SBT

To use your Java-based Hive UDFs within PySpark, you need to first package them in a jar file which is given to PySpark when it is launched. To do this, I used SBT as my Java build tool. To install SBT onto the master node, you will need to follow the instructions given at the SBT site, which outlines how to use apt-get to install SBT onto an Ubuntu distribution. One SBT is installed, create a folder structure in the hduser home as follows:

mkdir -p udf-development/src/main/java/net/diybigdata/udf/
mkdir -p udf-development/src/test/java/net/diybigdata/udf/
mkdir -p udf-development/project
mkdir -p udf-development/target

Adjust the path for the first folder ending in udf as you wish according to the Java package name you will want to use. Now, create the file that will tell SBT how to build your UDF jar.

cd udf-development
vi build.sbt

Set the build.sbt file contents to:

name := "diybigdata-udf"

// orgnization name (e.g., the package name of the project)
organization := "net.diybigdata"

version := "1.0-SNAPSHOT"

// project description
description := "DIY Big Data Hive UDFs"

// Enables publishing to maven repo
publishMavenStyle := true

// Do not append Scala versions to the generated artifacts
crossPaths := false

// This forbids including Scala related libraries into the dependency
autoScalaLibrary := false

// Use the latest Scala version with Spark 2+
scalaVersion := "2.11.6"
scalacOptions ++= Seq("-unchecked", "-feature", "-deprecation")

// Add repositories where library dependencies can be found
resolvers += "Cloudera" at "https://repository.cloudera.com/content/repositories/releases/"
resolvers += "Central" at "http://central.maven.org/maven2/"
resolvers += "Spring Plugins" at "http://repo.spring.io/plugins-release/"

// library dependencies. (orginization name) % (project name) % (version)
libraryDependencies ++= Seq(
  "org.apache.hive" % "hive-exec" % "2.1.0" % "provided",
  "org.apache.hadoop" % "hadoop-core" % "2.6.0-mr1-cdh5.8.2",
  "com.novocode" % "junit-interface" % "0.11" % "test"
)

Creating a Hive UDF with Java

As we discussed, we want to replace the Python UDF which converts the year & month integers into a string with a Java-based Hive UDF. Admittedly, this is a bit of a contrived example because the Spark data frames API already has a format_string function and in reality I probably should have used that rather than the Python UDF I did write. None the less, this is just a simple example to demonstrate how to create and use custom Hive UDFs in PySpark.

First, create the UDF’s Java file:

vi src/main/java/net/diybigdata/udf/FormatYearMonthString.java

Then set the file contents to:

package net.diybigdata.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;

@Description(
	name = "FormatYearMonthString",
    value = "_FUNC_(InputDataType) - Converts the passed year and month integers to a formatted string.",
    extended = "Example:\n"
             + "  > SELECT _FUNC_(InputDataType) FROM tablename;")

public class FormatYearMonthString extends UDF {
	public String evaluate( Integer year, Integer month ) {
		return String.format("%1$d-%2$02d", year, month );
	}
}

Its always good practice to create unit tests with your code, so open the following unit test for editing

vi src/test/java/net/diybigdata/udf/FormatYearMonthString_T.java

And set the unit test file contents to:

import static org.junit.Assert.assertEquals;
import org.junit.Test;

import net.diybigdata.udf.FormatYearMonthString;

public class FormatYearMonthString_T {

	@Test
	public void testStringFormating() {
		FormatYearMonthString udf = new FormatYearMonthString();
	
		assertEquals(
			"evaluate(1936, 12)",
			"1936-12", 
			udf.evaluate( 1936, 12 )
		);
		assertEquals(
			"evaluate(1980, 07)",
			"1980-07", 
			udf.evaluate( 1980, 07 )
		);	
	}
}

To run the unit tests on the UDF, enter the following command from the udf-development directory:

sbt test

SBT will first determine what library dependencies are required to build the UDF code, and download any library artifacts that are needed from the remote repositories. This may take a few minutes, but the good news is that it only happens once. When done with the library downloads, SBT will then compile the UDF and unit test code, then run the unit tests. You should see a message indicting that all unit tests have passed. At this point, enter the command to build the UDF jar file.

sbt package

If your build.sbt file is the same as above, the jar file will be at target/diybigdata-udf-1.0-SNAPSHOT.jar.

Launching Jupyter and PySpark to use Hive UDF Jar

In order to launch Jupyter and PySpark to use the UDF jar, update your launch command to include the --jars option as follows:

SPARK_HOME="/usr/local/spark-qfs/" XDG_RUNTIME_DIR="/data/jupyter" PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=7777 --notebook-dir=/home/hduser/notebooks" /usr/local/spark-qfs/bin/pyspark --master spark://master:7077 --jars /home/hduser/odroid-xu4-cluster/spark/udf-development/target/diybigdata-udf-1.0-SNAPSHOT.jar

Alter the path to the UDF jar as appropriate for your file locations. You might want to make the above command an alias in your shell.

Loading and Using a Custom Hive UDF in PySpark

One of the challenges with using a Hive UDF in PySpark is that there is no Python API that allows you to interact directly with the UDF. You can only interact with the UDF through SQL. Given that, your code will have to cleverly navigate between both the Spark Python API and the Spark SQL when operating on data frames. This isn’t hard, but can be annoying if you value consistently written code.

Let’s consider the original Python code from the last post for which we want to swap the Python UDF for the Hive UDF:

from pyspark.sql.functions import avg, udf, col
from pyspark.sql.types import StringType

def getYearMonthStr(year, month):
    return '%d-%02d'%(year,month)

udfGetYearMonthStr = udf(getYearMonthStr, StringType())

airline_delay = air_data.select(
    'Carrier',
    'Year',
    'Month',
    'ArrDelay'
).filter(
    col('Carrier').isin('AA','WN','DL','UA','MQ','EV','AS','VX')
).groupBy(
    'Carrier',
    'Year',
    'Month'
).agg(
    avg(col('ArrDelay')).alias('average_delay')
).withColumn(
    'YearMonth', udfGetYearMonthStr('Year','Month')
).drop(
    'Year'
).drop(
    'Month'
).orderBy(
    'YearMonth','Carrier'
).cache()

To use the FormatYearMonthString Hive UDF instead of the udfGetYearMonthStr Python UDF, the code would have to be refactored to:

from pyspark.sql.functions import avg, col
spark.sql("CREATE TEMPORARY FUNCTION FormatYearMonthString AS 'net.diybigdata.udf.FormatYearMonthString'")

air_data_filtered = air_data.select(
    'Carrier',
    'Year',
    'Month',
    'ArrDelay'
).filter(
    col('Carrier').isin('AA','WN','DL','UA','MQ','EV','AS','VX')
).groupBy(
    'Carrier',
    'Year',
    'Month'
).agg(
    avg(col('ArrDelay')).alias('average_delay')
)

air_data_filtered.registerTempTable('air_data_filtered')
airline_delay = spark.sql( """
    SELECT
        Carrier,
        FormatYearMonthString( Year, Month ) AS YearMonth,
        average_delay
    FROM air_data_filtered
    ORDER BY YearMonth, Carrier
    """).cache()

There are a couple things to note about this code. First, you need to register the Java UDF function as a UDF via a Spark SQL statement. There is no PySpark API to directly register a Hive UDF. It’s worth noting that if you ever try to register a given UDF more than once, Spark will error out on you. This can be an issue if you are in a notebook environment like Jupyter and are iteratively working on various cells. Given that, a best practice is to place the UDF loading SQL calls into their own cell. Second, note how the data frame manipulation had to be broken up to use Spark SQL for the specific part of the data transformation that needed to use the UDF. This is an example of how you might need to swap between the PySpark data frames API and Spark SQL in order to use the Hive UDFs.

All this extra work does have a benefit: to speed up processing. On my 5-node ODROID XU4 cluster, the above step took 4.8 minutes with the original Python UDF, and 4.1 minutes with the Hive UDF written in Java. Clearly, the need to serialize and deserialize data between the JVM and the Python environment has a cost. If you insist on using PySpark (like me), learning how to write Hive UDFs in Java can greatly help you improve the performance of your data frame manipulations.

You can review the completed notebook for the analysis discussed above at my Github repository. If you are interested in more information about performance and PySpark, here is a great presentation from Spark Summit 2016.

Leave a Reply