Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4956#discussion_r26089214
--- Diff: docs/streaming-kafka-integration.md ---
@@ -2,58 +2,154 @@
layout: global
title: Spark Streaming + Kafka Integration Guide
---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging
rethought as a distributed, partitioned, replicated commit log service. Here
we explain how to configure Spark Streaming to receive data from Kafka.
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging
rethought as a distributed, partitioned, replicated commit log service. Here
we explain how to configure Spark Streaming to receive data from Kafka. There
are two approaches to this: the old approach using Receivers and Kafka's
high-level API, and a new experimental approach (introduced in Spark 1.3)
without using Receivers. They have different programming models, performance
characteristics, and semantic guarantees, so read on for more details.
-1. **Linking:** In your SBT/Maven project definition, link your streaming
application against the following artifact (see [Linking
section](streaming-programming-guide.html#linking) in the main programming
guide for further information).
+## Approach 1: Receiver-based Approach
+This approach uses a Receiver to receive the data. The Receiver is
implemented using the Kafka high-level consumer API. As with all receivers, the
data received from Kafka through a Receiver is stored in Spark executors, and
then jobs launched by Spark Streaming process the data.
+
+However, under the default configuration, this approach can lose data under
failures (see [receiver
reliability](streaming-programming-guide.html#receiver-reliability)). To ensure
zero-data loss, you have to additionally enable Write Ahead Logs in Spark
Streaming (introduced in Spark 1.2). This synchronously saves all the received
Kafka data into write ahead logs on a distributed file system (e.g., HDFS), so
that all the data can be recovered on failure. See the [Deploying
section](streaming-programming-guide.html#deploying-applications) in the
streaming programming guide for more details on Write Ahead Logs.
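
As an illustration, the Write Ahead Log is turned on through the Spark configuration
before the `StreamingContext` is created. The Scala sketch below is not part of the
original guide; the application name, batch interval, and checkpoint directory are
placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the Write Ahead Log for receivers (available since Spark 1.2).
    val conf = new SparkConf()
      .setAppName("KafkaReceiverWithWAL")  // placeholder application name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val streamingContext = new StreamingContext(conf, Seconds(10))  // placeholder batch interval
    // The logs are written under the checkpoint directory, so checkpointing must be enabled.
    streamingContext.checkpoint("hdfs:///path/to/checkpoint")  // placeholder directory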
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** For Scala/Java applications using SBT/Maven project
definitions, link your streaming application with the following artifact (see
[Linking section](streaming-programming-guide.html#linking) in the main
programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}
-2. **Programming:** In the streaming application code, import `KafkaUtils`
and create input DStream as follows.
+	For Python applications, you will have to add the above library and
its dependencies when deploying your application. See the *Deploying*
subsection below.
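
    For example, in an sbt build the coordinates above can be declared as follows;
    this is only a sketch, and the version string is a placeholder for your Spark version.

        // build.sbt (sketch); %% appends the Scala binary version to the artifact id
        libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"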
+
+2. **Programming:** In the streaming application code, import `KafkaUtils`
and create an input DStream as follows.
<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.kafka._
- val kafkaStream = KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the
consumer], [per-topic number of Kafka partitions to consume])
+ val kafkaStream = KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])
- See the [API
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ You can also specify the key and value classes and their corresponding
decoder classes using variations of `createStream`. See the [API
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.kafka.*;
- JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the
consumer], [per-topic number of Kafka partitions to consume]);
+ JavaPairReceiverInputDStream<String, String> kafkaStream =
+ KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume]);
- See the [API
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ You can also specify the key and value classes and their corresponding
decoder classes using variations of `createStream`. See the [API
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+
+ </div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.kafka import KafkaUtils
+
+ kafkaStream = KafkaUtils.createStream(streamingContext, \
+ [ZK quorum], [consumer group id], [per-topic number of
Kafka partitions to consume])
+
+	By default, the Python API will decode Kafka data as UTF-8 encoded
strings. You can specify a custom decoding function to decode the byte
arrays in Kafka records into an arbitrary data type. See the [API
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+ and the
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
</div>
</div>
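
    As a concrete illustration of the typed Scala variant, the sketch below passes explicit
    key/value classes, Kafka decoders, consumer parameters, and a storage level. It is not
    part of the original guide; the ZooKeeper quorum, group id, and topic name are placeholders.

        import kafka.serializer.StringDecoder
        import org.apache.spark.storage.StorageLevel
        import org.apache.spark.streaming.kafka._

        // Parameters for Kafka's high-level consumer (placeholder values).
        val kafkaParams = Map(
          "zookeeper.connect" -> "zk-host:2181",
          "group.id" -> "my-consumer-group")
        // Key/value types and their Kafka decoders are given explicitly.
        val typedStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          streamingContext, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)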
- *Points to remember:*
+ **Points to remember:**
- Topic partitions in Kafka do not correlate to partitions of RDDs
generated in Spark Streaming. So increasing the number of topic-specific
partitions in `KafkaUtils.createStream()` only increases the number of
threads that a single receiver uses to consume topics. It does not
increase the parallelism of Spark in processing the data. Refer to the main
document for more information on that.
- Multiple Kafka input DStreams can be created with different groups
and topics, so that data is received in parallel using multiple receivers (see
the sketch after these points).
-3. **Deploying:** Package
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies
(except `spark-core_{{site.SCALA_BINARY_VERSION}}` and
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by
`spark-submit`) into the application JAR. Then use `spark-submit` to launch
your application (see [Deploying
section](streaming-programming-guide.html#deploying-applications) in the main
programming guide).
-
-Note that the Kafka receiver used by default is an
-[*unreliable*
receiver](streaming-programming-guide.html#receiver-reliability) section in the
-programming guide). In Spark 1.2, we have added an experimental *reliable*
Kafka receiver that
-provides stronger
-[fault-tolerance
guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
-data loss on failures. This receiver is automatically used when the write
ahead log
-(also introduced in Spark 1.2) is enabled
-(see [Deployment](#deploying-applications.html) section in the programming
guide). This
-may reduce the receiving throughput of individual Kafka receivers compared
to the unreliable
-receivers, but this can be corrected by running
-[more receivers in
parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
-to increase aggregate throughput. Additionally, it is recommended that the
replication of the
-received data within Spark be disabled when the write ahead log is enabled
as the log is already stored
-in a replicated storage system. This can be done by setting the storage
level for the input
-stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+	- If you have enabled Write Ahead Logs with a replicated file system
like HDFS, the received data is already being replicated in the log. Hence, set
the storage level for the input stream to `StorageLevel.MEMORY_AND_DISK_SER`
(that is, use `KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
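
    The sketch below (not from the original guide) illustrates the second point: several
    receiver-based streams are created and then unioned so that data is received in parallel.
    The number of streams, ZooKeeper quorum, group id, and topic map are placeholders.

        import org.apache.spark.streaming.kafka._

        // Create several receiver-based input DStreams and union them (placeholder values).
        val numStreams = 4
        val kafkaStreams = (1 to numStreams).map { _ =>
          KafkaUtils.createStream(streamingContext, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))
        }
        val unifiedStream = streamingContext.union(kafkaStreams)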
+
+3. **Deploying:** As with any Spark application, `spark-submit` is used
to launch your application. However, the details are slightly different for
Scala/Java applications and Python applications.
+
+	For Scala and Java applications, if you are using SBT or Maven for
project management, then package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}`
and its dependencies into the application JAR. Make sure
`spark-core_{{site.SCALA_BINARY_VERSION}}` and
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided`
dependencies as those are already present in a Spark installation. Then use
`spark-submit` to launch your application (see [Deploying
section](streaming-programming-guide.html#deploying-applications) in the main
programming guide).
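
    A build.sbt sketch of these scopes might look like the following; it is not from the
    original guide, and the version string and the use of an assembly plugin such as
    sbt-assembly are assumptions.

        // Bundled into the application JAR (e.g., via sbt-assembly)
        libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"
        // Already provided by the Spark installation at runtime
        libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
        libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"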
+
+ For Python applications which lack SBT/Maven project management,
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can
be directly added to `spark-submit` using `--packages` (see [Application
Submission Guide](submitting-applications.html)). That is,
+
+ ./bin/spark-submit --packages
org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
...
+
+ Alternatively, you can also download the JAR of the Maven artifact
`spark-streaming-kafka-assembly` from the
+ [Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
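
    For instance, the downloaded assembly JAR can be supplied as follows (a sketch; the
    actual JAR file name depends on the Scala and Spark versions you downloaded):

        ./bin/spark-submit --jars spark-streaming-kafka-assembly_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar ...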
+
+## Approach 2: Direct Approach (No Receivers)
+This new receiver-less "direct" approach has been introduced in Spark
1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to
receive data, this approach periodically queries Kafka for the latest offsets
in each topic+partition, and accordingly defines the offset ranges to process
in each batch. When the jobs to process the data are launched, Kafka's simple
consumer API is used to read the defined ranges of offsets from Kafka (similar
to reading files from a file system). Note that this is an experimental feature
in Spark 1.3 and is only available in the Scala and Java API.
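
As a sketch of what the direct API looks like in Scala (this example is not part of the
diff above; the broker list and topic set are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka._

    // The direct API talks to Kafka brokers directly ("metadata.broker.list"), not to ZooKeeper.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, Set("my-topic"))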
+
+This approach has the following advantages over the first approach.
--- End diff --
In case these sections are read out-of-order or linked directly, it might
be better to be more explicit, e.g.
> This approach has the following advantages over the receiver-based
approach
---