[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169536036
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -22,12 +22,17 @@ import java.{ util => ju }
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import kafka.common.TopicAndPartition
--- End diff --

Right, LogCleaner hadn't yet been moved to the new APIs; I added a comment to that effect.
I think we're OK here because it's only being used to mock up a compacted topic, not in the actual DStream API.


---




[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

2018-02-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20572
  
@srowen Sorry to turn on the bat-signal, but would you be able to help find a committer willing to look at this?

After finally finding someone willing to test this in production, I don't want it to fall through the cracks.


---




[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

2018-02-12 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/20572
  
@zsxwing do you have time to review this? It's been a long-standing issue.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-10 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/20572

[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Add a configuration, spark.streaming.kafka.allowNonConsecutiveOffsets, that allows streaming jobs to proceed on compacted topics (or in other situations involving gaps between offsets in the log).
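
A minimal sketch of how a job would opt in, assuming the new key is read from SparkConf like the other spark.streaming.kafka.* settings (the app name and batch interval below are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Opt in to tolerating gaps between offsets (e.g. compacted topics).
    val conf = new SparkConf()
      .setAppName("compacted-topic-stream")
      .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
    val ssc = new StreamingContext(conf, Seconds(10))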

## How was this patch tested?

Added a new unit test.

@justinrmiller has been testing this branch in production for a few weeks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17147

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20572


commit 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29
Author: cody koeninger <cody@...>
Date:   2016-10-08T21:21:48Z

[SPARK-17147][STREAMING][KAFKA] failing test for compacted topics

commit e8ea89ea10527c6723df4af2685004ea67d872cd
Author: cody koeninger <cody@...>
Date:   2016-10-09T04:59:39Z

[SPARK-17147][STREAMING][KAFKA] test passing for compacted topics

commit 182943e36f596d0cb5841a9c63471bea1dd9047b
Author: cody koeninger <cody@...>
Date:   2018-02-11T04:09:38Z

spark.streaming.kafka.allowNonConsecutiveOffsets

commit 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8
Author: cody koeninger <cody@...>
Date:   2018-02-11T04:13:49Z

[SPARK-17147][STREAMING][KAFKA] remove stray param doc

commit 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d
Author: cody koeninger <cody@...>
Date:   2018-02-11T04:28:22Z

[SPARK-17147][STREAMING][KAFKA] prepare for merge of master

commit 2ed51f1f73ee75ffd08355265a72e68e83ef592d
Author: cody koeninger <cody@...>
Date:   2018-02-11T05:19:31Z

Merge branch 'master' into SPARK-17147




---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-24 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
Seems reasonable to me, but you should probably ask zsxwing whether it fits in with the plans for the Structured Streaming Kafka code.

On Thu, Nov 23, 2017 at 10:23 PM, Hyukjin Kwon <notificati...@github.com>
wrote:

> *@HyukjinKwon* commented on this pull request.
>
> Does this look good to you @koeninger <https://github.com/koeninger>?
> --
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/
> streaming/kafka010/KafkaRDD.scala
> <https://github.com/apache/spark/pull/19789#discussion_r152895550>:
>
> > @@ -211,8 +211,8 @@ private[spark] class KafkaRDD[K, V](
>  var requestOffset = part.fromOffset
>
>  def closeIfNeeded(): Unit = {
> -  if (!useConsumerCache && consumer != null) {
> -consumer.close
> +  if (consumer != null) {
> +  consumer.close()
>
> I think this should be double spaced
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19789#pullrequestreview-78827551>,
> or mute the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB7EEFdiDoEY3ov2wmiDWrnCZN4gqks5s5kTLgaJpZM4QklrV>
> .
>



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
You'll need to get a committer's attention to merge it anyway.

On Nov 23, 2017 01:48, "Daroo" <notificati...@github.com> wrote:

It seems that your "magic spell" didn't work. No build was triggered

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<https://github.com/apache/spark/pull/19789#issuecomment-346548898>, or mute
the thread

<https://github.com/notifications/unsubscribe-auth/AAGAB4Atem0hApHVaY_o1F8ksWNy6u0Lks5s5SNYgaJpZM4QklrV>
.



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-22 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
ok to test

On Wed, Nov 22, 2017 at 2:49 PM, Daroo <notificati...@github.com> wrote:

> Cool. Could you please authorize it for testing?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19789#issuecomment-346469044>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB8S6e62SscDmZ6tM3eTVPlHTj6nQks5s5IjkgaJpZM4QklrV>
> .
>



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-22 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
Seems reasonable.

On Wed, Nov 22, 2017 at 1:52 PM, Daroo <notificati...@github.com> wrote:

> It fails on the current master branch and doesn't after the patch
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19789#issuecomment-346456558>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB_y8Z7XRTNv08wpbZh3-UN2HEwA2ks5s5HuFgaJpZM4QklrV>
> .
>



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-22 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
What are you actually asserting in that test, and does it reliably fail when run against the version of your code before the patch?

On Wed, Nov 22, 2017 at 1:33 PM, Daroo <notificati...@github.com> wrote:

> I've added a test. @koeninger <https://github.com/koeninger> is it
> something you had in mind?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19789#issuecomment-346452116>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB_Sz5wdH2nf4eM0z-DMwAInQD0MQks5s5HclgaJpZM4QklrV>
> .
>



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
Yeah, SubscribePattern could definitely be an issue.

As far as unit testing goes, have you tried anything along the lines of setting the cache size artificially low and then introducing new topic-partitions?
On Mon, Nov 20, 2017 at 11:43 AM, Daroo <notificati...@github.com> wrote:

> I see your point, but it's kind of difficult to size it properly when you
> use SubscribePattern (i.e. a dynamic number of topics/partitions) consumer
> strategy
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19789#issuecomment-345772062>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGABxwDwzBPfH-VH8BoU8g3BjyMj_JQks5s4bpYgaJpZM4QklrV>
> .
>



---




[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-20 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19789
  
My main comment is that if you have a situation where there's actual contention on the size of the cache, chances are things are going to be screwed up anyway, because consumers are being recreated and losing any benefit of buffering.

Are you running into this issue in situations where the cache is appropriately sized given the number of topic-partitions and executors?


---




[GitHub] spark pull request #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsa...

2017-11-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/19789#discussion_r152056775
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -155,11 +178,11 @@ object CachedKafkaConsumer extends Logging {
 logInfo(s"Cache miss for $k")
 logDebug(cache.keySet.toString)
 val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
-cache.put(k, c)
+cache.put(k, c.acquireAndGet())
 c
   } else {
 // any given topicpartition should have a consistent key and value 
type
-v.asInstanceOf[CachedKafkaConsumer[K, V]]
+v.acquireAndGet().asInstanceOf[CachedKafkaConsumer[K, V]]
--- End diff --

Shouldn't this method call be after the cast?
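
In other words, something like the following, assuming acquireAndGet() returns the receiver (as the call on the newly created consumer above implies); this is only an illustration of the ordering, not the PR's code:

    // Cast to the typed consumer first, then mark it acquired.
    v.asInstanceOf[CachedKafkaConsumer[K, V]].acquireAndGet()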


---




[GitHub] spark issue #19274: [SPARK-22056][Streaming] Add subconcurrency for KafkaRDD...

2017-09-27 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19274
  
Search JIRA and the mailing list; this idea has been brought up multiple times. I don't think breaking a fundamental assumption of Kafka (one consumer thread per group per partition) is a good idea.


---




[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...

2017-09-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19134
  
There's already a JIRA about why 0.10 doesn't have Python support: 
https://issues-test.apache.org/jira/browse/SPARK-16534


---




[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...

2017-09-05 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19134
  
I think it makes sense to go ahead and put deprecation warnings in this PR 
as well


---




[GitHub] spark issue #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStreams to ...

2017-08-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/11863
  
You won't get any reasonable semantics out of auto commit, because it will
commit on the driver without regard to what the executors have done.

On Aug 2, 2017 21:46, "Wallace Huang" <notificati...@github.com> wrote:

> Hey, I have a question about the setting "auto.commit.enable": can it be
> changed? I want to save the offset information to a ZooKeeper cluster.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/11863#issuecomment-319852845>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAByVZ2QEf_o627Z7BKdRBuCtmza-Nks5sUTSTgaJpZM4H1Pg1>
> .
>



---



[GitHub] spark issue #18353: [SPARK-21142][SS] spark-streaming-kafka-0-10 should depe...

2017-06-23 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/18353
  
I thought the historical reason the test utils were in main rather than test was ease of use by other tests (e.g. Python); there's even still a comment in the code to that effect.

That may not matter at this point, given that Python support for 0.10 never went anywhere and given the outlook on DStreams in general, but I'm just bringing it up.


---



[GitHub] spark issue #18234: [SPARK-19185][DSTREAM] Make Kafka consumer cache configu...

2017-06-07 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/18234
  
LGTM, thanks Mark


---



[GitHub] spark pull request #18234: [SPARK-19185][DSTREAM] Make Kafka consumer cache ...

2017-06-07 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18234#discussion_r120716157
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -91,7 +91,7 @@ The new Kafka consumer API will pre-fetch messages into 
buffers.  Therefore it i
 
 In most cases, you should use `LocationStrategies.PreferConsistent` as 
shown above.  This will distribute partitions evenly across available 
executors.  If your executors are on the same hosts as your Kafka brokers, use 
`PreferBrokers`, which will prefer to schedule partitions on the Kafka leader 
for that partition.  Finally, if you have a significant skew in load among 
partitions, use `PreferFixed`. This allows you to specify an explicit mapping 
of partitions to hosts (any unspecified partitions will use a consistent 
location).
 
-The cache for consumers has a default maximum size of 64.  If you expect 
to be handling more than (64 * number of executors) Kafka partitions, you can 
change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64.  If you expect 
to be handling more than (64 * number of executors) Kafka partitions, you can 
change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`. If 
you would like to disable the caching for Kafka consumers, you can set 
`spark.streaming.kafka.consumer.cache.enabled` to `false`.
--- End diff --

Code change LGTM.

I'd prefer clarifying / adding caveats to the documentation, rather than 
leaving it undocumented.
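
For reference, a minimal sketch of how the documented settings fit together; the config keys and strategy names come from the doc text above, while the kafkaParams, topic, and values are placeholders:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    val conf = new SparkConf()
      .setAppName("kafka-0-10-stream")
      // raise this if you expect more than (64 * number of executors) partitions
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
      // per the doc text above, set to false only if you need to disable caching
      .set("spark.streaming.kafka.consumer.cache.enabled", "true")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("exampleTopic"), kafkaParams))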


---



[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

2017-06-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/18143
  
Sorry, noticed a couple more minor configuration related things.  Otherwise 
LGTM.


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636878
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
+if (cache == null) {
+  val duration =
+
SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", 
"30m")
--- End diff --

All of the other configs in the DStream code are of the form "spark.streaming.kafka.consumer.cache.something"; I'm not sure whether it's better to be consistent with the SQL config or with the DStream config.


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636950
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
--- End diff --

Load factor is being ignored at this point, yes?


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636360
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
+if (cache == null) {
+  val duration =
+
SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", 
"30m")
+  val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer[_, _]] {
+override def onRemoval(
+n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): 
Unit = {
+  n.getCause match {
+case RemovalCause.SIZE =>
+  logWarning(
+s"Evicting consumer ${n.getKey}," +
+  s" due to size limit reached. Capacity: $maxCapacity.")
+case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+  }
+  try {
+n.getValue.close()
+  } catch {
+case NonFatal(e) =>
+  logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+s" evicted from cache due to ${n.getCause}", e)
   }
 }
   }
+
+  cache = CacheBuilder.newBuilder()
+.maximumSize(maxCapacity).removalListener(removalListener)
--- End diff --

initialCapacity is being passed in but not used; this should probably set the initial capacity as well.
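
A hedged sketch of the fix, reusing the names from the diff above (duration, removalListener, CacheKey, CachedKafkaConsumer); the expireAfterAccess wiring is an assumption since that part of the diff is cut off, and the only suggested change is threading initialCapacity through to the builder:

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.CacheBuilder

    cache = CacheBuilder.newBuilder()
      .initialCapacity(initialCapacity)   // currently passed in but unused
      .maximumSize(maxCapacity)
      .expireAfterAccess(duration, TimeUnit.MILLISECONDS)
      .removalListener(removalListener)
      .build[CacheKey, CachedKafkaConsumer[_, _]]()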


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119374298
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -147,20 +155,14 @@ object CachedKafkaConsumer extends Logging {
   groupId: String,
   topic: String,
   partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
-CachedKafkaConsumer.synchronized {
+  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = {
   val k = CacheKey(groupId, topic, partition)
-  val v = cache.get(k)
-  if (null == v) {
-logInfo(s"Cache miss for $k")
-logDebug(cache.keySet.toString)
-val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
-cache.put(k, c)
-c
-  } else {
-// any given topicpartition should have a consistent key and value 
type
-v.asInstanceOf[CachedKafkaConsumer[K, V]]
-  }
+  val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
+override def call(): CachedKafkaConsumer[K, V] = {
+  new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
--- End diff --

I think it's worth keeping the info / debug level logging for when a cache 
miss has happened.
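
Something along these lines, reusing the names from the diff above (a sketch, not the PR's code); note that with a Guava Cache the key set is reached via asMap:

    val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
      override def call(): CachedKafkaConsumer[K, V] = {
        // the loader only runs on a miss, so the old info/debug logging still fires then
        logInfo(s"Cache miss for $k")
        logDebug(cache.asMap.keySet.toString)
        new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
      }
    })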


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119371920
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
-  }
+val duration = 
SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer[_, _]] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer[_, _]]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
+  s"Evicting consumer ${n.getKey}, due to size limit reached. 
Capacity: $maxCapacity.")
+  case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+}
+try {
+  n.getValue.close()
+} catch {
+  case NonFatal(e) =>
+logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+  s" evicted from cache due to ${n.getCause}", e)
 }
   }
 }
+if (cache == null) {
--- End diff --

Shouldn't the whole function be guarded by this if statement?  Is there any 
reason to construct a removal listener otherwise?


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119371285
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
-  }
+val duration = 
SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
--- End diff --

I don't think this should be using a configuration from the spark.sql namespace.


---



[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

2017-05-30 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/18143
  
Generally looks OK to me, thanks. Two questions:

Did you do any testing on workloads to see whether performance stayed the same?

Is there a reason not to do the same thing to streaming/kafka010/CachedKafkaConsumer.scala, for consistency?



---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119132090
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
--- End diff --

It seems like the general trend for default configuration values is to make them work for the lowest-common-denominator use case, in which case I'd argue for a longer (30 minute?) default timeout.
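
i.e. something like the following, using the same conf API as the diff above with just a longer default:

    // same key as above, 30 minute default instead of 10
    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")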


---



[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119131006
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
--- End diff --

It shouldn't really be a normal operation. If the capacity is smaller than the number of partitions that are regularly being assigned to a given node, it's going to kill performance, because consumers will be recreated every batch.
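
A back-of-the-envelope version of that sizing rule (the numbers are placeholders, not a recommendation):

    // Each executor's cache should hold at least the partitions it is likely to
    // be assigned; below that, consumers get evicted and recreated every batch.
    val totalPartitions = 256
    val executors = 8
    val minCapacityPerExecutor = math.ceil(totalPartitions.toDouble / executors).toInt  // 32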


---



[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

2017-05-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17308
  
@marmbrus @zsxwing @tdas This needs attention from someone


---



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2017-04-27 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
LGTM pending Jason's comments on the tests.


---



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2017-04-27 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
Have you read the function def clamp?

A rate limit of 1 should not imply an attempt to grab one message even if it doesn't exist.
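
For context, a rough paraphrase of what clamp does (not the exact source, and it assumes the three maps share a key set): the per-partition cap is an upper bound on offsets that actually exist, so a limit of 1 only reads a message when there is one.

    import org.apache.kafka.common.TopicPartition

    def clampSketch(
        currentOffsets: Map[TopicPartition, Long],
        latestOffsets: Map[TopicPartition, Long],
        maxMessages: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      latestOffsets.map { case (tp, latest) =>
        // never ask for more than exists, never go past the rate-limited count
        tp -> math.min(currentOffsets(tp) + maxMessages(tp), latest)
      }
    }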

On Apr 27, 2017 11:01, "Sebastian Arzt" <notificati...@github.com> wrote:

> @koeninger <https://github.com/koeninger> I agree that assuming a long
> batch size is wrong; I'm not sure whether it even matters.
> But what if for one partition there is no lag in the current batch? Then
> fetching 1 message for this partition from Kafka, as you suggest, would
> fail. So here zero makes sense in my eyes. This is also the old behaviour
> if rate > 1 and lag == 0, here:
> <https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala#L107>
> Further, I think that truncating 0.99 to 0 messages per partition is also
> the right thing to do, as one cannot be sure that there is one message
> available if (secsPerBatch * limit) < 1.0. And as you say, in a future
> batch it is very likely to become greater than 1.0.
> Do you agree?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/17774#issuecomment-297758733>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAGAB1X8NCNqECUlx9X54DSAmbnHmHdAks5r0LvggaJpZM4NJAVA>
> .
>



---



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2017-04-27 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
@arzt It's entirely possible to have batch times of less than a second, and I'm not sure I agree that the absolute number of messages allowable for a partition should ever be zero.

To put this another way: right now effectiveRateLimitPerPartition is a Map[TopicPartition, Long], which matches the return value of the function maxMessagesPerPartition.

You want to change effectiveRateLimitPerPartition to a Map[TopicPartition, Double], which is probably a good idea and should fix the bug around treating a very small rate limit as no limit.

But it still needs to be converted to a Map[TopicPartition, Long] before returning. Calling .toLong is probably not the right thing to do there, because 0.99 will get truncated to 0.

I think one message per partition per batch is the minimum reasonable rate limit; otherwise particular partitions may not make progress. The relative lag calculation might take care of that in future batches, but it still seems questionable, even if it's a corner case.
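
A hedged sketch of the conversion I have in mind (illustrative names, not the PR's code): keep the per-partition rate as a Double while combining lag and limits, then floor at one message per partition per batch when converting back to Long, so 0.99 is not truncated to 0:

    import org.apache.kafka.common.TopicPartition

    def toMaxMessagesPerPartition(
        effectiveRatePerPartition: Map[TopicPartition, Double],
        secsPerBatch: Double): Map[TopicPartition, Long] = {
      effectiveRatePerPartition.map { case (tp, limit) =>
        tp -> math.max(1L, (secsPerBatch * limit).toLong)
      }
    }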


---



[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...

2017-04-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17774
  
How do you read 0.1 of a Kafka message for a given partition in a given batch?

Ultimately the floor for a rate limit, assuming one is set, needs to be one message per partition per batch, not a fraction, which is why it's a Long.

If you want to delay that conversion by keeping it as a Double as long as possible, that makes sense, but lines like

(secsPerBatch * limit).toLong

probably need attention too.


---



[GitHub] spark pull request #17675: [SPARK-20036][DOC] Note incompatible dependencies...

2017-04-18 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/17675

[SPARK-20036][DOC] Note incompatible dependencies on org.apache.kafka 
artifacts


## What changes were proposed in this pull request?

Note that you shouldn't manually add dependencies on org.apache.kafka artifacts.

## How was this patch tested?

Doc-only change; I did a jekyll build and looked at the page.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-20036

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17675


commit bcba5bac51aea773616fda539a9a1e48604a7815
Author: cody koeninger <c...@koeninger.org>
Date:   2017-04-18T21:16:14Z

[SPARK-20036][DOC] Document that you shouldnt add incompatible dependencies 
on org.apache.kafka artifacts




---



[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

2017-04-12 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/17308
  
Just to throw in my two cents: a change like this is definitely needed, as is made clear by the second sentence of the KafkaProducer docs:


http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

"The producer is thread safe and sharing a single producer instance across 
threads will generally be faster than having multiple instances."
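
For illustration only (not this PR's implementation), the shape of the idea is a single lazily created producer per JVM shared across tasks, which the javadoc above says is safe; the props are assumed to contain the usual bootstrap.servers and serializer settings:

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer

    object SharedKafkaProducer {
      @volatile private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

      // Create the producer once and hand the same instance to every caller.
      def getOrCreate(props: Properties): KafkaProducer[Array[Byte], Array[Byte]] =
        synchronized {
          if (producer == null) {
            producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
          }
          producer
        }
    }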


---



[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...

2017-03-13 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16006
  
@zsxwing do you or anyone else have time to look at this?


---



[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...

2017-03-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16006#discussion_r105720383
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -143,9 +147,16 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
 lagPerPartition.map { case (tp, lag) =>
   val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-  val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-  tp -> (if (maxRateLimitPerPartition > 0) {
-Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
+  val effectiveRate = if (rate >= 0) rate else 
backpressureInitialRate
--- End diff --

This can be moved before the map, right?


---



[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...

2017-03-10 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16006
  
Any interest in picking this back up if you can get a committer's attention?


---



[GitHub] spark issue #16629: [SPARK-19185][DStream] Add more clear hint for 'Concurre...

2017-01-18 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16629
  
I don't think it's a problem to make disabling the cache configurable, as long as it's on by default. I don't think the additional static constructors in KafkaUtils are necessary, are they?

I'm not sure it's a good idea to just blindly recommend that people turn it off if they get that exception, though; it's not that simple.


---



[GitHub] spark issue #16569: [SPARK-19206][DOC][DStream] Fix outdated parameter descr...

2017-01-14 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/16569
  
LGTM


---



[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...

2016-12-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16006#discussion_r91118893
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -143,9 +147,14 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
 lagPerPartition.map { case (tp, lag) =>
   val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-  val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-  tp -> (if (maxRateLimitPerPartition > 0) {
-Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
+  val effectiveRate = if (rate >= 0) rate else 
backpressureInitialRate
+  val estimateRate = Math.round(lag / totalLag.toFloat * 
effectiveRate)
+  val backpressureRate =
+if (estimateRate > maxRateLimitPerPartition && 
maxRateLimitPerPartition > 0) {
+  maxRateLimitPerPartition
+}
+else estimateRate
--- End diff --

Per http://spark.apache.org/contributing.html, use braces around the else clause unless it's all on one line.
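
i.e. the block above written to match the style guide:

    val backpressureRate =
      if (estimateRate > maxRateLimitPerPartition && maxRateLimitPerPartition > 0) {
        maxRateLimitPerPartition
      } else {
        estimateRate
      }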


---



[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...

2016-12-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16006#discussion_r91118458
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -67,6 +67,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 ekp
   }
 
+  val backpressureInitialRate: Long =
+
_ssc.sparkContext.conf.getLong("spark.streaming.backpressure.initialRate",
+  
_ssc.sparkContext.conf.getDouble("spark.streaming.backpressure.pid.minRate", 
100).round)
--- End diff --

Shouldn't these be using ssc instead of _ssc for consistency?


---



[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...

2016-12-05 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16006#discussion_r91001763
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -67,6 +67,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 ekp
   }
 
+  val backpressureInitialRate: Long =
+
_ssc.sparkContext.conf.getLong("spark.streaming.backpressure.initialRate",
+  1 + 
_ssc.sparkContext.conf.getLong("spark.streaming.kafka.maxRatePerPartition", 
1000L) / 10)
+
--- End diff --

Where is this divide by 10 coming from?


---



[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...

2016-12-05 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16006#discussion_r91002372
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -143,9 +147,14 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
 lagPerPartition.map { case (tp, lag) =>
   val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-  val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-  tp -> (if (maxRateLimitPerPartition > 0) {
-Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
+  val backpressureRate =
+if (rate >= 0) {
+  val estimateRate = Math.round(lag / totalLag.toFloat * rate)
+  if (estimateRate > maxRateLimitPerPartition && 
maxRateLimitPerPartition > 0) {
+maxRateLimitPerPartition
+  } else estimateRate
+} else math.min(backpressureInitialRate, 
maxRateLimitPerPartition)
--- End diff --

I don't think these units are comparable; one is records/sec and the other is records/sec/partition.
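
A hedged sketch of one way to make the units line up before taking the min (illustrative, not the PR's code), dividing the whole-stream initial rate by the number of partitions so both sides are records/sec/partition:

    // backpressureInitialRate is records/sec for the whole stream;
    // maxRateLimitPerPartition is records/sec for a single partition.
    val numPartitions = math.max(1, lagPerPartition.size)
    val initialRatePerPartition = backpressureInitialRate / numPartitions
    val limited =
      if (maxRateLimitPerPartition > 0) {
        math.min(initialRatePerPartition, maxRateLimitPerPartition)
      } else {
        initialRatePerPartition
      }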


---



[GitHub] spark issue #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false work w...

2016-11-19 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15820
  
Because the comment made by me and +1'ed by marmbrus is hidden at this 
point, I just want to reiterate that this patch should not skip the rest of 
the partition when a timeout happens.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88360922
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no

[GitHub] spark issue #15849: [SPARK-18410][STREAMING] Add structured kafka example

2016-11-15 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15849
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15849: [SPARK-18410][STREAMING] Add structured kafka exa...

2016-11-14 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15849#discussion_r87795091
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaStructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
+ *   <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
+ *   comma-separated list of host:port.
+ *   <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
+ *   'subscribePattern'.
+ *   |- <assign> Specific TopicPartitions to consume. Json string
+ *   |  {"topicA":[0,1],"topicB":[2,4]}.
+ *   |- <subscribe> The topic list to subscribe. A comma-separated list of
+ *   |  topics.
+ *   |- <subscribePattern> The pattern used to subscribe to topic(s).
+ *   |  Only one of "assign, "subscribe" or "subscribePattern" options can be
+ *   |  specified for Kafka source.
+ *   <topics> A list of one or more kafka topics to consume from. Please refer
--- End diff --

This isn't really a list of topics for assign or subscribePattern. I'd say 
use a more general description, or stick to just subscribe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87439798
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87438222
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

I'm clearer on why this terminates, but I think it's worth a comment, since 
it's a mutually recursive call without changing arguments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87438788
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
--- End diff --

I think it's worth having the warning explicitly state that data has been lost.
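
A self-contained sketch of the kind of wording being suggested; the message text
is an assumption, and the real code would pass it to logWarning inside
CachedKafkaConsumer:

    def dataLossWarning(offset: Long, untilOffset: Long): String =
      s"Offset $offset is out of range, which likely means data in " +
        s"[$offset, $untilOffset) has been lost (aged out, or the topic was " +
        "deleted and recreated). Recovering from the earliest available offset " +
        "because failOnDataLoss is false."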


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false work w...

2016-11-08 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15820
  
Wow, looks like the new GitHub comment interface did all kinds of weird 
things; apologies about that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129126
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

- Why is this not an early return?
- The arguments to the recursive function have not changed at this point, 
right?  Why does it terminate?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129981
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
--- End diff --

Can this var be eliminated by just using a single try around the if/else? 
It's the same catch condition in either case.
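
A minimal, self-contained sketch of the pattern being suggested, with the diff's
seek/poll calls stood in by a hypothetical fetchOnce function: a single try
around the whole if/else can produce the flag as its result, so no var is needed.

    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException

    def attemptFetch(fetchOnce: () => Unit): Boolean =
      try {
        fetchOnce()   // covers both the "initial fetch" and "buffer drained" branches
        false         // offset was in range
      } catch {
        case _: OffsetOutOfRangeException =>
          true        // caller falls back to the beginning offset
      }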


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129817
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
--- End diff --

I don't think it's necessary to seek every time the fetched data is empty; 
in normal operation the poll should return the next offset, right?
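
A minimal sketch of that suggestion using the plain Kafka 0.10 consumer API
(names here are illustrative, not the PR's code): seek only when jumping to a
requested offset, and let poll continue from the consumer's current position
when the prefetch buffer is merely drained.

    import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}
    import org.apache.kafka.common.TopicPartition

    def refill[K, V](
        consumer: Consumer[K, V],
        tp: TopicPartition,
        requestedOffset: Long,
        pollTimeoutMs: Long): ConsumerRecords[K, V] = {
      if (consumer.position(tp) != requestedOffset) {
        consumer.seek(tp, requestedOffset)
      }
      consumer.poll(pollTimeoutMs)
    }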


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129927
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `polling`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for 
`beginningOffset` right now.
+// - Cannot fetch any date before timeout.
+// Because there is no way to distinguish, just skip the rest 
offsets in the current
+// partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87127811
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `polling`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for 
`beginningOffset` right now.
+// - Cannot fetch any date before timeout.
+// Because there is no way to distinguish, just skip the rest 
offsets in the current
+// partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87130059
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

- Why isn't this an early return?
- Unless I'm misreading, this is a recursive call without changing the 
arguments.  Why is it guaranteed to terminate?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87128373
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `polling`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for 
`beginningOffset` right now.
+// - Cannot fetch any date before timeout.
+// Because there is no way to distinguish, just skip the rest 
offsets in the current
+// partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129204
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
--- End diff --

Can't this var be eliminated with a single try around the following if/else?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...

2016-11-08 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15132
  
@zsxwing @rxin the per-partition rate limit probably won't overflow, but 
the overall backpressure rate limit was being cast to an Int, which definitely 
can overflow. I changed it in this latest commit; if you think that should be 
a different PR instead, let me know.
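
A small, self-contained illustration (hypothetical numbers, not project code) of
why casting the overall rate to Int is the risky one:

    // Any estimate above Int.MaxValue (~2.1 billion) wraps around when cast to Int.
    val estimatedRate: Long = 3L * 1000 * 1000 * 1000   // 3 billion records/sec
    val asInt: Int = estimatedRate.toInt                // overflows to a negative value
    val asLong: Long = estimatedRate                    // keeping it a Long preserves the value
    println(s"asInt = $asInt, asLong = $asLong")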


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-07 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86810750
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set 
via Spark properties,
+ * because they need tweaking on a per-partition basis,
+ */
+@Experimental
+abstract class PerPartitionConfig() extends Serializable {
+  /**
+   *  Maximum rate (number of records per second) at which data will be 
read
+   *  from each Kafka partition.
+   */
+  def maxRatePerPartition(topicPartition: TopicPartition): Int
--- End diff --

That's a reasonable question.  2 billion messages per second on a single 
thread would be pretty extreme, but I don't think it hurts us to change this to 
a long.
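
A sketch of the widened signature being discussed; this mirrors the abstract
class in the diff above but is simplified and renamed here to avoid implying
it is the final API:

    import org.apache.kafka.common.TopicPartition

    abstract class PerPartitionConfigSketch extends Serializable {
      /** Maximum rate (records per second) to read from each Kafka partition. */
      def maxRatePerPartition(topicPartition: TopicPartition): Long
    }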


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...

2016-11-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15132
  
@rxin thanks, changed to abstract class. If you think that's sufficient 
future-proofing, I otherwise think this is a worthwhile change; it seems to 
meet a real user need.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...

2016-11-04 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15132
  
I sent a message to d...@spark.apache.org; if you're not already subscribed, 
I'd say subscribe and follow up in case there's any discussion there rather 
than on the PR / JIRA. Someone may want to know more about your use case, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...

2016-11-03 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15132
  
Ok... so the question at this point is whether it's worth making the API 
change, and ultimately we'll have to track down a committer to decide that.

As the PR stands, it shouldn't break binary compatibility because the class 
constructor it's changing is private, but future changes to PerPartitionConfig 
would. It's kind of a question of whether it's worth coming up with a generic 
additional-option scheme now, or possibly making breaking changes to an API 
(granted, one that's labeled experimental) later.
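
A hedged sketch of what a "generic additional option" scheme could look like:
giving any later per-partition setting a default implementation means existing
user subclasses keep compiling when options are added. The names below are
illustrative, not Spark's API.

    import org.apache.kafka.common.TopicPartition

    abstract class PerPartitionConfigIdea extends Serializable {
      def maxRatePerPartition(topicPartition: TopicPartition): Long

      // A setting added later with a sensible default, so existing subclasses
      // are unaffected by the addition.
      def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
    }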


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout

2016-11-03 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15737
  
Good catch, I just mistakenly changed to AsMS in one place but not both.

On the test changes, do you want tests waiting up to 2 minutes * however 
many kafka calls are being made?  If so I can rip everything else out, just let 
me know.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15737: [SPARK-18212][SS][KAFKA] increase executor poll t...

2016-11-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15737#discussion_r86254376
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
 
   private val sc = sqlContext.sparkContext
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString
--- End diff --

My thinking was just to make it really obvious that it's the same default 
spark.network.timeout usually has, since I didn't find a standalone constant 
for that value anywhere.  If you think it's clearer expressed as ms, I don't 
feel strongly either way.
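
If it helps the comparison, the ms-based phrasing would look roughly like 
this (a standalone sketch, not the PR's code; conf and sourceOptions here 
stand in for the fields visible in the diff above):

    import org.apache.spark.SparkConf

    // Hypothetical alternative for comparison only: same 120s
    // spark.network.timeout default, just read directly as milliseconds.
    object PollTimeoutDefault {
      def pollTimeoutMs(conf: SparkConf, sourceOptions: Map[String, String]): Long =
        sourceOptions.getOrElse(
          "kafkaConsumer.pollTimeoutMs",
          conf.getTimeAsMs("spark.network.timeout", "120s").toString
        ).toLong
    }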


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout

2016-11-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15737
  
Ok, I'll update the default in both places to use spark.network.timeout and 
leave the test config at 10 seconds


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout

2016-11-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15737
  
I will always choose "fail in an obvious way that I can start debugging"
over "start behaving poorly in non-obvious ways".  For the same reason, I
thought it was a really poor idea for the original structured streaming
kafka PRs to silently swallow offset out of range: something is
wrong, and I want to know about it.

Ultimately this particular thing is tunable, so if you really want to
increase the default to however many seconds, let me know; it won't affect
my ability to do my job.

On Wed, Nov 2, 2016 at 1:42 PM, Michael Armbrust <notificati...@github.com>
wrote:

> If we have prefetched the timeout should never kick in (I hope?), so we
> probably only care about the network here. If my choices are "fail the job"
> or "wait a little bit longer", I don't see why you would ever chose "fail
> the job" for transient network issues. If its not a transient issue, do you
> really think that knowing about it 9.5 seconds earlier outweighs that cost
> of false negatives?
>
> I guess I don't understand what kind of Kafka issues you are optimizing
> for?
>
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/15737#issuecomment-257960644>, or mute
> the thread
> <https://github.com/notifications/unsubscribe-auth/AAGAB9_f5Jaae1inqEK4uTw45cgPdQ1Hks5q6NmfgaJpZM4KnW2M>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout

2016-11-02 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15737
  
In most cases poll should be returning prefetched data from the buffer, not
waiting to talk to kafka over the network.

I could see increasing it a little bit, but I don't think it should be
increased to 10 seconds by default.  That's a really long time for a 1
second batch streaming job to silently wait, and the only thing you'll be
able to observe is that your job starts running slow for some unknown
reason.  At least with a smaller default any network or Kafka issues will
be obvious, and you can either diagnose them or tune upwards.


On Wed, Nov 2, 2016 at 1:06 PM, Michael Armbrust <notificati...@github.com>
wrote:

> Is there are reason to not change the user facing default? Its failing our
> tests and it was failing for me for real with the current default. When we
> are poling on the executors, the offsets should always be available, so I
> don't think increasing the timeout will hurt anything.
>
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/15737#issuecomment-257949815>, or mute
> the thread
> <https://github.com/notifications/unsubscribe-auth/AAGAB6lwUbgWwgpfkaOgyimftB_nYvpUks5q6NFCgaJpZM4KnW2M>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86203226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

Yes, I think it's a good idea to explicitly say for each output mode 
whether watermarks affect emit and evict.  Just so I'm clear, the intention is

Append: affects emit, affects evict
Update: doesn't affect emit, affects evict
Complete: doesn't affect emit, no eviction

Is that right?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15737: [SPARK-18212][SS][KAFKA] increase executor poll t...

2016-11-02 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15737

[SPARK-18212][SS][KAFKA] increase executor poll timeout

## What changes were proposed in this pull request?

Increase poll timeout to try and address flaky test

## How was this patch tested?

Ran existing unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-18212

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15737


commit a7f78d2996672a335f29acafe2ba2350541d6ba2
Author: cody koeninger <c...@koeninger.org>
Date:   2016-11-02T15:12:28Z

[SPARK-18212][SS][KAFKA] increase executor poll timeout




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15702: [SPARK-18124] Observed delay based Event Time Watermarks

2016-11-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15702
  
Given the concerns Ofir raised about a single far future event screwing up 
monotonic event time, do you want to document that problem even if there isn't 
an enforced filter for it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066774
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
+   *  all of the partitions in the query minus a user specified 
`delayThreshold`.  Due to the cost
+   *  of coordinating this value across partitions, the actual watermark 
used is only guaranteed
+   *  to be at least `delayThreshold` behind the actual event time.  In 
some cases we may still
+   *  process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time 
of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive 
late, relative to the latest
+   *   record that has been processed in the form of 
an interval
+   *   (e.g. "1 minute" or "5 hours").
--- End diff --

Should this make it clear what the minimum useful granularity is (ms)?
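
To make the granularity question concrete, usage of the API under review 
would look roughly like this (a minimal sketch; assumes a Spark build that 
has this method plus the built-in rate test source, and the interval values 
are arbitrary):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window

    object WatermarkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("watermark-example").getOrCreate()
        import spark.implicits._

        // the rate source emits `timestamp` and `value` columns
        val events = spark.readStream
          .format("rate")
          .option("rowsPerSecond", "10")
          .load()

        val counts = events
          .withWatermark("timestamp", "500 milliseconds")  // interval granularity down to ms
          .groupBy(window($"timestamp", "5 seconds"))
          .count()

        counts.writeStream.format("console").outputMode("append").start().awaitTermination()
      }
    }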


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86067616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

I'm not clear on why the semantics of Update mean that watermarks shouldn't 
be used to remove state.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066376
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
--- End diff --

For append, this sounds like the intention is to emit only once the 
watermark has passed, and then drop state.
But for the other output modes, it's not clear from this wording what effect 
the watermark has on emission and on dropping state.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066082
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
--- End diff --

- Should this be "The current watermark is computed..."?
- What is an epoch?  It isn't mentioned in the docs or elsewhere in the PR.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15715: [SPARK-18198][Doc][Streaming] Highlight code snippets

2016-11-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15715
  
LGTM



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...

2016-11-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15681
  
I'm agnostic on the value of adding the overload, if @lw-lin thinks it's 
more convenient for users.  As it stands there are considerably fewer 
overloads than in the old 0.8 version of KafkaUtils, so I don't think we're 
getting too messy/crowded yet.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...

2016-11-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15681
  
I don't think there's a reason to deprecate it.  ju.Map is the lowest
common denominator for kafka params, it's used by the underlying consumer,
and it's what the ConsumerStrategy interface expects to return.  I would
definitely prefer to use it in my Scala jobs that were generating a
sequence of RDDs.

On Tue, Nov 1, 2016 at 9:47 AM, Sean Owen <notificati...@github.com> wrote:

> We would want to change the code so that nothing calls the deprecated
> method including tests. I think you can let the deprecated one call the new
> one? When to remove it is a question for the future, yes.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/15681#issuecomment-257585401>, or mute
> the thread
> <https://github.com/notifications/unsubscribe-auth/AAGAB-urGivwIvNU8dmlHNqF11otDBo_ks5q51EdgaJpZM4KkHhg>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...

2016-11-01 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15681
  
LGTM, thanks.

If you want to open a separate PR to clean up the private doc issues you 
noticed, go for it; it shouldn't need another JIRA imho if it isn't changing 
code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...

2016-10-31 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15681
  
I think this should just be another createRDD overload that takes a scala 
map.  The minor additional maintenance overhead of one more method, as 
opposed to changing the existing one, isn't worth breaking clients over.  I'm 
all in favor of changes to experimental apis when they are necessary, but 
this isn't necessary.

That also makes most of the other changes in this PR unnecessary - you don't 
need to touch mima, you don't need to refactor to createRddInternal, etc etc.
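
To be concrete, the kind of overload I mean is roughly this (a sketch only, 
delegating to the existing ju.Map-based signature; the wrapper object name is 
just for illustration):

    import java.{util => ju}

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategy, OffsetRange}

    // Sketch: accept a Scala Map and delegate to the existing ju.Map-based
    // createRDD, leaving the current public API untouched.
    object KafkaUtilsScalaSugar {
      def createRDD[K, V](
          sc: SparkContext,
          kafkaParams: Map[String, Object],
          offsetRanges: Array[OffsetRange],
          locationStrategy: LocationStrategy): RDD[ConsumerRecord[K, V]] = {
        KafkaUtils.createRDD[K, V](
          sc,
          new ju.HashMap[String, Object](kafkaParams.asJava),
          offsetRanges,
          locationStrategy)
      }
    }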


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15681: [Minor][Streaming][Kafka] Kafka010 .createRDD() scala AP...

2016-10-30 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15681
  
Public API changes, even on an experimental module, aren't a minor thing; 
open a Jira.

It's also better not to lump unrelated changes together.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...

2016-10-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15679
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...

2016-10-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15679
  
Thanks for working on this.  A couple of minor things to fix, but otherwise 
it looks good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85642029
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving 
offsets in the same transactio
}
 
 
+   // The details depend on your data store, but the general idea looks 
like this
+
+   // begin from the the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
+ fromOffsets.put(new TopicPartition(resultSet.string("topic"), 
resultSet.int("partition")), resultSet.long("offset"));
+   }
+
+   JavaInputDStream<ConsumerRecord<String, String>> stream = 
KafkaUtils.<String, String>createDirectStream(
+ streamingContext,
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), 
kafkaParams, fromOffsets)
+   );
+
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, 
String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+   OffsetRange[] offsetRanges = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+   
+   Object results = yourCalculation(rdd);
+
+   yourTransactionBlock {
--- End diff --

I agree with Sean; this would probably be clearer if it were changed to 
comments like
// begin your transaction
...
// end your transaction


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85641502
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
 ---
@@ -36,8 +36,8 @@ class StreamingQueryException private[sql](
 @transient val query: StreamingQuery,
 val message: String,
 val cause: Throwable,
-val startOffset: Option[Offset] = None,
-val endOffset: Option[Offset] = None)
+val startOffset: Option[OffsetSeq] = None,
--- End diff --

Why not expose the actual json string?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...

2016-10-29 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15679
  
Were these extracted from compiled example projects, or just written up?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85641432
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in 
a special Kafka topic.  By
 
 
stream.foreachRDD { rdd =>
- val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
  // some time later, after outputs have completed
- stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+ stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
 
 As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if 
called on the result of createDirectStream, not after transformations.  The 
commitAsync call is threadsafe, but must occur after outputs if you want 
meaningful semantics.
 
 
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, 
String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+   OffsetRange[] offsetRanges = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+
+   // some time later, after outputs have completed
+   ((CanCommitOffsets) 
stream.inputDStream()).commitAsync(offsetRanges);
--- End diff --

I think it's far too late to fix those issues at this point.  DStreams 
return an RDD, not a parameterized type.  KafkaUtils methods return DStreams 
and RDDs, not an implementation-specific type.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85253453
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
 
--- End diff --

Shouldn't it be set to the highest available offset in the streaming 
metadata log, not the highest available offset in kafka?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-26 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15626
  
So it looks like this is the json format being used for kafka offsets:


[{"_1":{"hash":0,"partition":0,"topic":"t"},"_2":2},{"_1":{"hash":0,"partition":1,"topic":"t"},"_2":2}]

My main concerns are that it is unnecessarily verbose, and that it is 
completely different from the format used to specify per-topicpartition 
offsets in org.apache.spark.sql.kafka010.JsonUtils.

I see no reason why those formats should be different: a user should be 
able to take offsets directly from the checkpoint and use them to start a 
job, for instance.
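
To make that concrete, the round trip I'd expect to work looks roughly like 
this (a sketch, under the assumption that the compact JsonUtils-style shape 
and the startingOffsets option from SPARK-17812 are what end up being used; 
the broker address is a placeholder):

    import org.apache.spark.sql.SparkSession

    object StartFromCheckpointOffsets {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("start-from-offsets").getOrCreate()

        // Same information as the verbose tuple-based json quoted above:
        // topic "t", partitions 0 and 1, both starting at offset 2.
        val offsetsJson = """{"t":{"0":2,"1":2}}"""

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "t")
          .option("startingOffsets", offsetsJson)
          .load()

        df.writeStream.format("console").start().awaitTermination()
      }
    }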


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...

2016-10-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15504#discussion_r84378170
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.{ ArrayBuffer, HashMap }
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
--- End diff --

KafkaSourceOffset is a Map[TopicPartition, Long].  This PR has an api for a 
user to provide a Map[TopicPartition, Long].  There's an existing ticket for 
making the metadata log store user-readable json.  This PR uses json.  Are they 
really unrelated?  Why would we want a second, different, json format for the 
same information?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...

2016-10-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15504#discussion_r84374872
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.{ ArrayBuffer, HashMap }
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
--- End diff --

Sorry, looks like I edited while you were commenting.  I was thinking 
towards SPARK-17829, where this same format might get used more frequently 
throughout a job.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...

2016-10-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15504#discussion_r84372134
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -232,6 +232,42 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  private def fetchSpecificStartingOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 
Long] =
+withRetriesWithoutInterrupt {
+  // Poll to get the latest assigned partitions
+  consumer.poll(0)
+  val partitions = consumer.assignment()
+  consumer.pause(partitions)
+  assert(partitions.asScala == partitionOffsets.keySet,
+"If startingOffsets contains specific offsets, you must specify 
all TopicPartitions.\n" +
+  "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+  s"Specified: ${partitionOffsets.keySet} Assigned: 
${partitions.asScala}")
+  logDebug(s"Partitions assigned to consumer: $partitions. Seeking to 
$partitionOffsets")
+
+  partitionOffsets.foreach {
+case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
+case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
+case (tp, off) => consumer.seek(tp, off)
+  }
+  val result = partitionOffsets.map {
+case (tp, _) => tp -> consumer.position(tp)
+  }
+  partitionOffsets.foreach {
+case (tp, off) if off != -1 && off != -2 =>
+  if (result(tp) != off) {
+reportDataLoss(
--- End diff --

good call, will fix that


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...

2016-10-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15504#discussion_r84371466
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -317,6 +353,8 @@ private[kafka010] case class KafkaSource(
   try {
 result = Some(body)
   } catch {
+case x: OffsetOutOfRangeException =>
--- End diff --

OffsetOutOfRange exception can happen on any poll.  I agree that with the 
way things are now, it shouldn't happen, but that has more to do with us 
currently having an explicitly enforced value for auto.offset.reset, not with 
whether data is being fetched.  This change is basically paranoia about catching 
all nonfatal errors without respecting user settings regarding data loss.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...

2016-10-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15504#discussion_r84370308
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.{ ArrayBuffer, HashMap }
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
--- End diff --

Regarding json4s, I haven't benchmarked this particular usage, but my 
practical experience has been that json4s is noticeably slower than just using 
jackson directly.  Is this just a code conciseness concern, or is there some 
other reason to prefer json4s?
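
For reference, the kind of direct jackson usage I have in mind is about this 
much code (a sketch only, not the PR's actual JsonUtils; it just mirrors the 
ObjectMapper / node imports in the diff above):

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.kafka.common.TopicPartition

    // Sketch: build {"topic":{"partition":offset}} with the plain Jackson tree API.
    object OffsetsJsonSketch {
      private val mapper = new ObjectMapper()

      def partitionOffsets(offsets: Map[TopicPartition, Long]): String = {
        val root = mapper.createObjectNode()
        offsets.groupBy { case (tp, _) => tp.topic }.foreach { case (topic, parts) =>
          val node = root.putObject(topic)
          parts.foreach { case (tp, off) => node.put(tp.partition.toString, off) }
        }
        mapper.writeValueAsString(root)
      }
    }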


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15570: [STREAMING][KAFKA][DOC] clarify kafka settings ne...

2016-10-20 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15570

[STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches

## What changes were proposed in this pull request?

Minor doc change to mention kafka configuration for larger spark batches.

## How was this patch tested?

Doc change only, confirmed via jekyll.

The configuration issue was discussed / confirmed with users on the mailing 
list.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 kafka-doc-heartbeat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15570


commit a7b0a17edef25d2fe0ad2934072cb5019afff767
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-20T17:37:12Z

[STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-17 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15527

[SPARK-17813][SQL][KAFKA] Maximum data per trigger

## What changes were proposed in this pull request?

Adds a maxOffsetsPerTrigger option for rate limiting, apportioned 
proportionally to the volume of the different topicpartitions.

This assumes SPARK-17812 is merged first, due to common changes in test 
utils; if that ends up not being the case I can clean this up as a separate 
patch.
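
Usage ends up looking roughly like this (a sketch; broker address and topic 
are placeholders):

    import org.apache.spark.sql.SparkSession

    object MaxOffsetsPerTriggerExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("max-offsets-per-trigger").getOrCreate()

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "t")
          // at most 10k records per trigger, split across partitions in
          // proportion to how many offsets each has available
          .option("maxOffsetsPerTrigger", "10000")
          .load()

        df.writeStream.format("console").start().awaitTermination()
      }
    }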

## How was this patch tested?

Added unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15527


commit c45ded7109474fcb40f03c772192eb38398f328a
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-14T04:23:02Z

[SPARK-17812][SQL][KAFKA] parse json for topicpartitions and offsets

commit 12d3988c4fcef9bbbd88ce69295d2ff3e5baa5ba
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-14T19:58:08Z

Merge branch 'master' into SPARK-17812

commit 3120fd8ade24140777c29fc1487aa3f6e76152fb
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-14T21:37:35Z

[SPARK-17812][SQL][KAFKA] implement specified offsets and assign

commit 35bb8c3cfe77f2cb3d26f4afd3364caa6d0ec4cf
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-16T03:00:20Z

[SPARK-17812][SQL][KAFKA] doc and test updates

commit 2e53e5a3904305cb1d1b0f2325e31c9c434d16ec
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-16T03:16:11Z

[SPARK-17812][SQL][KAFKA] style fixes

commit 5e4511f0c7e84d15011a7eb8d208be13ed672b49
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-16T03:52:39Z

[SPARK-17812][SQL][KAFKA] additional paranoia on reset of starting offsets

commit cae967cb88a7682b6794d5d2ef90a0d9a1d3ea60
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-18T03:14:31Z

Merge branch 'SPARK-17812' into SPARK-17813

Testing maxOffsetsPerTrigger requires the per-partition sendMessages 
testing added in SPARK-17812

commit 6c8d459f9795c6ff32e8bf78f8796869ca722ee3
Author: cody koeninger <c...@koeninger.org>
Date:   2016-10-18T05:20:53Z

[SPARK-17813][SQL][KAFKA] maxOffsetsPerTrigger proportional implementation




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15407: [SPARK-17841][STREAMING][KAFKA] drain commitQueue

2016-10-17 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/15407
  
@rxin @tdas right now, items to be committed can be added to the queue, but 
they will never actually be removed from the queue.  poll() removes, iterator() 
does not.  I updated the description of the PR.
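
To illustrate the difference (a minimal standalone sketch, not the DStream's 
actual code):

    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.collection.mutable

    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    // iterator() only reads the queue, so entries pile up forever;
    // poll() is what actually dequeues them.
    object CommitQueueDrain {
      val commitQueue = new ConcurrentLinkedQueue[(TopicPartition, OffsetAndMetadata)]()

      def drain(): Map[TopicPartition, OffsetAndMetadata] = {
        val batch = mutable.Map[TopicPartition, OffsetAndMetadata]()
        var next = commitQueue.poll()   // removes the head, or null when empty
        while (next != null) {
          batch += next                 // later offsets for a partition overwrite earlier ones
          next = commitQueue.poll()
        }
        batch.toMap
      }
    }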


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


