Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4956#discussion_r26094467
  
    --- Diff: docs/streaming-kafka-integration.md ---
    @@ -2,58 +2,154 @@
     layout: global
     title: Spark Streaming + Kafka Integration Guide
     ---
    -[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Here 
we explain how to configure Spark Streaming to receive data from Kafka.
    +[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Here 
we explain how to configure Spark Streaming to receive data from Kafka. There 
are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new experimental approach (introduced in Spark 1.3) 
without using Receivers. They have different programming models, performance 
characteristics, and semantics guarantees, so read on for more details.  
     
    -1. **Linking:** In your SBT/Maven project definition, link your streaming 
application against the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
    +## Approach 1: Receiver-based Approach
    +This approach uses a Receiver to receive the data. The Received is 
implemented using the Kafka high-level consumer API. As with all receivers, the 
data received from Kafka through a Receiver is stored in Spark executors, and 
then jobs launched by Spark Streaming processes the data. 
    +
    +However, under default configuration, this approach can loose data under 
failures (see [receiver 
reliability](streaming-programming-guide.html#receiver-reliability). To ensure 
zero-data loss, you have to additionally enable Write Ahead Logs in Spark 
Streaming. To ensure zero-data loss, enable the Write Ahead Logs (introduced in 
Spark 1.2) . This synchronously saves all the received Kafka data into write 
ahead logs on a distributed file system (e.g HDFS), so that all the data can be 
recovered on failure. Ssee [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the 
streaming programming guide for more details on Write Ahead Logs.
    +
    +Next, we discuss how to use this approach in your streaming application.
    +
    +1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
     
                groupId = org.apache.spark
                artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
                version = {{site.SPARK_VERSION_SHORT}}
     
    -2. **Programming:** In the streaming application code, import `KafkaUtils` 
and create input DStream as follows.
    +   For Python applications, you will have to add this above library and 
its dependencies when deploying your application. See the *Deploying* 
subsection below.
    +
    +2. **Programming:** In the streaming application code, import `KafkaUtils` 
and create an input DStream as follows.
     
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
                import org.apache.spark.streaming.kafka._
     
    -           val kafkaStream = KafkaUtils.createStream(
    -           streamingContext, [zookeeperQuorum], [group id of the 
consumer], [per-topic number of Kafka partitions to consume])
    +           val kafkaStream = KafkaUtils.createStream(streamingContext, 
    +            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
     
    -   See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
    +    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
        </div>
        <div data-lang="java" markdown="1">
                import org.apache.spark.streaming.kafka.*;
     
    -           JavaPairReceiverInputDStream<String, String> kafkaStream = 
KafkaUtils.createStream(
    -           streamingContext, [zookeeperQuorum], [group id of the 
consumer], [per-topic number of Kafka partitions to consume]);
    +           JavaPairReceiverInputDStream<String, String> kafkaStream = 
    +                   KafkaUtils.createStream(streamingContext,
    +            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
     
    -   See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
    +    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
    +
    +   </div>
    +   <div data-lang="python" markdown="1">
    +           from pyspark.streaming.kafka import KafkaUtils
    +
    +           kafkaStream = KafkaUtils.createStream(streamingContext, \
    +                   [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
    +
    +   By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
    +   and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
 
        </div>
        </div>
     
    -   *Points to remember:*
    +   **Points to remember:**
     
        - Topic partitions in Kafka does not correlate to partitions of RDDs 
generated in Spark Streaming. So increasing the number of topic-specific 
partitions in the `KafkaUtils.createStream()` only increases the number of 
threads using which topics that are consumed within a single receiver. It does 
not increase the parallelism of Spark in processing the data. Refer to the main 
document for more information on that.
     
        - Multiple Kafka input DStreams can be created with different groups 
and topics for parallel receiving of data using multiple receivers.
     
    -3. **Deploying:** Package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
(except `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by 
`spark-submit`) into the application JAR. Then use `spark-submit` to launch 
your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
    -
    -Note that the Kafka receiver used by default is an
    -[*unreliable* 
receiver](streaming-programming-guide.html#receiver-reliability) section in the
    -programming guide). In Spark 1.2, we have added an experimental *reliable* 
Kafka receiver that
    -provides stronger
    -[fault-tolerance 
guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
    -data loss on failures. This receiver is automatically used when the write 
ahead log
    -(also introduced in Spark 1.2) is enabled
    -(see [Deployment](#deploying-applications.html) section in the programming 
guide). This
    -may reduce the receiving throughput of individual Kafka receivers compared 
to the unreliable
    -receivers, but this can be corrected by running
    -[more receivers in 
parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
    -to increase aggregate throughput. Additionally, it is recommended that the 
replication of the
    -received data within Spark be disabled when the write ahead log is enabled 
as the log is already stored
    -in a replicated storage system. This can be done by setting the storage 
level for the input
    -stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
    +   - If you have enabled Write Ahead Logs with a replicated file system 
like HDFS, the received data is already being replicated in the log. Hence, the 
storage level in storage level for the input stream to 
`StorageLevel.MEMORY_AND_DISK_SER` (that is, use
     `KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
    +asdasd
    +
    +3. **Deploying:** As with any Spark applications, `spark-submit` is used 
to launch your application. However, the details are slightly different for 
Scala/Java applications and Python applications.
    +
    +   For Scala and Java applications, if you are SBT or Maven for project 
management, then package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` 
and its dependencies into the application JAR. Make sure 
`spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide). 
    +
    +   For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can 
be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is, 
    +
    +       ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
    +
    +   Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-assembly` from the 
    +   [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
    +
    +## Approach 2: Direct Approach (No Receivers)
    +This is a new receiver-less "direct" approach has been introduced in Spark 
1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to 
receive data, this approach periodically queries Kafka for the latest offsets 
in each topic+partition, and accordingly defines the offset ranges to process 
in each batch. When the jobs to process the data is launched, Kafka's simple 
consumer API is used to read the defined ranges of offsets from Kafka (similar 
to read files from a file system). Note that this is an experimental feature in 
Spark 1.3 and is only available in the 
    +
    +This approach has the following advantages over the first approach.
    +
    +- *Simplified Parallelism:* No need to create multiple input Kafka streams 
and union-ing them. With `directStream`, Spark Streaming will create as many 
RDD partitions as there is Kafka partitions to consume, which will all read 
data from Kafka in parallel. So there is one-to-one mapping between Kafka and 
RDD partitions, which is easier to understand and tune.
    +
    +- *Efficiency:* Achieving zero-data loss in the first approach required 
the data to be stored in a Write Ahead Log, which further replicated the data. 
This is actually inefficient as the data effectively gets replicated twice - 
once by Kafka, and a second time by the Write Ahead Log. This second approach 
eliminate the problem as there is no receiver, and hence no need for Write 
Ahead Logs.
    +
    +- *Exactly-once semantics:* The first approach uses Kafka's high level API 
to store consumed offsets in Zookeeper. This is traditionally the way to 
consume data from Kafka. While this approach (in combination with write ahead 
logs) can ensure zero data loss (i.e. at-least once semantics), there is a 
small chance some records may get consumed twice under some failures. This 
occurs because of inconsistencies between data reliably received by Spark 
Streaming and offsets tracked by Zookeeper . Hence, in this second approach, we 
use simple Kafka API that does not use Zookeeper and offsets tracked only by 
Spark Streaming within its checkpoints. This eliminates inconsistencies between 
Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark 
Streaming effectively exactly once despite failures.
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to