GitHub user markgrover opened a pull request:
https://github.com/apache/spark/pull/10953
[SPARK-12177] [STREAMING] Update KafkaDStreams to new Kafka 0.9 Consumer API
First off, many thanks and credit to @nikit-os for starting this work. I am
building on the good work he did at #10681.
Here is some context: Kafka has a new consumer API starting Apache Kafka
0.9 (Kafka 0.9.0.0, to be precise). This API supports security features like
authentication (via Kerberos) and encryption (via SSL). This new API is in beta
but is the only way we can integrate Spark and Kafka in a secure environment.
Kafka 0.9 ships with both the new and the old consumer API, so we can bump
our Kafka dependency from 0.8.x to 0.9. However, the old consumer API in Kafka
0.9 is binary incompatible with the old consumer API in Kafka 0.8.x. There are
also some minor source compatibility issues between the old API of Kafka 0.8.x
and Kafka 0.9, mostly around the ZkClient and ZkUtils APIs. In almost all
cases, users of the old API, whether on Kafka 0.8 or Kafka 0.9 with Spark,
won't have to modify any code on their side. Only when they use the *new*
consumer API from Kafka 0.9 will they have to modify their code.
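To make the difference concrete, here is a sketch of how the consumer configuration changes between the two APIs. The property keys are the standard Kafka ones; the broker, ZooKeeper, and group names are placeholders, not values from this PR.

```scala
// Old (0.8-era) consumer configuration: a broker list plus a ZooKeeper
// quorum, with message decoding handled by Spark-side decoder classes.
val oldApiParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "zookeeper.connect"    -> "zk1:2181",
  "group.id"             -> "my-consumer-group"
)

// New (0.9) consumer configuration: bootstrap servers replace the broker
// list, the client no longer talks to ZooKeeper, and deserializers are
// passed as consumer properties.
val newApiParams = Map(
  "bootstrap.servers"  -> "broker1:9092,broker2:9092",
  "group.id"           -> "my-consumer-group",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
```

User code that builds the old-style map is what has to change when moving to the new API.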
In addition to supporting the new API from Kafka 0.9, we could support either
the old API from Kafka 0.8 only (which works with both Kafka 0.8 and Kafka 0.9
brokers) or the old API from Kafka 0.9 (which only works with Kafka 0.9
brokers). If we support the old API from 0.8, we complicate our integration: we
would have to build two artifacts (kafka-assembly jars, that is), one for Kafka
0.8 and one for Kafka 0.9, and maintain two maven profiles to match.
Alternatively, if we only support the old API from 0.9, users will have to
upgrade their Kafka brokers to 0.9 when moving to Spark 2.0.
In fact, users should ideally upgrade their Kafka brokers before they upgrade
Spark, since old 0.8.0 clients can work with the new 0.9.0 brokers. I am
personally ok with requiring users to upgrade Kafka brokers to 0.9.0 since 1)
this is going in Spark 2.0, 2) that's what the Kafka community is pushing for,
and 3) I think the baggage of supporting Kafka 0.8's old consumer API might be
too much to carry until another major release of Spark. Again, with Kafka 0.9's
old consumer API, which we support, no user code needs to change; users do need
to upgrade their brokers, though.
As far as the code in this PR goes, we decided to put code for the new API
in a new package (called org.apache.spark.streaming.kafka.newapi*), creating a
separate maven artifact, spark-streaming-kafka-newapi_2.10.
In essence, this means there are two KafkaCluster classes, two KafkaUtils
classes, etc. (similar to how we have NewHadoopRDD), because there wasn't
enough overlap between the two implementations to justify sharing code. Where
there was enough overlap, for example KafkaTestUtils, there is only one
implementation.
I'd love to get this reviewed. Thanks in advance!
The examples of the new API have some extra required parameters; I will
change them shortly to have reasonable defaults and be optional.
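One way the example parameters could be made optional is with a small helper that merges user-supplied values over sensible defaults. This helper is hypothetical, not part of this PR; the default property values are standard Kafka 0.9 consumer settings.

```scala
// Hypothetical helper (not in this PR): fill in default consumer properties
// so examples only need to pass the truly required ones.
def withDefaults(userParams: Map[String, String]): Map[String, String] = {
  val defaults = Map(
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> "false"
  )
  // User-supplied values win over the defaults.
  defaults ++ userParams
}
```

An example would then only need something like `withDefaults(Map("bootstrap.servers" -> "broker:9092", "group.id" -> "example"))`.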
Follow on work:
* This is a Scala/Java implementation only. I am working on, and currently
testing, a Python implementation in a separate branch at
https://github.com/markgrover/spark/tree/kafka09-integration-python I want to
get this PR out and committed first. It will also help me gather feedback
sooner rather than later.
* Add and test authentication with Kerberos. For that, we need delegation
tokens in Kafka (which is currently unresolved:
https://issues.apache.org/jira/browse/KAFKA-1696). This is because, especially
in the case of DirectStream, all executors can consume from Kafka brokers, and
initializing credentials at each executor can be seen as a DoS attack by the
Ticket Granting Server.
* Encryption exists in this patch, but I haven't spent much time integration
testing it. If folks prefer taking that out until I have had a chance to
integration-test it on my own cluster, I am open to that; it's only a few lines
and one test that would need to be taken out.
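For reference, enabling encryption with the new consumer is mostly a matter of passing the standard Kafka 0.9 SSL client properties. The keys below are the standard ones; the store paths and passwords are placeholders, not values from this patch.

```scala
// Standard Kafka 0.9 SSL client properties; paths and passwords here are
// placeholders for illustration only.
val sslParams = Map(
  "security.protocol"       -> "SSL",
  "ssl.truststore.location" -> "/etc/kafka/client.truststore.jks",
  "ssl.truststore.password" -> "changeit",
  "ssl.keystore.location"   -> "/etc/kafka/client.keystore.jks",
  "ssl.keystore.password"   -> "changeit"
)
```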
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/markgrover/spark kafka09-integration
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/10953.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #10953
----
commit cff3a44ca6eae90038dd9e31bf5349ddd6301fd4
Author: nikit-os <[email protected]>
Date: 2016-01-10T13:23:27Z
[SPARK-12177] [STREAMING] Update KafkaDStreams to new Kafka 0.9 Consumer API
commit af0c7b918a2fc3e7f7728d9f39f84a9ceb8a5b65
Author: Mark Grover <[email protected]>
Date: 2016-01-07T23:54:47Z
Getting rid of an extra dependency
commit ff0a42706ef6428fc15b2748ae248b8339669375
Author: Nikita <[email protected]>
Date: 2016-01-11T18:56:55Z
Merge pull request #1 from markgrover/kafka09-integration
Getting rid of an extra dependency
commit e5f638fb98e8b9990e32148ab2757dccf18686e7
Author: nikit-os <[email protected]>
Date: 2016-01-11T19:12:23Z
Fix Kafka params in Java example
commit e0f127596e51fb611dde7c52079d17722dcca4a9
Author: Mark Grover <[email protected]>
Date: 2016-01-19T19:46:28Z
Removing Broker.scala since it's not needed
commit 71ea192494135a1c608e73960e86df90fb4e303f
Author: Mark Grover <[email protected]>
Date: 2016-01-19T20:00:29Z
Scala style related changes to not exceed the line length
commit 6d27bbd67c26251584edd8f3390dcd8170f56df7
Author: Mark Grover <[email protected]>
Date: 2016-01-19T21:21:23Z
Formatting changes for styling, also updating scaladoc for functions.
commit 69607361c605c4ac9a2ff566e5567637f20f26d0
Author: Mark Grover <[email protected]>
Date: 2016-01-19T21:21:49Z
Adding missing leaderhost parameter to various methods in OffsetRange.scala
commit bb8cef840f37149f9d3bb7134f4afb5f2f0a4c02
Author: Mark Grover <[email protected]>
Date: 2016-01-20T00:13:09Z
Fixing a compilation failure related to previous commit
commit 90b80da66a63bd7047bf46d2441e403da2051ee7
Author: Nikita <[email protected]>
Date: 2016-01-20T09:39:19Z
Merge pull request #2 from markgrover/kafka09-integration
More changes
commit 4f56ffec0f7b49c16ab2e1d6053d924995ddc032
Author: Mark Grover <[email protected]>
Date: 2016-01-26T22:57:44Z
Refactoring Java/Scala code to have use the package name newapi instead of
v09 to avoid confusion. Python work is being done in a separate branch
github.com/markgrover/spark/tree/kafka09-integration-python
commit d5b7b56a5ed930546c04225572070364b2a95cab
Author: Mark Grover <[email protected]>
Date: 2016-01-26T23:24:27Z
Fixing a scalastyle error
commit e2402f895fe31d385aad69c37de0dc01e5afd37e
Author: Mark Grover <[email protected]>
Date: 2016-01-26T23:32:26Z
Merge branch 'master' into kafka09-integration
commit eedbddcbbfa684dd045f625c0aee4d1b84128523
Author: Mark Grover <[email protected]>
Date: 2016-01-26T23:54:44Z
Fixing scalastyle errors related to import ordering
commit adfd3356c124386d36e01d943018bfde119c6c8e
Author: Mark Grover <[email protected]>
Date: 2016-01-27T01:35:18Z
Fixing a compilation error
commit a0f052acef92491af213662155b520103e1c7040
Author: Mark Grover <[email protected]>
Date: 2016-01-27T01:39:27Z
More import ordering fixes
commit d2bc2a95152b070d0f40c5060e8728a398e4d870
Author: Mark Grover <[email protected]>
Date: 2016-01-27T01:58:12Z
Function declaration formatting in KafkaUtils and other minor formatting
changes
commit c530fa63d7919a484d4cfb47120c1da828e2fc83
Author: Mark Grover <[email protected]>
Date: 2016-01-27T04:18:31Z
Removing support for Kafka 0.8 completely. You can use the new api or the
old api using 0.9.0.0 only.
commit 0763979cf64fcf97df50ec4fdb3d79e3380784df
Author: Mark Grover <[email protected]>
Date: 2016-01-27T04:23:35Z
Making the new api KafkaTestUtils to not use zkClient explicitly
commit b35ea6500ca6ea9024632476e266bef2828fb23d
Author: Mark Grover <[email protected]>
Date: 2016-01-27T04:39:45Z
Removing the separate copy of KafkaTestUtils
----