Stream processing is a set of techniques for extracting information from unbounded data, that is, datasets that are theoretically infinite in size.
Some examples of streaming use cases are device monitoring, fault detection, billing modernization, fleet management, media recommendations and faster loans. In all these cases, the data should be consumed while it is as fresh as possible.
Introducing Apache Spark
Apache Spark is a fast, reliable and fault-tolerant distributed computing framework for large-scale data processing. Spark’s memory model uses RAM to cache data as it is being processed, so it can be up to 100 times faster than Hadoop MapReduce for some workloads.
At its core is the Resilient Distributed Dataset (RDD). RDDs offer a rich functional programming model that abstracts away the complexities of distributed computing on a cluster, and a better programming model than MapReduce. Spark has two types of operations: transformations and actions. Transformations (for example map, flatMap, join, filter) are lazily evaluated, meaning that execution does not start immediately, whereas actions (for example count, collect, reduce) are eager operations that are computed on the distributed system to produce a result.
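To make the difference between lazy transformations and eager actions concrete, here is a minimal sketch in plain Python. It is not Spark code: `LazyDataset` and `double` are hypothetical names invented for this illustration, and the class only mimics the idea of recording transformations and running them when an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source   # any re-iterable collection of input records
        self._steps = steps     # recorded (kind, function) pairs, not yet executed

    # Transformations: return a new dataset and do no work.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _run(self):
        records = iter(self._source)
        for kind, fn in self._steps:
            records = map(fn, records) if kind == "map" else filter(fn, records)
        return records

    # Actions: trigger execution and return a concrete result.
    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


calls = []

def double(x):
    calls.append(x)   # side effect lets us observe when work actually happens
    return x * 2

pipeline = LazyDataset(range(5)).map(double).filter(lambda x: x > 2)
calls_before_action = len(calls)   # nothing has run yet
result = pipeline.collect()        # the action executes the recorded steps
```

Building `pipeline` calls `double` zero times; only `collect()` runs it, once per input record. Spark uses the same deferral to plan the whole chain of transformations before executing anything.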
Spark Components
Spark Core
Contains the Spark core execution engine and a set of low-level functional APIs used to distribute computations across a cluster of computing resources.
Spark SQL
It is a module for working with structured data. It implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of them.
The libraries built on top of these are MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming. In this article, we will focus on Structured Streaming.
Structured Streaming Processing
This feature was first introduced in Spark 2.0 in July 2016. It is built on top of the Spark SQL abstractions: Structured Streaming enriches the Dataset and DataFrame APIs with streaming capabilities. It requires a schema to be specified for the data in the stream.
Basic Concept of Streaming processing
Consider all arriving data as an unbounded input table. Every data item is like a new row being appended to the input table.
A query on the input generates the “Result Table”. Each time a trigger fires, Spark checks for new data (new rows in the input table) and incrementally updates the result.
The last part of the model is output modes. Each time the result table is updated, we want to write the changes to an external system, such as S3, HDFS or a database. We usually want to write output incrementally. For this purpose, Structured Streaming provides three output modes:
- Append: Only the new rows appended to the result table since the last trigger will be written to the external storage. This is applicable only on queries where existing rows in the result table cannot change (e.g. a map on an input stream).
- Complete: The entire updated result table will be written to external storage.
- Update: Only the rows that were updated in the result table since the last trigger will be changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table.
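The three modes can be sketched as a plain-Python function that compares the result table before and after a trigger. This is only a model of the definitions above, not Spark’s implementation: `rows_to_write` and the sample tables are hypothetical, and the sketch ignores Spark’s own restrictions on which queries support which mode.

```python
_MISSING = object()  # sentinel for "key was not in the previous result table"

def rows_to_write(previous, current, mode):
    """Return the rows of `current` that a sink would receive in the given mode.

    `previous` and `current` are dicts mapping a result-table key to its value.
    """
    if mode == "complete":
        # The entire updated result table.
        return dict(current)
    if mode == "append":
        # Only keys that did not exist before the trigger.
        return {k: v for k, v in current.items() if k not in previous}
    if mode == "update":
        # New keys plus keys whose value changed.
        return {k: v for k, v in current.items()
                if previous.get(k, _MISSING) != v}
    raise ValueError(f"unknown output mode: {mode}")


previous = {"Open": 10, "Close": 4}
current = {"Open": 12, "Close": 4, "Pause": 1}

complete_rows = rows_to_write(previous, current, "complete")
append_rows = rows_to_write(previous, current, "append")
update_rows = rows_to_write(previous, current, "update")
```

With these sample tables, complete mode writes all three rows, append mode writes only the new `Pause` row, and update mode writes `Open` (changed) and `Pause` (new) but not `Close` (unchanged).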
Let’s see an example: how can we run our mobile monitoring application in this model? The query is designed to compute a count of actions grouped by (action, hour). Once this query is started, Spark continuously checks for new data. When new data arrives, Spark runs an “incremental” query that combines the previous running counts with the new data to compute updated counts.
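The incremental idea can be sketched in plain Python: keep the running counts as state and fold each new batch into them, instead of recomputing from scratch. The names `hour_window` and `apply_batch` are hypothetical, the two batches are made-up data in the style of the sample events, and timestamps are assumed to be epoch seconds.

```python
from collections import Counter

HOUR = 3600  # seconds

def hour_window(epoch_seconds):
    """Return the (start, end) of the 1-hour window containing the timestamp."""
    start = epoch_seconds - epoch_seconds % HOUR
    return (start, start + HOUR)

def apply_batch(running_counts, new_events):
    """Fold one batch of events into the running (action, window) counts."""
    for event in new_events:
        key = (event["action"], hour_window(event["time"]))
        running_counts[key] += 1
    return running_counts

counts = Counter()

# Two hypothetical micro-batches; the last event falls into the next hour.
batch_1 = [{"time": 1469501219, "action": "Open"},
           {"time": 1469501442, "action": "Close"}]
batch_2 = [{"time": 1469501509, "action": "Open"},
           {"time": 1469502100, "action": "Open"}]

apply_batch(counts, batch_1)   # state after the first trigger
apply_batch(counts, batch_2)   # previous counts combined with the new data
```

After the second batch, the window starting at 1469498400 holds two `Open` events and one `Close`, and a new window starting at 1469502000 holds one `Open`. Only the new events were processed in the second step.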
Example with pySpark
I created an account on Databricks to run the example. Some sample data files are available in /databricks-datasets/structured-streaming/events. Let’s look at the content of the directory:
%fs ls /databricks-datasets/structured-streaming/events/
There are about 50 JSON files in the directory. Let’s see what each JSON file contains:
%fs head /databricks-datasets/structured-streaming/events/file-0.json
{"time":1469501219,"action":"Open"}
{"time":1469501225,"action":"Open"}
{"time":1469501234,"action":"Open"}
{"time":1469501245,"action":"Open"}
{"time":1469501246,"action":"Open"}
{"time":1469501248,"action":"Open"}
{"time":1469501256,"action":"Open"}
{"time":1469501264,"action":"Open"}
{"time":1469501266,"action":"Open"}
{"time":1469501267,"action":"Open"}
{"time":1469501269,"action":"Open"}
{"time":1469501271,"action":"Open"}
{"time":1469501282,"action":"Open"}
{"time":1469501285,"action":"Open"}
{"time":1469501291,"action":"Open"}
{"time":1469501297,"action":"Open"}
{"time":1469501303,"action":"Open"}
{"time":1469501322,"action":"Open"}
{"time":1469501335,"action":"Open"}
{"time":1469501344,"action":"Open"}
{"time":1469501346,"action":"Open"}
{"time":1469501349,"action":"Open"}
{"time":1469501357,"action":"Open"}
{"time":1469501366,"action":"Open"}
{"time":1469501371,"action":"Open"}
{"time":1469501375,"action":"Open"}
{"time":1469501375,"action":"Open"}
{"time":1469501381,"action":"Open"}
{"time":1469501392,"action":"Open"}
{"time":1469501402,"action":"Open"}
{"time":1469501407,"action":"Open"}
{"time":1469501410,"action":"Open"}
{"time":1469501420,"action":"Open"}
{"time":1469501424,"action":"Open"}
{"time":1469501438,"action":"Open"}
{"time":1469501442,"action":"Close"}
{"time":1469501462,"action":"Open"}
{"time":1469501480,"action":"Open"}
{"time":1469501488,"action":"Open"}
{"time":1469501489,"action":"Open"}
{"time":1469501491,"action":"Open"}
{"time":1469501503,"action":"Open"}
{"time":1469501505,"action":"Open"}
{"time":1469501509,"action":"Open"}
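First, we create a SparkSession:
from pyspark.sql import SparkSession

# On Databricks a SparkSession named `spark` already exists;
# getOrCreate() returns the existing session instead of creating a new one.
spark = (
    SparkSession.builder
    .appName("StreamProcessing")
    .master("local[*]")
    .getOrCreate()
)
Then we define the schema for the data using the StructType object. This object contains a list of StructField objects that define the name, type and nullability of each field.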
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

inputPath = "/databricks-datasets/structured-streaming/events/"

# Two nullable fields: the event time and the action name
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True),
])

# Read all the files at once as a static DataFrame
staticInputDF = (
    spark.read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

staticInputDF:pyspark.sql.dataframe.DataFrame
  time:timestamp
  action:string
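To illustrate what applying this schema means for a single record, here is a rough plain-Python counterpart. It is a sketch, not what Spark does internally: `SCHEMA` and `parse_record` are hypothetical names, and it assumes that the numeric `time` values are seconds since the Unix epoch, interpreted here in UTC.

```python
import json
from datetime import datetime, timezone

# Plain-Python counterpart of the two-field schema: field name -> converter.
# Assumption: "time" holds epoch seconds.
SCHEMA = {
    "time": lambda v: datetime.fromtimestamp(v, tz=timezone.utc),
    "action": str,
}

def parse_record(line):
    """Parse one JSON line into a typed row; missing fields become None."""
    raw = json.loads(line)
    row = {}
    for name, convert in SCHEMA.items():
        value = raw.get(name)
        # Both fields are declared nullable, so a missing value is allowed.
        row[name] = convert(value) if value is not None else None
    return row

row = parse_record('{"time":1469501219,"action":"Open"}')
incomplete_row = parse_record('{"action":"Close"}')
```

The first record of file-0.json becomes a row with a real timestamp (26 July 2016, 02:46:59 UTC) rather than a bare integer, which is what later makes grouping by one-hour windows possible.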
Now that we have the schema of the data, we can begin the stream. Because we have static files, we are going to emulate a stream by reading them one at a time.
from pyspark.sql.functions import window

streamingInputDF = (
    spark.readStream
    .schema(jsonSchema)                # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)   # Treat one file at a time
    .json(inputPath)
)

streamingCountsDF = (
    streamingInputDF
    .groupBy(
        streamingInputDF.action,
        window(streamingInputDF.time, "1 hour"))
    .count()
)

streamingCountsDF.isStreaming          # Is this DF a streaming DF?

streamingInputDF:pyspark.sql.dataframe.DataFrame
  time:timestamp
  action:string
streamingCountsDF:pyspark.sql.dataframe.DataFrame
  action:string
  window:struct
    start:timestamp
    end:timestamp
  count:long
Out[4]: True
As we can see, streamingCountsDF is a streaming DataFrame because isStreaming returns True. We can now start the streaming computation.
spark.conf.set("spark.sql.shuffle.partitions", "2")

query = (
    streamingCountsDF
    .writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
    .start()
)
spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations (here, the groupBy). The default value is 200, which is far more partitions than this small dataset needs and would degrade performance. We choose a value of 2 in order to keep the size of shuffles small.
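The effect of the partition count can be sketched in plain Python. In a shuffle, each grouping key is assigned to a partition by hashing; with only a handful of distinct keys, most of 200 partitions stay empty but still have to be scheduled. The function names below are hypothetical, and `zlib.crc32` is just a stand-in for Spark’s own hash function.

```python
import zlib

def partition_for(key, num_partitions):
    """Assign a grouping key to a shuffle partition via a stable hash."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions

def non_empty_partitions(keys, num_partitions):
    """Count how many partitions receive at least one key."""
    return len({partition_for(k, num_partitions) for k in keys})

# Hypothetical (action, window start) grouping keys
keys = [("Open", 1469498400), ("Close", 1469498400), ("Open", 1469502000)]

busy_with_200 = non_empty_partitions(keys, 200)  # at most 3 of the 200 hold data
busy_with_2 = non_empty_partitions(keys, 2)      # at most 2 of the 2 hold data
```

With three distinct keys, at least 197 of the 200 default partitions would carry no data at all, which is why a small value suits this example.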
query is a handle to the streaming query, which is running in the background. This query continuously picks up files and updates the windowed counts. Databricks gives us a simple way to see the status of any streaming query, including ours.
Let’s explore what these metrics are and why they are important to understand.
Input Rate and Processing Rate
The input rate shows how much data is flowing into Structured Streaming from a source such as Kafka, Twitter or, as here, files. The processing rate shows how quickly we are able to analyze this data.
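One simple way to think about the two rates is as rows per second measured over different time spans. The sketch below is an assumption-laden illustration, not Spark’s exact bookkeeping: `rates` is a hypothetical helper and the numbers are made up.

```python
def rates(num_input_rows, seconds_since_last_trigger, batch_duration_seconds):
    """Return (input_rate, processing_rate) in rows per second.

    input_rate:      rows that arrived, divided by the time between triggers
    processing_rate: the same rows, divided by the time spent processing them
    """
    input_rate = num_input_rows / seconds_since_last_trigger
    processing_rate = num_input_rows / batch_duration_seconds
    return input_rate, processing_rate

# Hypothetical batch: 1000 rows arrived over 5 seconds and took 2 seconds to process
input_rate, processing_rate = rates(1000, 5.0, 2.0)

# The query keeps up as long as it processes rows at least as fast as they arrive
keeping_up = processing_rate >= input_rate
```

Here the input rate is 200 rows per second and the processing rate is 500 rows per second, so the query keeps up. If the processing rate stayed below the input rate, unprocessed data would accumulate.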
Batch Duration
Structured Streaming aims for both low latency and high throughput. The batch duration oscillates as Structured Streaming processes varying numbers of events over time. Ours oscillates consistently around three seconds because we use the single-core cluster on Community Edition. A larger cluster will have much faster processing rates and shorter batch durations.
Aggregation State
The event-time window is 1 hour long (no slide interval is given, so the windows do not overlap), and an aggregate value is maintained for each one-hour window. The output mode used is “complete”, which does not drop old aggregation state because this mode preserves all data in the Result Table.
So now we can query the counts table:
%sql
select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from counts
order by time, action
Summary
Structured Streaming is a much simpler model for building real-time applications. It provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing. It is based on the DataFrame and Dataset APIs, so we can easily apply SQL queries to streaming data. To learn more about Structured Streaming, see the links in the references.
References
- [1] Karau, Holden, Andy Konwinski, Patrick Wendell, and Matei Zaharia. Learning Spark. O’Reilly, 2015.
- [2] https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model
- [3] https://docs.databricks.com/spark/latest/structured-streaming/index.html