One of the analysis I am looking to do with my ODROID XU4 cluster is to take a look at various access patterns on my website by analyzing the Apache http access logs. Analyzing Apache access logs directly in Spark can be slow due to them being unstructured text logs. Converting to the logs to a data frame backed by partitioned parquet files can make subsequent analysis much faster.
The first task is to create a mapper that can be used in Spark convert a row int eh access log to a Spark Row
object. A Python 3 mapper would look like:
# Parse an Apache access log. Assumes Python 3 import re from pyspark.sql import Row from datetime import datetime APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+) "((?:[^”]|”)+)" "((?:[^”]|”)+)"$' DATETIME_PARSE_PATTERN = '%d/%b/%Y:%H:%M:%S %z' # Returns a Row containing the Apache Access Log info def parse_apache_log_line(logline): match = re.search(APACHE_ACCESS_LOG_PATTERN, logline) if match is None: return None date_obj = datetime.strptime(match.group(4),DATETIME_PARSE_PATTERN) return Row( ipAddress = match.group(1), clientIdentd = match.group(2), userId = match.group(3), dateTime = match.group(4), timestamp = date_obj.timestamp(), date = date_obj.strftime('%Y-%m-%d'), method = match.group(5), endpoint = match.group(6), protocol = match.group(7), referrer = match.group(10), userAgent = match.group(11), responseCode = int(match.group(8)), contentSize = int(match.group(9)))
Note that the date
field will be used for partitioning when constructing the parquet files. If your website does not have too many access events per day, you might consider changing the partitioning row to a month or week value so that you don’t have too many small partitions.
Then, to convert the raw access logs to a partitioned parquet file, in PySpark you would do something along the lines of:
access_logs_raw = sc.textFile("hdfs://master:9000/user/michael/data/claireware.2014-2016.log") access_logs_df = access_logs_raw.map(parse_apache_log_line).filter(lambda x: x is not None).toDF() access_logs_df.write.partitionBy("date").parquet( "hdfs://master:9000/user/michael/data/claireware.2014-2016.parquet", mode='overwrite' )
Of course, change the file paths to whatever is relevant for your case. To later load the parquet file into a data frame for analysis:
access_logs_df = sqlContext.read.parquet("hdfs://master:9000/user/michael/data/claireware.2014-2016.parquet")
You can add a filter function based on the partition to the read operation to speed uploading by selecting only those records you are interested in. For example, filter(col("date") > "2016-01-01")
.