
Introduction

In a world where data is generated at an extremely fast rate, analysing that data correctly and delivering useful, meaningful results at the right time can help many domains that build data products. This applies to health care, finance, media, retail, travel services and more. Solid examples include Netflix providing personalized recommendations in real time, Amazon tracking your interaction with different products on its platform and immediately suggesting related ones, or any business that needs to stream a large amount of data in real time and run different analyses on it.

One framework that can handle big data in real time and perform different kinds of analysis is Apache Spark. In this blog, we are going to use Spark Streaming to process high-velocity data at scale. We will be using Kafka to ingest data into our Spark code.

What is Spark?

Apache Spark is a cluster computing technology designed for fast computation. It builds on the ideas of Hadoop MapReduce and extends the MapReduce model to efficiently support more types of computation, including interactive queries and stream processing. The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application.

Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries and streaming. By supporting all these workloads in a single system, it reduces the management burden of maintaining separate tools.

What is Spark Streaming?

Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ or TCP sockets and processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to file systems, databases, and live dashboards. Since Spark Streaming is built on top of Spark, users can apply Spark’s in-built machine learning algorithms (MLlib), and graph processing algorithms (GraphX) on data streams. Compared to other streaming projects, Spark Streaming has the following features and benefits:

  • Ease of Use: Spark Streaming brings Spark’s language-integrated API to stream processing, letting users write streaming applications the same way as batch jobs, in Java, Python and Scala.
  • Fault Tolerance: Spark Streaming is able to detect and recover from data loss mid-stream due to node or process failure.

How Does Spark Streaming Work?

Spark Streaming processes a continuous stream of data by dividing it into micro-batches. The resulting sequence of micro-batches is called a Discretized Stream, or DStream, which is also the API that Spark Streaming provides for creating and processing them. A DStream is simply a sequence of RDDs, each processed on Spark’s core execution engine like any other RDD. It can be created from any streaming source such as Flume or Kafka.
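The micro-batching idea can be sketched in plain Scala, without Spark. This is an illustration only: Record, toMicroBatches and the arrival timestamps are hypothetical names and made-up data, and a real DStream is built from RDDs distributed across a cluster rather than from local lists.

```scala
// Illustrative sketch only: plain Scala collections standing in for a DStream.
// Record and toMicroBatches are hypothetical names, not part of the Spark API.
object MicroBatchSketch {
  final case class Record(arrivalMs: Long, value: String)

  // Assign each record to the batch interval in which it arrived, then return
  // the batches in time order. Each inner sequence plays the role of one RDD.
  def toMicroBatches(records: Seq[Record], batchIntervalMs: Long): Seq[Seq[String]] =
    records
      .groupBy(r => r.arrivalMs / batchIntervalMs)
      .toSeq
      .sortBy { case (batchIndex, _) => batchIndex }
      .map { case (_, batch) => batch.sortBy(_.arrivalMs).map(_.value) }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(100L, "a"), Record(1900L, "b"), // arrive in the first 2-second interval
      Record(2100L, "c"),                    // second interval
      Record(4500L, "d")                     // third interval
    )
    toMicroBatches(records, batchIntervalMs = 2000L).foreach(println)
  }
}
```

One simplification to keep in mind: this sketch produces no batch for an interval in which nothing arrived, whereas a streaming engine still runs on its schedule for every interval.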

Difference Between Spark Streaming and Spark Structured Streaming

Spark Streaming is based on DStream. A DStream is represented by a continuous series of RDDs, the RDD being Spark’s abstraction of an immutable, distributed dataset. Spark Streaming has the following problems:

Difficult: it was not simple to build streaming pipelines that supported delivery policies such as exactly-once guarantees, handling of late-arriving data, or fault tolerance. All of them could be implemented, but they required extra work from programmers.

Inconsistent: the API used for batch processing (RDD, Dataset) was different from the API for stream processing (DStream). This does not block anyone from writing code, but it is always simpler (especially for maintenance cost) to deal with as few abstractions as possible.

Spark Structured Streaming can be understood as an unbounded table that grows as new data arrives; in other words, it can be thought of as stream processing built on Spark SQL.
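To make the unbounded-table picture concrete, here is a minimal sketch in plain Scala rather than Spark SQL. The names UnboundedTableSketch, ResultTable and appendRows are hypothetical, and the running word count is just one possible query over the growing table; it is meant to show the idea of updating a result incrementally as rows are appended, not how Spark implements it.

```scala
// Illustrative sketch only: models the "unbounded table" idea with local collections.
// UnboundedTableSketch, ResultTable and appendRows are hypothetical names, not Spark APIs.
object UnboundedTableSketch {
  // The result here is a running word count over every row seen so far.
  type ResultTable = Map[String, Long]

  // Appending new rows to the input updates the result incrementally,
  // without reprocessing the rows that arrived earlier.
  def appendRows(result: ResultTable, newRows: Seq[String]): ResultTable =
    newRows.foldLeft(result) { (table, word) =>
      table.updated(word, table.getOrElse(word, 0L) + 1L)
    }

  def main(args: Array[String]): Unit = {
    val empty: ResultTable = Map.empty
    val afterFirstArrival = appendRows(empty, Seq("spark", "kafka"))
    val afterSecondArrival = appendRows(afterFirstArrival, Seq("spark"))
    println(afterFirstArrival)
    println(afterSecondArrival)
  }
}
```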

More concretely, structured streaming brought some new concepts to Spark.

Exactly-once guarantee: Structured Streaming focuses on this concept. It means that each record is processed only once and the output doesn’t contain duplicates, given a replayable source and an idempotent sink.

Event time: one of the observed problems with DStream streaming was processing order, i.e. the case when data generated earlier was processed after data generated later. Structured Streaming handles this problem with a concept called event time that, under some conditions, allows late data to be aggregated correctly in processing pipelines.

Sinks, the Result Table, output modes and watermarks are other features of Spark Structured Streaming.
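The difference between arrival order and event time can be illustrated with a small plain-Scala sketch. Event, countByEventTimeWindow and the sample timestamps are hypothetical and only model the idea; Structured Streaming does this work inside its own engine.

```scala
// Illustrative sketch only: event-time windowing with local collections.
// Event and countByEventTimeWindow are hypothetical names, not Spark APIs.
object EventTimeSketch {
  final case class Event(eventTimeMs: Long, word: String)

  // Group events by the window their event time falls into, regardless of the
  // order in which they arrived. Returns (windowStartMs, count) sorted by window start.
  def countByEventTimeWindow(eventsInArrivalOrder: Seq[Event], windowMs: Long): Seq[(Long, Int)] =
    eventsInArrivalOrder
      .groupBy(e => (e.eventTimeMs / windowMs) * windowMs)
      .map { case (windowStart, events) => (windowStart, events.size) }
      .toSeq
      .sortBy { case (windowStart, _) => windowStart }

  def main(args: Array[String]): Unit = {
    // The second event was generated first but arrived late.
    val arrivals = Seq(Event(12000L, "b"), Event(3000L, "a"), Event(14000L, "c"))
    countByEventTimeWindow(arrivals, windowMs = 10000L).foreach(println)
  }
}
```

Because the grouping key is derived from the event's own timestamp, the late event still lands in the window that starts at 0 ms rather than in the window being processed when it arrived.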
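Of these, the watermark is probably the least self-explanatory: it is a threshold that says how late an event may be before it is ignored. The sketch below is a simplified plain-Scala model under stated assumptions. WatermarkSketch, dropTooLate and allowedLatenessMs are hypothetical names, and the threshold is re-evaluated after every event here, which is a simplification of how a real engine advances its watermark.

```scala
// Illustrative sketch only: a simplified watermark over local collections.
// WatermarkSketch, Event and dropTooLate are hypothetical names, not Spark APIs.
object WatermarkSketch {
  final case class Event(eventTimeMs: Long, word: String)

  // Walk events in arrival order, tracking the largest event time seen so far.
  // An event is kept only if it is not older than (max event time - allowed lateness).
  def dropTooLate(eventsInArrivalOrder: Seq[Event], allowedLatenessMs: Long): Seq[Event] = {
    val (_, kept) = eventsInArrivalOrder.foldLeft((Long.MinValue, Vector.empty[Event])) {
      case ((maxSeen, acc), event) =>
        val newMax = math.max(maxSeen, event.eventTimeMs)
        val watermark = newMax - allowedLatenessMs
        if (event.eventTimeMs >= watermark) (newMax, acc :+ event) else (newMax, acc)
    }
    kept
  }

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(
      Event(20000L, "a"),
      Event(15000L, "b"), // 5 seconds behind the newest event: within the allowed lateness
      Event(5000L, "c"),  // 15 seconds behind: older than the watermark
      Event(30000L, "d")
    )
    dropTooLate(arrivals, allowedLatenessMs = 10000L).foreach(println)
  }
}
```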

Implementation Goal

In this blog, we will count the words in sentences. The major difference this time is that the sentences will not come from a text file. They will arrive through a live stream as flowing data points, and we will count the words in that flowing data. Data, in this case, is not stationary but constantly moving; this is also known as high-velocity data. We will calculate the word count on the fly, using Kafka to move the data as a live stream. Spark has different connectors available to connect with data streams like Kafka.

Word Count Example Using Kafka

There are a few steps we need to perform in order to find the word count from data flowing in through Kafka.

The initialization of Spark and Kafka Connector

Our main task is to create an entry point for our application. We also need to set up and initialise Spark Streaming in the environment. This is done through the following code, which creates a StreamingContext with a 2-second batch interval:

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

Now that Spark Streaming is initialised, we need to connect our application with Kafka to receive the flowing data. Spark has inbuilt connectors available to connect your application with different messaging queues. We need to provide information such as the names of the topics we want to consume from, the bootstrap servers of the Kafka cluster hosting those topics, and the consumer group ID. Once we have provided all the required information, we establish a connection to Kafka using the createDirectStream function. You can find the implementation below:

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
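For readers who have not used the ConsumerConfig constants before, the sketch below shows the same settings written with their literal Kafka property names. It is plain Scala with no Kafka dependency; KafkaParamsSketch, parseTopics, buildKafkaParams and the sample values are hypothetical, and trimming whitespace around topic names is an addition that the code above does not perform.

```scala
// Illustrative sketch only: the consumer settings written with literal property names.
// KafkaParamsSketch, parseTopics and buildKafkaParams are hypothetical helper names.
object KafkaParamsSketch {
  // Split a comma-separated topic list, trimming whitespace and dropping empty entries.
  // (Assumption: the trimming and filtering are extras, not part of the original snippet.)
  def parseTopics(topics: String): Set[String] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).toSet

  // Each key is the property name behind the corresponding ConsumerConfig constant.
  def buildKafkaParams(brokers: String, groupId: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers" -> brokers,
      "group.id" -> groupId,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

  def main(args: Array[String]): Unit = {
    // Sample values are made up for illustration.
    println(parseTopics("sentences, logs"))
    buildKafkaParams("localhost:9092", "wordcount-group").foreach(println)
  }
}
```

Kafka accepts a deserializer either as a class object, as in the code above, or as a fully qualified class name string, as in this sketch.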

Using Map and Reduce to get the word count

Now we need to process the sentences. We map over the messages as and when we receive them through Kafka and split each sentence into words using the split function. To calculate the word count, we use the map and reduce functions available in Spark. For every word, we create a key-value pair with the word as the key and 1 as its value, so a pair looks like <’word’, 1>. After that, we group all the tuples by their common key and sum up the values for each key. This gives us the count for each word within the current micro-batch. You can have a look at the implementation below:

val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
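To see what these steps compute, here is a minimal sketch that applies the same split, map and sum-by-key logic to plain Scala collections instead of a DStream. WordCountSketch, countWords and the sample sentences are hypothetical, and the filter that drops empty strings is an addition not present in the streaming code.

```scala
// Illustrative sketch only: the word-count steps on local collections, not on a DStream.
// WordCountSketch and countWords are hypothetical names.
object WordCountSketch {
  // Split lines into words, pair each word with 1, then sum the values per word.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty) // assumption: skip empty tokens produced by repeated spaces
      .map(word => (word, 1L))
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Each inner sequence stands for the sentences received in one micro-batch.
    val batches = Seq(Seq("to be or not to be"), Seq("to stream or not"))
    batches.map(countWords).foreach(println)
  }
}
```

Note that each batch is counted on its own, which mirrors the streaming code: reduceByKey on a DStream produces counts per micro-batch, not a running total across the whole stream.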

Finally, the processing will not start until you invoke the start function on the StreamingContext instance. Also, remember that you need to keep your code running to receive data through the live stream until it is shut down. For this, we use the awaitTermination method. You can implement the above logic through the following two lines:

ssc.start()
ssc.awaitTermination()

Full Code

package org.apache.spark.examples.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Expect exactly three arguments: brokers, consumer group ID and topics
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <groupId> is a consumer group name to consume from topics
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    // Helper from the Spark examples package that adjusts the log level
    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, groupId, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation and block until it is stopped
    ssc.start()
    ssc.awaitTermination()
  }
}

Summary

Earlier, Hadoop’s high latency made it unsuitable for near real-time processing needs. In most cases, Hadoop was used for batch processing while Storm was used for stream processing. Maintaining two systems led to larger codebases, more bugs to fix, more development effort and other issues. This is one of the differences between Hadoop and Apache Spark.

Spark Streaming addresses those issues. It provides a scalable, efficient, resilient and integrated system, with a unified programming and execution model for batch and streaming. A major reason for its rapid adoption is this unification of distinct data processing capabilities: developers find it very attractive to use a single framework for all their processing needs. In addition, through Spark SQL, streaming data can be combined with static data sources.
