
Structured Streaming in Spark

6 December 2019 | Big Data

Stream processing is a set of techniques used to extract information from unbounded data, i.e. datasets that are theoretically infinite in size.

Typical streaming use cases include device monitoring, fault detection, billing modernization, fleet management, media recommendations, faster loan approvals, and so on. In all these cases, the data should be consumed as soon as it is produced.

Introducing Apache Spark

Apache Spark is a fast, reliable and fault-tolerant distributed computing framework for large-scale data processing. Spark's memory model uses RAM to cache data as it is being processed, which can make it up to 100 times faster than Hadoop MapReduce.

Its core abstraction is the Resilient Distributed Dataset (RDD). RDDs offer a rich functional programming model that abstracts away the complexities of distributed computing on a cluster, and a better programming model than MapReduce. Spark has two types of operations: transformations and actions. Transformations (for example map, flatMap, join, filter) are lazily evaluated, meaning that execution does not start immediately, whereas actions (for example count, collect, reduce) are eager operations that trigger the computation on the cluster to produce a result.
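To make the difference concrete, here is a minimal sketch (assuming a local SparkSession; the names are illustrative and not part of the original article): the transformations below are only recorded, and nothing runs until an action is called.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LazyVsEager").master("local[*]").getOrCreate()

rdd = spark.sparkContext.parallelize(range(1, 11))  # distribute a small dataset

# Transformations: only recorded, nothing is executed yet
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)

# Actions: trigger the actual distributed computation
print(evens.count())    # 5
print(evens.collect())  # [4, 16, 36, 64, 100]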

Spark Components

Figure: Spark components (source: Learning Spark, O'Reilly, 2015)

Spark Core

Contains the Spark core execution engine and a set of low-level functional APIs used to distribute computations across a cluster of computing resources.

Spark SQL

It is a module for working with structured data. It implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of them.

The libraries built on top of these are MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming. In this article, we will focus on Structured Streaming.

Structured Streaming Processing

This feature was first introduced in Spark 2.0 in July 2016. It is built on top of the Spark SQL abstraction: Structured Streaming enriches the Dataset and DataFrame APIs with streaming capabilities. It requires the specification of a schema for the data in the stream.

Basic Concepts of Stream Processing

Consider that all arriving data is treated as an unbounded input table. Every data item is like a new row being appended to the input table.

Figure: a data stream as an unbounded input table (source: Learning Spark, O'Reilly, 2015)

A query on the input generates the "Result Table". Each time a trigger fires, Spark checks for new data (new rows in the input table) and incrementally updates the result.
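How often the trigger fires can be controlled when starting the query. The snippet below is a small illustrative sketch, not part of the original example: it uses Spark's built-in rate source (a testing source that generates rows continuously) and a 10-second processing-time trigger.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TriggerSketch").master("local[*]").getOrCreate()

# Built-in "rate" source, useful for experiments: emits (timestamp, value) rows
rateDF = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

query = (rateDF.writeStream
         .format("console")                     # print each micro-batch to stdout
         .trigger(processingTime="10 seconds")  # check for new data every 10 seconds
         .start())

query.awaitTermination(30)  # let the stream run for about 30 seconds
query.stop()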

The last part of the model is the output mode. Each time the result table is updated, we want to write the changes to an external system, such as S3, HDFS or a database, and we usually want to write the output incrementally. For this purpose, Structured Streaming provides three output modes (a short sketch after the list shows where the mode is set):

  • Append: Only the new rows appended to the result table since the last trigger are written to the external storage. This mode is applicable only to queries where existing rows in the result table cannot change (e.g. a map on an input stream).
  • Complete: The entire updated result table is written to the external storage.
  • Update: Only the rows that were updated in the result table since the last trigger are changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table.
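As a short sketch of where the mode is set (again using the rate source for illustration; these names are not from the original example), an aggregation query can be started in "complete" mode like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OutputModeSketch").master("local[*]").getOrCreate()

rateDF = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# A streaming aggregation: counts per (value % 3) bucket
countsDF = rateDF.groupBy((rateDF.value % 3).alias("bucket")).count()

# Aggregations accept "complete" and "update"; "append" would require a watermark
query = (countsDF.writeStream
         .outputMode("complete")  # rewrite the whole result table at every trigger
         .format("console")
         .start())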

Let's see an example: how can we run a mobile monitoring application in this model? The query is designed to compute a count of actions grouped by (action, hour). When this query is started, Spark continuously checks for new data. If there is new data, Spark runs an "incremental" query that combines the previous running counts with the new data to compute updated counts, as shown below.

Example with PySpark

Here I created an account on Databricks to run the example. We have some sample data files in /databricks-datasets/structured-streaming/events. Let's look at the contents of the directory:

%fs ls /databricks-datasets/structured-streaming/events/

There are about 50 JSON files in the directory. Let's see what each JSON file contains:

%fs head /databricks-datasets/structured-streaming/events/file-0.json

{"time":1469501219,"action":"Open"}
{"time":1469501225,"action":"Open"}
{"time":1469501234,"action":"Open"}
{"time":1469501245,"action":"Open"}
{"time":1469501246,"action":"Open"}
{"time":1469501248,"action":"Open"}
{"time":1469501256,"action":"Open"}
{"time":1469501264,"action":"Open"}
{"time":1469501266,"action":"Open"}
{"time":1469501267,"action":"Open"}
{"time":1469501269,"action":"Open"}
{"time":1469501271,"action":"Open"}
{"time":1469501282,"action":"Open"}
{"time":1469501285,"action":"Open"}
{"time":1469501291,"action":"Open"}
{"time":1469501297,"action":"Open"}
{"time":1469501303,"action":"Open"}
{"time":1469501322,"action":"Open"}
{"time":1469501335,"action":"Open"}
{"time":1469501344,"action":"Open"}
{"time":1469501346,"action":"Open"}
{"time":1469501349,"action":"Open"}
{"time":1469501357,"action":"Open"}
{"time":1469501366,"action":"Open"}
{"time":1469501371,"action":"Open"}
{"time":1469501375,"action":"Open"}
{"time":1469501375,"action":"Open"}
{"time":1469501381,"action":"Open"}
{"time":1469501392,"action":"Open"}
{"time":1469501402,"action":"Open"}
{"time":1469501407,"action":"Open"}
{"time":1469501410,"action":"Open"}
{"time":1469501420,"action":"Open"}
{"time":1469501424,"action":"Open"}
{"time":1469501438,"action":"Open"}
{"time":1469501442,"action":"Close"}
{"time":1469501462,"action":"Open"}
{"time":1469501480,"action":"Open"}
{"time":1469501488,"action":"Open"}
{"time":1469501489,"action":"Open"}
{"time":1469501491,"action":"Open"}
{"time":1469501503,"action":"Open"}
{"time":1469501505,"action":"Open"}
{"time":1469501509,"action":"Open"}

First, we create a SparkSession:

from pyspark.sql import SparkSession

# On Databricks a SparkSession named "spark" already exists; locally we would build one:
spark = (SparkSession
          .builder
          .appName("StreamProcessing")
          .master("local[*]")
          .getOrCreate())

Then we define the schema of the data using a StructType object. This object contains a list of StructField objects that define the name, type and nullability of each column.

from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

jsonSchema = StructType([
  StructField("time", TimestampType(), True),
  StructField("action", StringType(), True)
])

staticInputDF = (
  spark.read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

staticInputDF:pyspark.sql.dataframe.DataFrame
time:timestamp
action:string

Now that we have the schema of the data, we can begin the stream. Because we have static files, we are going to emulate a stream by reading them one at a time.

from pyspark.sql.functions import *

streamingInputDF = (
  spark.readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat one file at a time
    .json(inputPath)
)

streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)

streamingCountsDF.isStreaming # Is this DF a streaming DF?

streamingInputDF:pyspark.sql.dataframe.DataFrame
time:timestamp
action:string
streamingCountsDF:pyspark.sql.dataframe.DataFrame
action:string
window:struct
start:timestamp
end:timestamp
count:long
Out[4]: True

As we can see, streamingCountsDF is a streaming DataFrame because the output is True, so we can start the streaming computation.

spark.conf.set("spark.sql.shuffle.partitions", "2")

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # store the counts in an in-memory table
    .queryName("counts")     # name of the in-memory table
    .outputMode("complete")  # keep all the counts in the result table
    .start()
)

spark.sql.shuffle.partitions configures the number of partitions used when shuffling data for joins or aggregations (here, the groupBy). The default value is 200, which is far too many partitions for this small dataset and would degrade performance, so we set it to 2 to keep the shuffles small.

query is a handle to the streaming query running in the background. This query continuously picks up files and updates the windowed counts. Databricks gives us a simple way to see the status of any streaming query; below is ours.
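Outside Databricks, the same information can be inspected programmatically through the query handle; a small sketch using the standard StreamingQuery attributes:

print(query.isActive)      # True while the stream is running
print(query.status)        # current activity, e.g. whether a trigger is active
print(query.lastProgress)  # metrics of the most recent micro-batch
# query.stop()             # stop the stream when we are done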

Let's explore what these metrics are and why they're important for us to understand.

Input Rate and Processing Rate

The input rate shows how much data is flowing into Structured Streaming from a source system such as Kafka or Twitter. The processing rate is how quickly we are able to analyze that data.
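Both rates are also exposed in the progress information of the query handle. A rough sketch of reading them (field names as reported by StreamingQueryProgress):

progress = query.lastProgress                  # dict describing the last micro-batch
if progress:                                   # None until the first batch completes
    print(progress["numInputRows"])            # rows read in this batch
    print(progress["inputRowsPerSecond"])      # input rate
    print(progress["processedRowsPerSecond"])  # processing rate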

Batch Duration

Structured Streaming can achieve both low latency and high throughput. You can see that the batch duration oscillates as Structured Streaming processes varying numbers of events over time. Our batch duration oscillates consistently around three seconds because we are using a single-core cluster on Community Edition; a larger cluster would have much higher processing rates and shorter batch durations.

Aggregation State

The event-time window is 1 hour, so aggregate values are maintained for each 1-hour window. The output mode used is "complete", which does not drop old aggregation state, because this mode preserves all rows in the Result Table.
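If keeping the state forever is not acceptable, Spark offers watermarking to bound it. The sketch below is a hypothetical variant of our aggregation with a 2-hour watermark; note that a watermark only lets Spark drop old state in the "update" or "append" output modes, not in "complete".

# Hypothetical variant of the aggregation, bounding state with a watermark
watermarkedCountsDF = (
  streamingInputDF
    .withWatermark("time", "2 hours")  # state older than 2 hours may be dropped
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)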

Now we can query the in-memory counts table:

%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
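The same query can also be run from a Python cell, since the memory sink registered the results under the table name counts; a small sketch using spark.sql:

# Equivalent query from Python against the in-memory "counts" table
countsTableDF = spark.sql("""
  SELECT action, date_format(window.end, 'MMM-dd HH:mm') AS time, count
  FROM counts
  ORDER BY time, action
""")
countsTableDF.show()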

Summary

Structured Streaming is a much simpler model for building real-time applications. It provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing. It is based on the DataFrame and Dataset APIs, so we can easily apply SQL queries to streaming data. To learn more about Structured Streaming, see the useful links in the references.

References