[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23148
  
Cool, opened with what I have so far, will keep an eye out for others for a 
while.


---




[GitHub] spark pull request #23182: Config change followup to [SPARK-26177] Automated...

2018-11-29 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/23182

Config change followup to [SPARK-26177] Automated formatting for Scala code

Let's keep this open for a while to see if other configuration tweaks are 
suggested

## What changes were proposed in this pull request?

Formatting configuration changes following up 
https://github.com/apache/spark/pull/23148

## How was this patch tested?

./dev/scalafmt

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 scalafmt-config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23182


commit 07ca58ff2e7b0df19d4d755cba0152e323dc0d99
Author: cody koeninger 
Date:   2018-11-29T18:03:16Z

allow closing parens on same line




---




[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23148
  
Just pushed a tweak to allow closing parens on same line.  New pr for that, 
or do we want to keep identifying other tweaks first?
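To make the tweak concrete, here's a rough before/after sketch (made-up code, not the actual config or diff):

```scala
object ClosingParenExample {
  def makeRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): String =
    s"$topic-$partition: [$fromOffset, $untilOffset)"

  // Dangling style: the closing paren gets its own line.
  val dangling = makeRange(
    "events",
    0,
    0L,
    100L
  )

  // With the tweak, the closing paren may stay on the same line as the last argument.
  val sameLine = makeRange(
    "events",
    0,
    0L,
    100L)
}
```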

I think putting args on their own lines is triggered once the line is longer 
than maxColumn.  There's an option for bin-packing argument lists, but it seems 
like it's only for literal arguments and a few other cases, see 
https://scalameta.org/scalafmt/docs/configuration.html#binpackliteralargumentlists
  and 
https://github.com/scalameta/scalafmt/blob/08c6798a40188ce69b3287e3026becfbd540a847/scalafmt-core/shared/src/main/scala/org/scalafmt/config/BinPack.scala

I'm not aware of an option to only format the areas that the diff is in.
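For illustration, a rough sketch (made-up literals, not Spark code) of the default one-argument-per-line wrapping versus what literal-argument bin-packing allows once a line runs past maxColumn:

```scala
object BinPackSketch {
  // Default behavior once the call site exceeds maxColumn: every element on its own line.
  val topicsOnePerLine = Seq(
    "events",
    "clicks",
    "impressions",
    "purchases",
    "refunds",
    "sessions",
    "experiments"
  )

  // With binPack.literalArgumentLists enabled, scalafmt may pack several literals per line.
  val topicsBinPacked = Seq(
    "events", "clicks", "impressions", "purchases",
    "refunds", "sessions", "experiments")
}
```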


---




[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-27 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23148
  
I tested that example, it will reformat to

```
checkScan(
  query,
  "struct,address:string,pets:int," +
    "friends:array>," +
    "relatives:map>>"
)
```

but after that, it will not complain about the long unbreakable string 
starting with "struct".


---




[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23148#discussion_r236858392
  
--- Diff: dev/.scalafmt.conf ---
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+align = none
+align.openParenDefnSite = false
+align.openParenCallSite = false
+align.tokens = []
+docstrings = JavaDoc
+maxColumn = 98
--- End diff --

For now, this won't complain about any code, it just gives a way for 
contributors to format it.

If we did turn this on as a test in the build process, it would complain 
about any code that was different after formatting.  That could include code in 
the 99-101 column range, depending on whether it hit that corner case.  E.g.

```
// this is 101 columns and will fail scalastyle, but will pass with scalafmt set to 99
if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}")) {
```

My thinking is that being slightly more strict than existing scalastyle is 
better than having automated formatting that won't pass other existing 
automated checks.  Trying to find and upstream fixes to off-by-one errors in 
scalastyle is another option, but that will take a while.


---




[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23148#discussion_r236824453
  
--- Diff: dev/.scalafmt.conf ---
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+align = none
+align.openParenDefnSite = false
+align.openParenCallSite = false
+align.tokens = []
+docstrings = JavaDoc
+maxColumn = 98
--- End diff --

I originally had this set to 100, and noticed a corner case where an ending 
space and curly brace were formatted on columns 100 and 101 respectively, 
making scalastyle complain.


---




[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23148
  
Jenkins, retest this please


---




[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23148#discussion_r236423672
  
--- Diff: .scalafmt.conf ---
@@ -0,0 +1,24 @@
+#
--- End diff --

Sure, moved


---




[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23148#discussion_r236423438
  
--- Diff: pom.xml ---
@@ -156,6 +156,10 @@
 3.2.2
 2.12.7
 2.12
+1.5.1
--- End diff --

I moved the scalafmt version inline.  The others I think need to stay as 
parameters so the shell wrapper can override them, to distinguish between the 
use cases of build pipeline vs contributor fixing code.


---




[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code

2018-11-26 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/23148

[SPARK-26177] Automated formatting for Scala code

## What changes were proposed in this pull request?

Add a maven plugin and wrapper script at ./dev/scalafmt to use scalafmt to 
format files that differ from git master.

Intention is for contributors to be able to use this to automate fixing 
code style, not to include it in build pipeline yet.

If this PR is accepted, I'd make a different PR to update the code style 
section of https://spark.apache.org/contributing.html to mention the script

## How was this patch tested?

Manually tested by modifying a few files and running the script, then 
checking that scalastyle still passed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 scalafmt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23148


commit 5df854a340f5e9e2fe7ba60194b4891235420369
Author: cody koeninger 
Date:   2018-11-21T20:32:57Z

WIP on scalafmt config

commit 57684272b60ff1213c957541f1db3f9d7aba1543
Author: cody koeninger 
Date:   2018-11-21T21:22:15Z

mvn plugin, e.g. ./build/mvn mvn-scalafmt_2.12:format

commit bd7baeca577bf9b519fe028d1e831fb7193e7af9
Author: cody koeninger 
Date:   2018-11-22T17:07:06Z

shell wrapper for mvn scalafmt plugin




---




[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...

2018-11-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23103
  
merging to master, thanks @zouzias 


---




[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...

2018-11-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22824
  
@tdas or @jose-torres any opinion on whether it's worth refactoring these 
checks as suggested by @arunmahadevan?


---




[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...

2018-11-22 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/23103
  
@zouzias can you add the new option to 
docs/structured-streaming-kafka-integration.md as part of this PR?  
Instructions for building docs are in docs/README.md , ping me if you need a 
hand.


---




[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235480695
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set 
the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
--- End diff --

first arg should be on its own newline as well


---




[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235479872
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set 
the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

kafka.* is reserved for the existing kafka project's client configs, see 
e.g. line 86.  I'd just go with groupIdPrefix
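For example (a hedged sketch, assuming the option does end up being named groupIdPrefix as suggested; broker and topic names are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object GroupIdPrefixExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("groupIdPrefix-example").getOrCreate()

    // Assumption: the option is named groupIdPrefix, per this review suggestion.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker list
      .option("subscribe", "topic1")                   // placeholder topic
      .option("groupIdPrefix", "my-team")              // prefix for the generated group.id
      .load()

    df.printSchema()
  }
}
```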


---




[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235462394
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set 
the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

It seems like convention has been to mostly use camelcase for streaming 
options that aren't from the existing kafka.blah.whatever configuration 
namespace... e.g. subscribePattern, startingOffsets, maxOffsetsPerTrigger


---




[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235461374
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set 
the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
--- End diff --

Sorry there isn't an automatic formatter for this... but use 4 space 
indentation for multi-line argument lists

https://github.com/databricks/scala-style-guide#spacing-and-indentation
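Putting both comments together, the requested shape looks roughly like this (an illustrative reconstruction using the signature from the diff; the body is made up):

```scala
object IndentationExample {
  // First argument on its own line, 4-space indentation for the wrapped parameter list.
  private def streamingUniqueGroupId(
      parameters: Map[String, String],
      metadataPath: String): String = {
    val prefix = parameters.getOrElse("groupIdPrefix", "spark-kafka-source")
    s"$prefix-${java.util.UUID.randomUUID()}-$metadataPath"
  }
}
```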


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-11-07 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21038
  
@SehanRathnayake Kafka is designed for at most one consumer per partition 
per consumer group at any given point in time, 
https://kafka.apache.org/documentation/#design_consumerposition  
Spark already manages creating a consumer per partition for the consumer 
group associated with a stream.
If you have a valid use case for running multiple Spark applications with 
the same consumer group, please explain it in a jira, not discussion of a pull 
request that has already been merged.


---




[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...

2018-10-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22824
  
@arunmahadevan I think this is clear, even if it is redundant.


---




[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...

2018-10-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22824
  
jenkins, ok to test


---




[GitHub] spark issue #22703: [SPARK-25705][BUILD][STREAMING] Remove Kafka 0.8 integra...

2018-10-12 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22703
  
I guess the only argument to the contrary would be that if some of the known
issues end up being better solved with minor API changes, leaving it marked
as experimental would technically be better notice.

I personally think it's clearer to remove the experimental.

On Fri, Oct 12, 2018, 6:18 PM Sean Owen  wrote:

> *@srowen* commented on this pull request.
> --
>
> In docs/streaming-kafka-0-10-integration.md
> <https://github.com/apache/spark/pull/22703#discussion_r224936431>:
>
> > @@ -3,7 +3,11 @@ layout: global
>  title: Spark Streaming + Kafka Integration Guide (Kafka broker version 
0.10.0 or higher)
>  ---
>
> -The Spark Streaming integration for Kafka 0.10 is similar in design to 
the 0.8 [Direct Stream 
approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers).
  It provides simple parallelism,  1:1 correspondence between Kafka partitions 
and Spark partitions, and access to offsets and metadata. However, because the 
newer integration uses the [new Kafka consumer 
API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, there are notable differences in usage. This version of the 
integration is marked as experimental, so the API is potentially subject to 
change.
> +The Spark Streaming integration for Kafka 0.10 provides simple 
parallelism, 1:1 correspondence between Kafka
> +partitions and Spark partitions, and access to offsets and metadata. 
However, because the newer integration uses
> +the [new Kafka consumer 
API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API,
> +there are notable differences in usage. This version of the integration 
is marked as experimental, so the API is
>
> Yeah, good general point. Is the kafka 0.10 integration at all
> experimental anymore? Is anything that survives from 2.x to 3.x? I'd say
> "no" in almost all cases. What are your personal views on that?
>



---




[GitHub] spark pull request #22703: [SPARK-25705][BUILD][STREAMING] Remove Kafka 0.8 ...

2018-10-12 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/22703#discussion_r224899199
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -3,7 +3,11 @@ layout: global
 title: Spark Streaming + Kafka Integration Guide (Kafka broker version 
0.10.0 or higher)
 ---
 
-The Spark Streaming integration for Kafka 0.10 is similar in design to the 
0.8 [Direct Stream 
approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers).
  It provides simple parallelism,  1:1 correspondence between Kafka partitions 
and Spark partitions, and access to offsets and metadata. However, because the 
newer integration uses the [new Kafka consumer 
API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, there are notable differences in usage. This version of the 
integration is marked as experimental, so the API is potentially subject to 
change.
+The Spark Streaming integration for Kafka 0.10 provides simple 
parallelism, 1:1 correspondence between Kafka 
+partitions and Spark partitions, and access to offsets and metadata. 
However, because the newer integration uses 
+the [new Kafka consumer 
API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, 
+there are notable differences in usage. This version of the integration is 
marked as experimental, so the API is 
--- End diff --

Do we want to leave the new integration marked as experimental if it is now 
the only available one?


---




[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

2018-08-30 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22223
  
Thanks, merging to master


---




[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

2018-08-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22223
  
I think since the default behavior is still 1, it's probably ok to let
someone do what they want here

On Wed, Aug 29, 2018 at 3:51 PM, rezasafi  wrote:

> *@rezasafi* commented on this pull request.
> --
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
> DirectKafkaInputDStream.scala
> <https://github.com/apache/spark/pull/22223#discussion_r213829668>:
>
> > @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
>  if (effectiveRateLimitPerPartition.values.sum > 0) {
>val secsPerBatch = 
context.graph.batchDuration.milliseconds.toDouble / 1000
>Some(effectiveRateLimitPerPartition.map {
> -case (tp, limit) => tp -> Math.max((secsPerBatch * 
limit).toLong, 1L)
> +case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
> +  Math.max(ppc.minRatePerPartition(tp), 1L))
>
> I just didn't want to break the reasoning behind SPARK-18371 to have at
> least 1 always. I didn't have any other reason for this. I can change it 
to
> give the user the freedom.
>



---




[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

2018-08-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/22223#discussion_r213825892
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
 if (effectiveRateLimitPerPartition.values.sum > 0) {
   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
   Some(effectiveRateLimitPerPartition.map {
-case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 
1L)
+case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
+  Math.max(ppc.minRatePerPartition(tp), 1L))
--- End diff --

Is the second Math.max actually necessary?
The default implementation of minRatePerPartition will be 1 anyway.
If someone makes a custom implementation that e.g. returns zero, should 
they get what they asked for?
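A minimal sketch of the simplification being suggested, with stand-ins for the surrounding DirectKafkaInputDStream pieces:

```scala
object BackpressureRateSketch {
  // Stand-in for PerPartitionConfig.minRatePerPartition; the real default returns 1.
  def minRatePerPartition(tp: String): Long = 1L

  // Suggested form: use minRatePerPartition alone as the floor, so a custom implementation
  // that returns 0 "gets what it asked for" instead of being clamped back up to 1.
  def boundedMessages(secsPerBatch: Double, limit: Long, tp: String): Long =
    Math.max((secsPerBatch * limit).toLong, minRatePerPartition(tp))
}
```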


---




[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

2018-08-24 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/22223#discussion_r212691622
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -141,10 +143,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
   tp -> Math.max(offset - currentOffsets(tp), 0)
 }
 val totalLag = lagPerPartition.values.sum
-
 lagPerPartition.map { case (tp, lag) =>
   val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-  val backpressureRate = lag / totalLag.toDouble * rate
+  var backpressureRate = lag / totalLag.toDouble * rate
--- End diff --

Why was this changed to a var?


---




[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

2018-08-24 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/22223#discussion_r212691455
  
--- Diff: docs/configuration.md ---
@@ -1925,6 +1925,14 @@ showDF(properties, numRows = 200, truncate = FALSE)
 first batch when the backpressure mechanism is enabled.
   
 
+
+  
spark.streaming.backpressure.fixedMinMessagePerPartition
--- End diff --

This only applies to Kafka.  Why not namespace it under 
spark.streaming.kafka?

What does the word "fixed" add to the explanation?

There's already a maxRatePerPartition, why not minRatePerPartition?


---




[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

2018-08-24 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22223
  
Jenkins, ok to test



---




[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-08-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22138
  
Seeking means the pre-fetched data is wasted, so it's not a light
operation.  It shouldn't be unavoidable, e.g. if consumers were cached
keyed by topicpartition, groupid, next offset to be processed.  One concern
there would be how to make sure you don't have lots of idle consumers.
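A minimal sketch of that kind of cache key (illustrative names only, not an actual Spark class):

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: key cached consumers by group, partition, and the next offset to be
// processed, so a borrowed consumer is already positioned where the task needs to read.
case class CachedConsumerKey(
    groupId: String,
    topicPartition: TopicPartition,
    nextOffset: Long)
```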

The question of how serious an issue this is could be solved by measurement, but
I don't have production structured streaming jobs, much less ones that
exhibit the kind of behavior tdas was talking about in the original ticket.

On Mon, Aug 20, 2018 at 7:36 PM, Jungtaek Lim 
wrote:

> @koeninger <https://github.com/koeninger>
>
> I'm not sure but are you saying that an executor cares about multiple
> queries (multiple jobs) concurrently? I honestly didn't notice it. If that
> is going to be problem, we should add something (could we get query id at
> that time?) in cache key to differentiate consumers. If we want to avoid
> extra seeking due to different offsets, consumers should not be reused
> among with multiple queries, and that's just a matter of cache key.
>
> If you are thinking about co-use of consumers among multiple queries
> because of reusing connection to Kafka, I think extra seeking is
> unavoidable (I guess fetched data should be much more critical issue 
unless
> we never reuse after returning to pool). If seeking is light operation, we
> may even go with only reusing connection (not position we already sought):
> always resetting position (and data maybe?) when borrowing from pool or
> returning consumer to pool.
>
> Btw, the rationalization of this patch is not solving the issue you're
> referring. This patch is also based on #20767
> <https://github.com/apache/spark/pull/20767> but dealing with another
> improvements pointed out in comments: adopt pool library to not reinvent
> the wheel, and also enabling metrics regarding the pool.
>
> I'm not sure the issue you're referring is a serious one (show-stopper):
> if the issue is a kind of serious, someone should handle the issue once we
> are aware of the issue at March, or at least relevant JIRA issue should be
> filed with detailed explanation before. I'd like to ask you in favor of
> handling (or filing) the issue since you may know the issue best.
>



---




[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-08-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22138
  
see e.g. https://github.com/apache/spark/pull/20767 for background

Even if this patch doesn't change behavior, if it doesn't really solve the
problem, it may make it harder to solve it correctly.

On Mon, Aug 20, 2018 at 10:32 AM, Jungtaek Lim 
wrote:

> If my understanding is right, looks like current approach has same
> limitation. I guess you're busy, but could you refer some issue number or
> point out some code lines which was based on the reason if you remember
> any? It should help to determine whether this patch breaks more spots or
> not.
>



---




[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-08-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22138
  
I thought the whole reason the caching was changed from the initial naive
approach to the current approach in master was that people were running
jobs that were scheduling multiple consumers for the same topicpartition
and group.



On Sun, Aug 19, 2018 at 7:51 PM, Jungtaek Lim 
wrote:

> @koeninger <https://github.com/koeninger>
> I'm not sure I got your point correctly. This patch is based on some
> assumptions, so please correct me if I'm missing here. Assumptions follow:
>
>1.
>
>There's actually no multiple consumers for a given key working at the
>same time. The cache key contains topic partition as well as group id. 
Even
>the query tries to do self-join so reading same topic in two different
>sources, I think group id should be different.
>2.
>
>In normal case the offset will be continuous, and that's why cache
>should help. In retrying case this patch invalidates cache as same as
>current behavior, so it should start from scratch.
>
> (Btw, I'm curious what's more expensive between leveraging pooled object
> but resetting kafka consumer vs invalidating pooled objects and start
> from scratch. Latter feels more safer but if we just need extra seek
> instead of reconnecting to kafka, resetting could be improved and former
> will be cheaper. I feel it is out of scope of my PR though.)
>
> This patch keeps most of current behaviors, except two spots I guess. I
> already commented a spot why I change the behavior, and I'll comment
> another spot for the same.
>



---




[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

2018-08-19 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22143
  
Jenkins, ok to test


---




[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-08-19 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/22138
  
If you have multiple consumers for a given key, and those consumers are at 
different offsets, isn't it likely that the client code will not get the right 
consumer, leading to extra seeking?


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-06 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
Recursively creating a Kafka RDD during creation of a Kafka RDD would need
a base case, but yeah, some way to have appropriate preferred locations.

On Mon, Aug 6, 2018 at 2:58 AM, Quentin Ambard 
wrote:

> *@QuentinAmbard* commented on this pull request.
> --
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
> DirectKafkaInputDStream.scala
> <https://github.com/apache/spark/pull/21917#discussion_r207802444>:
>
> > -  val fo = currentOffsets(tp)
> -  OffsetRange(tp.topic, tp.partition, fo, uo)
> +  /**
> +   * Return the offset range. For non consecutive offset the last offset 
must have record.
> +   * If offsets have missing data (transaction marker or abort), 
increases the
> +   * range until we get the requested number of record or no more 
records.
> +   * Because we have to iterate over all the records in this case,
> +   * we also return the total number of records.
> +   * @param offsets the target range we would like if offset were 
continue
> +   * @return (totalNumberOfRecords, updated offset)
> +   */
> +  private def alignRanges(offsets: Map[TopicPartition, Long]): 
Iterable[OffsetRange] = {
> +if (nonConsecutive) {
> +  val localRw = rewinder()
> +  val localOffsets = currentOffsets
> +  
context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
>
> Are you suggesting I should create a new kafkaRDD instead, and consume
> from this RDD to get the last offset range?
>



---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-06 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
Example report of skipped offsets in a non-compacted non-transactional
situation


http://mail-archives.apache.org/mod_mbox/kafka-users/201801.mbox/%3ccakwx9vxc1cdosqwwwjk3qmyy3svvtmh+rjdrjyvsbejsds8...@mail.gmail.com%3EFo

I asked on the kafka list about ways to tell if an offset is a
transactional marker.  I also asked about endOffset alternatives, although
I think that doesn't totally solve the problem (for instance, in cases
where the batch size has been rate limited)

On Mon, Aug 6, 2018 at 2:57 AM, Quentin Ambard 
wrote:

> By failed, you mean returned an empty collection after timing out, even
> though records should be available? You don't. You also don't know that it
> isn't just lost because kafka skipped a message. AFAIK from the 
information
> you have from a kafka consumer, once you start allowing gaps in offsets,
> you don't know.
>
> Ok that's interesting, my understanding was that if you successfully poll
> and get results you are 100% sure that you don't lose anything. Do you 
have
> more details on that? Why would kafka skip a record while consuming?
>
> Have you tested comparing the results of consumer.endOffsets for consumers
> with different isolation levels?
>
> endOffsets returns the last offset (same as seekToEnd). But you're right
> that the easiest solution for us would be to have something like
> seekToLastRecord method instead. Maybe something we could also ask ?
>



---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721681
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, 
ConsumerRecords}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+
+class OffsetWithRecordScannerSuite
+  extends SparkFunSuite
+with Logging {
+
+  class OffsetWithRecordScannerMock[K, V](records: 
List[Option[ConsumerRecord[K, V]]])
+extends OffsetWithRecordScanner[K, V](
+  Map[String, Object]("isolation.level" -> "read_committed").asJava, 
1, 1, 0.75F, true) {
+var i = -1
+override protected def getNext(c: KafkaDataConsumer[K, V]): 
Option[ConsumerRecord[K, V]] = {
+  i = i + 1
+  records(i)
+}
+
+  }
+
+  val emptyConsumerRecords = new ConsumerRecords[String, 
String](ju.Collections.emptyMap())
+  val tp = new TopicPartition("topic", 0)
+
+  test("Rewinder construction should fail if isolation level isn set to 
read_committed") {
+intercept[IllegalStateException] {
+  new OffsetWithRecordScanner[String, String](
+Map[String, Object]("isolation.level" -> 
"read_uncommitted").asJava, 1, 1, 0.75F, true)
+}
+  }
+
+  test("Rewinder construction shouldn't fail if isolation level isn't 
set") {
+  assert(new OffsetWithRecordScanner[String, String](
+Map[String, Object]().asJava, 1, 1, 0.75F, true) != null)
+  }
+
+  test("Rewinder construction should fail if isolation level isn't set to 
committed") {
+intercept[IllegalStateException] {
+  new OffsetWithRecordScanner[String, String](
+Map[String, Object]("isolation.level" -> 
"read_uncommitted").asJava, 1, 1, 0.75F, true)
+}
+  }
+
+  test("Rewind should return the proper count.") {
+var scanner = new OffsetWithRecordScannerMock[String, String](
+  records(Some(0), Some(1), Some(2), Some(3)))
+val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+assert(offset === 2)
+assert(size === 2)
+  }
+
+  test("Rewind should return the proper count with gap") {
+var scanner = new OffsetWithRecordScannerMock[String, String](
+  records(Some(0), Some(1), Some(3), Some(4), Some(5)))
+val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+assert(offset === 4)
+assert(size === 3)
+  }
+
+  test("Rewind should return the proper count for the end of the 
iterator") {
+var scanner = new OffsetWithRecordScannerMock[String, String](
+  records(Some(0), Some(1), Some(2), None))
+val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+assert(offset === 3)
+assert(size === 3)
+  }
+
+  test("Rewind should return the proper count missing data") {
+var scanner = new OffsetWithRecordScannerMock[String, String](
+  records(Some(0), None))
+val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+assert(offset === 1)
+assert(size === 1)
+  }
+
+  test("Rewind should return the proper count without data") {
+var scanner = new OffsetWithRecordScannerMock[String, String](
+  records(None))
+val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+assert(offset === 0)
+assert(size === 0)
+ 

[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721657
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
 ---
@@ -90,21 +90,23 @@ final class OffsetRange private(
 val topic: String,
 val partition: Int,
 val fromOffset: Long,
-val untilOffset: Long) extends Serializable {
+val untilOffset: Long,
+val recordNumber: Long) extends Serializable {
--- End diff --

Does mima actually complain about binary compatibility if you just make 
recordNumber count?  It's just an accessor either way...

If so, and you have to do this, I'd name this recordCount consistently 
throughout.  Number could refer to a lot of things that aren't counts.


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721492
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -191,6 +211,11 @@ private[kafka010] class InternalKafkaConsumer[K, V](
 buffer.previous()
   }
 
+  def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = {
--- End diff --

Is this used anywhere?


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721482
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -183,6 +187,22 @@ private[kafka010] class InternalKafkaConsumer[K, V](
 record
   }
 
+  /**
+   * Similar to compactedStart but will return None if poll doesn't
--- End diff --

Did you mean compactedNext?


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721435
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
 }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-val untilOffsets = clamp(latestOffsets())
-val offsetRanges = untilOffsets.map { case (tp, uo) =>
-  val fo = currentOffsets(tp)
-  OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset 
must have record.
+   * If offsets have missing data (transaction marker or abort), increases 
the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): 
Iterable[OffsetRange] = {
+if (nonConsecutive) {
+  val localRw = rewinder()
+  val localOffsets = currentOffsets
+  context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos 
=> {
--- End diff --

Because this isn't a kafka rdd, it isn't going to take advantage of 
preferred locations, which means it's going to create cached consumers on 
different executors.


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
> How do you know that offset 4 isn't just lost because poll failed?

By failed, you mean returned an empty collection after timing out, even 
though records should be available?  You don't.  You also don't know that it 
isn't just lost because kafka skipped a message.  AFAIK from the information 
you have from a kafka consumer, once you start allowing gaps in offsets, you 
don't know.

I understand your point, but even under your proposal you have no guarantee 
that the poll won't work in your first pass during RDD construction, and then 
fail on the executor during computation, right?

> The issue with your proposal is that SeekToEnd gives you the last offset 
which might not be the last record.

Have you tested comparing the results of consumer.endOffsets for consumers 
with different isolation levels?
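Something along these lines would do the comparison (a hedged sketch; broker address and topic are placeholders):

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object EndOffsetsCheck {
  // Report endOffsets for one partition as seen under a given isolation.level.
  def endOffsetFor(isolationLevel: String, tp: TopicPartition): Long = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel)
    val consumer = new KafkaConsumer[String, String](props)
    try consumer.endOffsets(Collections.singleton(tp)).get(tp)
    finally consumer.close()
  }

  def main(args: Array[String]): Unit = {
    val tp = new TopicPartition("test", 0) // placeholder topic-partition
    println(s"read_uncommitted: ${endOffsetFor("read_uncommitted", tp)}")
    println(s"read_committed:   ${endOffsetFor("read_committed", tp)}")
  }
}
```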

Your proposal might end up being the best approach anyway, just because of 
the unfortunate effect of StreamInputInfo and count, but I want to make sure we 
think this through.


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
If the last offset in the range as calculated by the driver is 5, and on 
the executor all you can poll up to after a repeated attempt is 3, and the user 
already told you to allowNonConsecutiveOffsets... then you're done, no error.

Why does it matter if you do this logic when you're reading all the 
messages in advance and counting, or when you're actually computing? 

To put it another way, this PR is a lot of code change and refactoring, why 
not just change the logic of e.g. how CompactedKafkaRDDIterator interacts with 
compactedNext?


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
Still playing devil's advocate here, I don't think stopping at 3 in your 
example actually tells you anything about the cause of the gaps in the sequence 
at 4.  I'm not sure you can know that the gap is because of a transaction 
marker, without a modified kafka consumer library.

If the actual problem is that when allowNonConsecutiveOffsets is set we 
need to allow gaps even at the end of an offset range... why not just fix that 
directly?

Master is updated to kafka 2.0 at this point, so we should be able to write 
a test for your original jira example of a partition consisting of 1 message 
followed by 1 transaction commit.
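A rough sketch of how that fixture could be produced with the 2.0 client (transactional id, broker address, and topic are placeholders):

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TransactionMarkerFixture {
  // Commit a single-record transaction so the partition ends with a commit marker
  // occupying the offset after the record.
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "marker-test") // placeholder id
    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    producer.beginTransaction()
    producer.send(new ProducerRecord[String, String]("test", "k", "only-record"))
    producer.commitTransaction() // writes the transaction marker as the next offset
    producer.close()
  }
}
```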


---




[GitHub] spark issue #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...

2018-08-03 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21955
  
I don't see an obvious issue.  Looks like zookeeper.connection.timeout.ms 
isn't being set, so it's defaulting to 6 seconds... could try tweaking it 
upwards.


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207664852
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

Isn't the test hanging on the line right before that change though?


---




[GitHub] spark issue #21983: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

2018-08-03 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21983
  
Jenkins, ok to test


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207437645
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
 }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-val untilOffsets = clamp(latestOffsets())
-val offsetRanges = untilOffsets.map { case (tp, uo) =>
-  val fo = currentOffsets(tp)
-  OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset 
must have record.
+   * If offsets have missing data (transaction marker or abort), increases 
the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): 
Iterable[OffsetRange] = {
+if (nonConsecutive) {
+  val localRw = rewinder()
+  val localOffsets = currentOffsets
+  context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos 
=> {
+tpos.map { case (tp, o) =>
+  val offsetAndCount = 
localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
+  (tp, offsetAndCount)
+}
+  }).collect()
--- End diff --

What exactly is the benefit gained by doing a duplicate read of all the 
messages?


---




[GitHub] spark issue #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...

2018-08-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21955
  
jenkins, retest this please


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207422889
  
--- Diff: external/kafka-0-10/pom.xml ---
@@ -28,7 +28,7 @@
   spark-streaming-kafka-0-10_2.11
   
 streaming-kafka-0-10
-0.10.0.1
+2.0.0
   
   jar
   Spark Integration for Kafka 0.10
--- End diff --

Probably worth updating the name to indicate it's for brokers version 0.10 +


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-07-30 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917
  
jenkins, ok to test


---




[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-13 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
LGTM, merging to master.  Thanks!


---




[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-12 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
What results are you seeing?

On Thu, Jul 12, 2018, 6:53 AM Yuanbo Liu  wrote:

> @koeninger <https://github.com/koeninger> Sorry to interrupt, could you
> take a look at my patch?
>



---




[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-06 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
@yuanboliu What I'm suggesting is more like this:


https://github.com/apache/spark/compare/master...koeninger:SPARK-24713?expand=1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-03 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
@yuanboliu From reading KafkaConsumer code, and from testing, I don't see 
where consumer.position() alone would un-pause topicpartitions.  See below.  
Can you give a counter-example?

I am seeing poll() reset the paused state.  When you are having the 
problem, are you seeing the info level log messages "poll(0) returned messages"?

If that's what's happening, I think the best we can do is call pause() in
only one place, the first line of paranoidPoll, e.g.

```
c.pause(c.assignment)
val msgs = c.poll(0)
```
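
For context, a sketch of where that would sit, assuming the paranoidPoll helper in DirectKafkaInputDStream (the body below is simplified for illustration, it is not the actual patch):

```
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Sketch only: pause in exactly one place, as the first line before the "paranoid" poll(0).
def paranoidPoll[K, V](c: Consumer[K, V]): Unit = {
  c.pause(c.assignment)        // the single pause() call
  val msgs = c.poll(0)
  if (!msgs.isEmpty) {
    // poll(0) returned records we did not ask for: seek back to the smallest
    // offset seen per partition so nothing gets skipped
    msgs.asScala
      .groupBy(m => new TopicPartition(m.topic, m.partition))
      .foreach { case (tp, records) => c.seek(tp, records.map(_.offset).min) }
  }
}
```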



Here's what I saw in testing:

```
scala> c.paused
res34: java.util.Set[org.apache.kafka.common.TopicPartition] = []

scala> c.assignment
res35: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.pause(topics)

scala> c.paused
res37: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res38: Long = 248

scala> c.paused
res39: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(0)
res40: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res41: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res42: Long = 248

scala> c.paused
res43: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(1)
res44: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res45: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(100)
res46: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@28e4439b

scala> c.paused
res47: java.util.Set[org.apache.kafka.common.TopicPartition] = []
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
@yuanboliu Can you clarify why repeated pause is necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...

2018-07-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21690
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21651: [SPARK-18258] Sink need access to offset representation

2018-06-28 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21651
  
- We need agreement on whether it is worth making a change to the public 
Sink api (probably not any time soon, judging from the spark 3.0 vs 2.4 
discussion), or whether there is a different way to accomplish the goal.

- I wouldn't worry about what any particular sink implementation does with 
the offsets; most of the existing ones shouldn't do anything by default. You 
just need a proof of concept that a given sink (e.g. a database sink) can do 
something useful with them; a rough sketch of that idea is below.
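
A very rough, hypothetical sketch of such a proof of concept. The OffsetAwareSink trait and the three-argument addBatch are made up for illustration; the real Sink API doesn't expose offsets today, which is what this PR is about. The JDBC settings are placeholders:

```
import org.apache.spark.sql.DataFrame

// Hypothetical trait, not the real Sink API: just the shape a proof of concept could take.
trait OffsetAwareSink {
  def addBatch(batchId: Long, data: DataFrame, offsetsJson: String): Unit
}

// A real implementation would make the two writes atomic, or at least idempotent
// by batchId; this sketch just shows that the offsets can be stored with the data.
class JdbcOffsetSink(url: String, dataTable: String, offsetTable: String)
  extends OffsetAwareSink {

  override def addBatch(batchId: Long, data: DataFrame, offsetsJson: String): Unit = {
    data.write
      .format("jdbc")
      .option("url", url)
      .option("dbtable", dataTable)
      .mode("append")
      .save()

    // Record which offsets this batch covered, so a restart can tell what is
    // already in the database instead of relying only on the checkpoint.
    val spark = data.sparkSession
    import spark.implicits._
    Seq((batchId, offsetsJson)).toDF("batch_id", "offsets_json")
      .write
      .format("jdbc")
      .option("url", url)
      .option("dbtable", offsetTable)
      .mode("append")
      .save()
  }
}
```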


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...

2018-06-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16006
  
#19431 was merged, thanks for your work.  This PR should probably be closed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...

2018-05-22 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20997
  
I'm fine as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-...

2018-05-14 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21300
  
@gatorsmile this is identical to the original PR, which was reviewed by 
@srowen, and the discussion on the jira about backporting it had not raised 
any objections since April


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] All...

2018-05-12 Thread koeninger
Github user koeninger closed the pull request at:

https://github.com/apache/spark/pull/21300


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-...

2018-05-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21300
  
Merging to branch-2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] All...

2018-05-11 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/21300

[SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Backport of the bugfix in SPARK-17147

Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to 
allow streaming jobs to proceed on compacted topics (or other situations 
involving gaps between offsets in the log).
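
For reference, a minimal sketch of turning the flag on from user code (the app name is a placeholder):

```
import org.apache.spark.SparkConf

// spark.streaming.kafka.allowNonConsecutiveOffsets defaults to false; setting it to
// true lets the DStream proceed when there are gaps between offsets, e.g. compacted topics.
val conf = new SparkConf()
  .setAppName("compacted-topic-example")
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
```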

## How was this patch tested?

Added new unit test

justinrmiller has been testing this branch in production for a few weeks



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/daten-kieker/spark branch-2.3_kafkafix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21300


commit 84648e0222330fd3bc43ce214689b66795efdffe
Author: cody koeninger <cody@...>
Date:   2018-02-27T14:21:11Z

[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to 
allow streaming jobs to proceed on compacted topics (or other situations 
involving gaps between offsets in the log).

## How was this patch tested?

Added new unit test

justinrmiller has been testing this branch in production for a few weeks

Author: cody koeninger <c...@koeninger.org>

Closes #20572 from koeninger/SPARK-17147.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19183: [SPARK-21960][Streaming] Spark Streaming Dynamic Allocat...

2018-05-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19183
  
@sansagara sounds reasonable to me


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19183: [SPARK-21960][Streaming] Spark Streaming Dynamic Allocat...

2018-05-09 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19183
  
I don't have personal experience with streaming dynamic allocation, but 
this patch makes sense to me and I don't see anything obviously wrong.

I agree with Holden regarding tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184210716
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010] class InternalKafkaConsumer[K, V](
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  private[kafka010] val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+.asInstanceOf[String]
+
+  private val consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val topics = ju.Arrays.asList(topicPartition)
+c.assign(topics)

[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.

2018-04-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19887
  
Merging to master, thanks @liu-zhaokun 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.

2018-04-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19887
  
Seems ok to me, long as it passes retest


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.

2018-04-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19887
  
Jenkins, retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-17 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21038
  
Seems like that should help address the confusion.  Merging to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...

2018-04-17 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20997
  
I think if we can't come up with a pool design now that solves most of the
issues, we should switch back to the one cached consumer approach that the
SQL code is using.
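
For concreteness, a stripped-down sketch of that single-cached-consumer-per-key idea (placeholder types, not the SQL module's actual code):

```
import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Placeholder standing in for the real wrapper around KafkaConsumer.
final class RealConsumer {
  @volatile var inUse = true
  def close(): Unit = ()
}

final case class CacheKey(groupId: String, tp: TopicPartition)

object SingleCachedConsumer {
  private val cache = new ju.HashMap[CacheKey, RealConsumer]()

  // At most one consumer is ever cached per key; if it is busy, hand out a
  // temporary one that the caller closes on release.
  def acquire(key: CacheKey): (RealConsumer, Boolean) = synchronized {
    val cached = cache.get(key)
    if (cached == null) {
      val c = new RealConsumer
      cache.put(key, c)
      (c, true)
    } else if (cached.inUse) {
      (new RealConsumer, false)
    } else {
      cached.inUse = true
      (cached, true)
    }
  }

  def release(key: CacheKey, c: RealConsumer, wasCached: Boolean): Unit = synchronized {
    if (wasCached) c.inUse = false else c.close()
  }
}
```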

On Mon, Apr 16, 2018 at 3:25 AM, Gabor Somogyi <notificati...@github.com>
wrote:

> *@gaborgsomogyi* commented on this pull request.
> --
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
> KafkaDataConsumer.scala
> <https://github.com/apache/spark/pull/20997#discussion_r181655790>:
>
> > +   * If matching consumer doesn't already exist, will be created using 
kafkaParams.
> +   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
> +   *
> +   * Note: This method guarantees that the consumer returned is not 
currently in use by anyone
> +   * else. Within this guarantee, this method will make a best effort 
attempt to re-use consumers by
> +   * caching them and tracking when they are in use.
> +   */
> +  def acquire[K, V](
> +  groupId: String,
> +  topicPartition: TopicPartition,
> +  kafkaParams: ju.Map[String, Object],
> +  context: TaskContext,
> +  useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
> +val key = new CacheKey(groupId, topicPartition)
> +val existingInternalConsumers = Option(cache.get(key))
> +  .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]])
>
> That's correct, the SQL part isn't keeping a linked list pool but a single
> cached consumer. I was considering your suggestion and came to the same
> conclusion:
>
> Can you clarify why you want to allow only 1 cached consumer per 
topicpartition, closing any others at task end?
>
> It seems like opening and closing consumers would be less efficient than 
allowing a pool of more than one consumer per topicpartition.
>
> Though limiting the number of cached consumers per groupId/TopicPartition
> is a must as you've pointed out. On the other side if we go the SQL way
> it's definitely less risky. Do you think we should switch back to the one
> cached consumer approach?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/20997#discussion_r181655790>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGABzOM08a0IoWTJWOi204fvKoyXc6xks5tpFWDgaJpZM4TKDOs>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181507520
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181506863
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181506582
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue

2018-04-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21038
  
I can't think of a valid reason to create a configuration to allow it.  It
just fundamentally doesn't make sense to run different apps with the same
group id.

Trying to catch and rethrow the exception with more information might make
sense.
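
A hedged sketch of what that catch-and-rethrow could look like (the IllegalStateException type and the wording are assumptions about what the consumer throws here, not actual Kafka or Spark code):

```
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Wrap the confusing "no current assignment for partition" error with a hint
// at the usual root cause: two applications sharing one group.id.
def seekWithHint[K, V](c: Consumer[K, V], tp: TopicPartition, offset: Long): Unit = {
  try {
    c.seek(tp, offset)
  } catch {
    case e: IllegalStateException =>
      throw new IllegalStateException(
        s"Lost assignment for $tp, most likely because another application is " +
          "using the same group.id. Use a separate group id for each stream.", e)
  }
}
```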

On Wed, Apr 11, 2018, 20:05 Saisai Shao <notificati...@github.com> wrote:

> Thanks @koeninger <https://github.com/koeninger> for your comments. I
> think your suggestion is valid, the log here is just pasted from JIRA, but
> we also got the same issue from customer's report.
>
> Here in the PR description, I mentioned that using two apps with same
> group id to mimic this issue. But I'm not sure the real use case from our
> customer, maybe in their scenario such usage is valid.
>
> So I'm wondering if we can add a configuration to control whether it
> should be fail or just warning. Also I think exception/warning log should
> be improved to directly tell user about consumer rebalance issue, rather
> than throwing from Kafka as "no current assignment for partition xxx".
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/21038#issuecomment-380641782>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB-6ISk53Qsrh0Hwopdc8uk-F4ZFrks5tnqhEgaJpZM4TPftQ>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue

2018-04-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21038
  
The log in the jira looks like it's from a consumer rebalance, i.e. more 
than one driver consumer was running with the same group id.

Isn't the underlying problem here that the user is creating multiple 
streams with the same group id, despite what the documentation says?  The log 
even says s/he copy-pasted the documentation group id "group 
use_a_separate_group_id_for_each_stream"

I don't think we should silently "fix" that.  As a user, I wouldn't expect 
app A to suddenly start processing only half of the partitions just because 
an entirely different app B started with the same (misconfigured) group id.
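
For anyone hitting this, the documented pattern is one group id per stream and per application; a minimal sketch (broker address and group names are placeholders):

```
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Each application (and each stream within it) gets its own group id.
def kafkaParams(groupId: String): Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

val appAParams = kafkaParams("app-a-group")   // one application
val appBParams = kafkaParams("app-b-group")   // a different application, different group
```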


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180282891
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
--- End diff --

This is mostly vestigial (there used to be a remove method that took a 
groupId, but no kafkaParams, so there was symmetry).  

I don't see a reason it can't be changed to match the SQL version at this 
point, i.e. assign groupId from the params.
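
i.e. a sketch along these lines (unrelated members omitted, class renamed to make clear it's an illustration):

```
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition

// Sketch: take only the TopicPartition and the params, and derive groupId from
// the params instead of requiring a matching constructor argument.
class InternalKafkaConsumerSketch[K, V](
    val topicPartition: TopicPartition,
    val kafkaParams: ju.Map[String, Object]) {

  val groupId: String =
    kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
}
```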


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180283864
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180284812
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180280531
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
--- End diff --

This is an example of the cut & paste I was referring to.

In this case, I don't believe consumer is ever reassigned, so it doesn't 
even need to be a var.

It was reassigned in the SQL version of the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180285599
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180280210
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+KafkaDataConsumer.init(16, 64, 0.75f)
+
+val topic = "topic" + Random.nextInt()
+val data = (1 to 1000).map(_.toString)
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic)
+testUtils.sendMessages(topic, data.toArray)
+
+val groupId = "groupId"
+val kafkaParams = Map[String, Object](
+  GROUP_ID_CONFIG -> groupId,
+  BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+  KEY_DESERIALIZER_CLASS_CONFIG -> 
classOf[ByteArrayDeserializer].getName,
+  VALUE_DESERIALIZER_CLASS_CONFIG -> 
classOf[ByteArrayDeserializer].getName,
+  AUTO_OFFSET_RESET_CONFIG -> "earliest",
+  ENABLE_AUTO_COMMIT_CONFIG -> "false"
+)
+
+val numThreads = 100
+val numConsumerUsages = 500
+
+@volatile var error: Throwable = null
+
+def consume(i: Int): Unit = {
+  val useCache = Random.nextBoolean
+  val taskContext = if (Random.nextBoolean) {
+new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), 
null, null, null)
+  } else {
+null
+  }
+  val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+  try {
+val rcvd = 0 until data.length map { offset =>
+  val bytes = consumer.get(offset, 1).value()
+  new String(bytes)
+}
+assert(rcvd == data)
+  } catch {
+case e: Throwable =>
+  error = e
+  throw e
+  } finally {
+consumer.release()
+  }
+}
+
+val threadPool = Executors.newFixedThreadPool(numThreads)
+try {
+  val futures = (1 to numConsumerUsages).map { i =>
+threadPool.submit(new Runnable {
+  override def run(): Unit = { consume(i) }
+})
+  }
+  futures.foreach(_.get(1, TimeUnit.MINUTES))
+  assert(error == null)
+} finally {
+  threadPool.shutdown()
+}
+  }
+}
--- End diff --

If this PR is intended to fix a problem with silent reading of incorrect 
data, can you add a test reproducing that?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180283733
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next 
release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
   

[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...

2018-04-09 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20997
  
In general, 2 things about this make me uncomfortable:

- It's basically a cut-and-paste of the SQL equivalent PR, 
https://github.com/apache/spark/pull/20767, but it is different from both that 
PR and the existing DStream code.

- I don't see an upper bound on the number of consumers per key, nor a way 
of reaping idle consumers (see the sketch below for the kind of thing I mean). 
If the SQL equivalent code is likely to be modified to use pooling of some 
kind, it seems better to make a consistent decision.
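
To make that concrete, here's a rough sketch (not Spark code, all names are 
made up) of what a per-key cap plus idle reaping could look like:

```
import scala.collection.mutable

// Hypothetical key; the real cache is keyed by (groupId, topic, partition).
case class PoolKey(groupId: String, topic: String, partition: Int)

trait PooledConsumer { def close(): Unit }

class BoundedConsumerPool(maxPerKey: Int, idleTimeoutMs: Long) {
  private class Entry(val consumer: PooledConsumer, var lastUsedMs: Long)
  private val pool = mutable.Map.empty[PoolKey, mutable.Queue[Entry]]

  /** Hand out an idle cached consumer if one exists; caller creates a new one otherwise. */
  def borrow(key: PoolKey): Option[PooledConsumer] = synchronized {
    pool.get(key).filter(_.nonEmpty).map(_.dequeue().consumer)
  }

  /** Return a consumer; once the per-key cap is hit, close it instead of caching it. */
  def release(key: PoolKey, consumer: PooledConsumer): Unit = synchronized {
    val q = pool.getOrElseUpdate(key, mutable.Queue.empty[Entry])
    if (q.size >= maxPerKey) consumer.close()
    else q.enqueue(new Entry(consumer, System.currentTimeMillis()))
  }

  /** Close and drop any consumer that has sat idle longer than idleTimeoutMs. */
  def reapIdle(): Unit = synchronized {
    val cutoff = System.currentTimeMillis() - idleTimeoutMs
    pool.values.foreach { q =>
      q.dequeueAll(_.lastUsedMs < cutoff).foreach(_.consumer.close())
    }
  }
}
```

Something along those lines (or an existing pool implementation like Apache 
Commons Pool) would bound memory use and let idle consumers eventually get 
closed.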




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...

2018-03-21 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
Merged to master


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-21 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
@akonopko Thanks! 
Sorry, but I just noticed the title of the PR - can you adjust it to match 
convention, e.g.

[SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backpressure.initialRate 
to direct Kafka streams

and then I'll get it merged ;)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-21 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2018-03-16 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
merged to master
Thanks @arzt !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-16 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
@akonopko thanks for this, if you can resolve the merge conflict I think we 
can get this in


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-14 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2018-03-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
LGTM
@tdas @zsxwing absent any objections from you in the next couple of days, 
I'll merge this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19431
  
@tdas any concerns?

If @omuravskiy doesn't express any objections (since these tests are 
basically taken directly from his linked PR) in the next couple of days, I'm 
inclined to merge this.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-03-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r173641331
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+backpressureTest(maxRatePerPartition = 1000, initialRate = 500, 
maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Aren't the descriptions of these tests backwards, i.e. isn't this the one 
testing that maxRatePerPartition is honored?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20767
  
Can you clarify why you want to allow only 1 cached consumer per 
topicpartition, closing any others at task end?

It seems like opening and closing consumers would be less efficient than 
allowing a pool of more than one consumer per topicpartition.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...

2018-03-05 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16006
  
@omuravskiy can you comment on
https://github.com/apache/spark/pull/19431
since it appears to be based on your PR


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...

2018-03-05 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16006
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170799163
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float)
--- End diff --

Yeah, it's necessary; otherwise it gets treated as AnyRef.  Changed it to 
Float.valueOf, FWIW.
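
For anyone curious, here's a tiny standalone illustration (plain Properties, 
no Kafka classes) of the boxing in question; both forms end up storing a 
java.lang.Float:

```
import java.{lang => jl, util => ju}

object FloatBoxingExample {
  def main(args: Array[String]): Unit = {
    val logProps = new ju.Properties()

    // Properties.put takes Object values, so the float literal has to end up boxed.
    // Either the type ascription or Float.valueOf makes the intended boxed type explicit.
    val ascribed: jl.Float = 0.1f            // via Predef.float2Float
    val viaValueOf = jl.Float.valueOf(0.1f)  // the form the code uses now

    logProps.put("min.cleanable.dirty.ratio", viaValueOf)
    println(logProps.get("min.cleanable.dirty.ratio").getClass) // class java.lang.Float
    println(ascribed.getClass)                                  // also java.lang.Float
  }
}
```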


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169850605
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

Agreed that require is better, will fix it in a sec

Pretty sure assert is just a function in Predef.scala that throws an 
AssertionError; it's not like a Java assert statement that can be enabled or 
disabled with java -ea / -da.  Tested it out:

https://gist.github.com/koeninger/6155cd94a19d1a6373ba0b40039e97e3

Disabling scala asserts can be done at compile time with 
-Xdisable-assertions
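
If it helps, here's a minimal self-contained version of what that gist 
demonstrates:

```
object AssertVsRequire {
  def main(args: Array[String]): Unit = {
    // Predef.assert throws java.lang.AssertionError and ignores `java -da`;
    // it only goes away if compiled with scalac -Xdisable-assertions (or -Xelide-below).
    try assert(1 == 2, "from assert") catch {
      case e: AssertionError => println(s"${e.getClass.getName}: ${e.getMessage}")
    }

    // Predef.require throws IllegalArgumentException, i.e. a caller/precondition error,
    // and is not elidable.
    try require(1 == 2, "from require") catch {
      case e: IllegalArgumentException => println(s"${e.getClass.getName}: ${e.getMessage}")
    }
  }
}
```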


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169538019
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
 
   // TODO if the buffer was kept around as a random-access structure,
   // could possibly optimize re-calculating of an RDD in the same batch
-  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
--- End diff --

Agreed, think it should be ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169537541
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

That's a "shouldn't happen unless the topicpartition or broker is gone" 
kind of thing.  Semantically I could see that being more like require than 
assert, but don't have a strong opinion.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


