How do we solve the problem of querying a continuously flowing data stream?

Siddharth Garg
5 min read · Dec 15, 2020


This is Siddharth Garg, with around 6+ years of experience in Big Data technologies like MapReduce, Hive, HBase, Sqoop, Oozie, Flume, Airflow, Phoenix, Spark, Scala, and Python. For the last 1.5+ years, I have been working at Luxoft as a Software Development Engineer 1 (Big Data).

To solve this problem, we have Spark Structured Streaming, where each row of the data stream is processed and the result is continuously updated into an unbounded result table.

Structured Streaming is a processing engine built on top of the Spark SQL engine that uses the Spark SQL APIs. It is fast, scalable, and fault-tolerant. It offers rich, unified, high-level APIs in the form of DataFrames and Datasets that allow us to deal with complex data and complex workloads. Like Spark batch processing, it also has a rich ecosystem of data sources it can read from and write to.

Idea

The idea behind the development of Structured Streaming is:

“We, as end users, should not have to reason about streaming at all.”

That means we, as end users, should only write batch-like queries, and it is Spark's job to figure out how to run them on the continuous data stream and to continuously update the result as new data flows in.

Behind the scenes

The insight the Structured Streaming engineers had, which led to its development, was:

“We can always treat a stream of data as an unbounded table.”

This means that every record in the data stream is like a new row appended to the table.

Therefore, we can represent both batches (static, bounded data) and streams (unbounded data) as tables. This allows us to express our computations against tables, and Spark figures out how to run them on static data or on streaming data.
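To make the idea concrete, here is a minimal sketch (the events directory, its schema, and the eventType column are assumed purely for illustration) showing that the same DataFrame query can be written against a static table and against a stream; only the read call changes.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("BatchVsStream").getOrCreate()
import spark.implicits._

// Batch: read a static, bounded set of JSON files as a table
val staticDF = spark.read.json("/data/events/")
val staticCounts = staticDF.groupBy($"eventType").count()

// Streaming: treat the same directory as an unbounded table;
// the query itself is written exactly the same way
val streamDF = spark.readStream.schema(staticDF.schema).json("/data/events/")
val streamCounts = streamDF.groupBy($"eventType").count()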

Streaming Query Structure

To understand the structure of a Streaming query, let’s look at a simple streaming query.

(Example is taken from Databricks)

Suppose we are building an ETL pipeline where we get JSON data from Kafka, want to parse this data and convert it into a standard format, and finally write it into Parquet files. We also want end-to-end exactly-once guarantees: no failure should drop data or create duplicate data.

Reading data (Source)

The first step is to create a DataFrame from Kafka, i.e. we need to specify where we will read the data from. For this, we need to specify the following:

  • Source format as “kafka”
  • Kafka broker address (bootstrap servers)
  • Name of the topic from which the data will be consumed
  • Offsets from which we want to consume the data. It can be earliest, latest, or any custom offset.

There are many built-in supported sources such as File, Kafka, and Kinesis. We can also define multiple input streams and join or union them together (a small sketch of this appears after the example below).

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import spark.implicits._

spark.readStream                                    // source: Kafka
  .format("kafka")
  .option("kafka.bootstrap.servers", "…")
  .option("subscribe", "topicName")
  .option("startingOffsets", "latest")
  .load()
  // transformation: binary value -> string -> parsed JSON
  .select($"value".cast(StringType))
  .select(from_json($"value", schema).as("data"))
  // sink: Parquet files, triggered every minute
  .writeStream
  .format("parquet")
  .option("path", "…")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "…")
  .start()

The readStream lines above return a DataFrame, which is Spark's single unified API for handling both batch and streaming data with the same operations.
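Since multiple input streams can be combined, as noted earlier, here is a quick hedged sketch (the topic names and broker address are placeholders) of unioning two Kafka streams into one DataFrame before applying further transformations.

// Two independent Kafka source streams (topic names are illustrative)
val clicks = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "clicks")
  .load()

val purchases = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "purchases")
  .load()

// Both streams share the same Kafka source schema, so they can be unioned
val combined = clicks.union(purchases)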

Transformation (Business Logic)

Now that we have the data in a DataFrame, we can apply all of Spark's DataFrame operations to it.

In this case, the data received from Kafka is in binary format, so the next step is to cast it from a binary value to a string value and then parse the string into the expected JSON structure using the appropriate schema.

Note: In streaming, Spark cannot infer the schema, since each micro-batch contains only a limited amount of data, and inferring a schema from such a small sample is unreliable. The schema therefore has to be supplied explicitly.
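The explicit schema can be defined as a StructType and passed to from_json. The field names below are only an assumed example of what the incoming JSON might contain, and kafkaDF is a name introduced here for illustration to stand for the DataFrame returned by the readStream…load() call above (spark.implicits._ is assumed to be imported as before).

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Assumed example schema for the incoming JSON payload
val schema = StructType(Seq(
  StructField("deviceId", StringType),
  StructField("temperature", DoubleType),
  StructField("eventTime", TimestampType)
))

// Cast the binary Kafka value to a string, then parse it with the schema
val parsed = kafkaDF
  .select($"value".cast(StringType).as("json"))
  .select(from_json($"json", schema).as("data"))
  .select("data.*")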

Writing Data (Defining Sink)

Now that we have the parsed data from the JSON, we want to write it into Parquet files. For this, we need to specify the following:

  • Sink format as “parquet”
  • The path where we will write the data

Processing Trigger

It defines the time interval at which we want this query to be triggered.

E.g., if we define a processing interval of 1 minute, it means that Spark will collect all the data it receives in that one minute (known as a micro-batch) and process it, triggering a new batch every 60 seconds.

Note: The default trigger interval is 0, which means Spark starts the next micro-batch as soon as the previous one finishes, processing whatever data has been collected as quickly as possible.
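For reference, here is a sketch of the trigger options Structured Streaming provides (df stands for the transformed streaming DataFrame from above); which one fits depends on the latency and cost requirements of the query.

import org.apache.spark.sql.streaming.Trigger

// Default (no trigger specified): run micro-batches back to back,
// starting the next one as soon as the previous one finishes

// Fixed-interval micro-batches: trigger every minute
val everyMinute = df.writeStream.trigger(Trigger.ProcessingTime("1 minute"))

// One-shot: process all data available so far in a single micro-batch, then stop
val runOnce = df.writeStream.trigger(Trigger.Once())

// Continuous processing (experimental, Spark 2.3+): low latency, with a checkpoint interval
val continuous = df.writeStream.trigger(Trigger.Continuous("1 second"))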

Output Mode

We can define different output modes for our query, namely append, complete, and update. Each output mode makes sense only for certain types of queries, and each mode works only with specific query shapes.

i. Append: “This is the default mode, where only the new rows added to the Result Table since the last trigger are written to the sink. This is applicable only to those queries where the rows added to the Result Table are never expected to change.”

Most suitable when the query contains no aggregation, in which case we simply append the new rows (after transformation) from each micro-batch.

ii. Complete: “The entire updated Result Table will be written to the sink after every trigger. This is supported only for aggregation queries. It is up to the storage connector to decide how to handle the writing of the entire table.”

Most suitable when we are aggregating on a column, since every time new data arrives it has to be merged with the data aggregated so far.

Note: Spark does not keep all of the streaming data in memory; it only keeps the state (metadata) of the previous aggregations.

There are a few DataFrame operations that are not supported on streaming DataFrames, such as distinct, sorting, etc.

iii. Update: Only the rows that were updated in the Result Table since the last trigger will be written to the sink. If the query does not contain an aggregation, it is equivalent to Append mode.

This way, the user can control which data is pushed to the sink or external system as the results are updated with new data.
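A short sketch of how the output mode pairs with the shape of the query, reusing the parsed DataFrame assumed above (the console sink and the output path are placeholders chosen only to keep the example simple):

import org.apache.spark.sql.streaming.OutputMode

// No aggregation: Append mode writes only the new rows from each micro-batch
val rawWriter = parsed.writeStream
  .format("parquet")
  .option("path", "/out/raw")
  .outputMode(OutputMode.Append)

// Aggregation: Complete mode rewrites the whole aggregated Result Table on every trigger
// (Update would write only the rows whose counts changed)
val counts = parsed.groupBy($"deviceId").count()
val aggWriter = counts.writeStream
  .format("console")
  .outputMode(OutputMode.Complete)

Both writers would still need start() (and, for the Parquet sink, a checkpoint location) to actually run, as shown in the next section.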

Check-Pointing

We need to specify a checkpoint location in order to persist the metadata and other information needed so that the query can restart after any failure with exactly-once delivery guarantees. The checkpoint location should be on fault-tolerant storage such as HDFS, an AWS S3 bucket, Azure Blob Storage, etc.

The last step is to call the start() API.
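Putting the last two pieces together, here is a minimal sketch (the paths are placeholders, and parsed is the DataFrame assumed earlier) of setting the checkpoint location, starting the query, and keeping the driver alive until it terminates:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val query = parsed.writeStream
  .format("parquet")
  .option("path", "/out/parquet")
  .option("checkpointLocation", "/checkpoints/etl-query")  // fault-tolerant storage (HDFS, S3, ...)
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Append)
  .start()                  // launches the streaming query

query.awaitTermination()    // block the driver until the query stops or fails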
