[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169536036

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
@@ -22,12 +22,17 @@ import java.{ util => ju }

 import scala.collection.JavaConverters._
 import scala.util.Random

+import kafka.common.TopicAndPartition

--- End diff --

Right, LogCleaner hadn't yet been moved to the new APIs; added a comment to that effect. I think we're OK here because it's just being used to mock up a compacted topic, not in the actual DStream API.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20572

@srowen Sorry to turn on the bat-signal, but would you be able to help find a committer willing to look at this? After finally finding someone willing to test it in production, I don't want this to fall through the cracks.
[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20572

@zsxwing do you have time to review this? It's been a long-standing issue.
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/20572

[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Add a configuration, spark.streaming.kafka.allowNonConsecutiveOffsets, to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).

## How was this patch tested?

Added a new unit test. @justinrmiller has been testing this branch in production for a few weeks.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-17147

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20572.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20572

commit 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29
Author: cody koeninger <cody@...>
Date: 2016-10-08T21:21:48Z
[SPARK-17147][STREAMING][KAFKA] failing test for compacted topics

commit e8ea89ea10527c6723df4af2685004ea67d872cd
Author: cody koeninger <cody@...>
Date: 2016-10-09T04:59:39Z
[SPARK-17147][STREAMING][KAFKA] test passing for compacted topics

commit 182943e36f596d0cb5841a9c63471bea1dd9047b
Author: cody koeninger <cody@...>
Date: 2018-02-11T04:09:38Z
spark.streaming.kafka.allowNonConsecutiveOffsets

commit 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8
Author: cody koeninger <cody@...>
Date: 2018-02-11T04:13:49Z
[SPARK-17147][STREAMING][KAFKA] remove stray param doc

commit 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d
Author: cody koeninger <cody@...>
Date: 2018-02-11T04:28:22Z
[SPARK-17147][STREAMING][KAFKA] prepare for merge of master

commit 2ed51f1f73ee75ffd08355265a72e68e83ef592d
Author: cody koeninger <cody@...>
Date: 2018-02-11T05:19:31Z
Merge branch 'master' into SPARK-17147
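For reference, a minimal sketch of how a job might opt in to the proposed flag. The app name and any surrounding stream setup are placeholders, and the property only takes effect if this PR lands as described:

```scala
import org.apache.spark.SparkConf

// Hypothetical setup: opt in to tolerating gaps between offsets,
// e.g. when reading from a compacted topic.
val conf = new SparkConf()
  .setAppName("compacted-topic-stream")
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
```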
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

Seems reasonable to me, but you should probably ask zsxwing whether it fits in with plans for the structured streaming Kafka code.

On Thu, Nov 23, 2017 at 10:23 PM, Hyukjin Kwon <notificati...@github.com> wrote:
> @HyukjinKwon commented on this pull request. Does this look good to you @koeninger?
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala:
>
> @@ -211,8 +211,8 @@ private[spark] class KafkaRDD[K, V](
>    var requestOffset = part.fromOffset
>
>    def closeIfNeeded(): Unit = {
> -    if (!useConsumerCache && consumer != null) {
> -      consumer.close
> +    if (consumer != null) {
> +      consumer.close()
>
> I think this should be double spaced
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

You'll need to get a committer's attention to merge it anyway.

On Nov 23, 2017 01:48, "Daroo" <notificati...@github.com> wrote:
> It seems that your "magic spell" didn't work. No build was triggered.
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

ok to test

On Wed, Nov 22, 2017 at 2:49 PM, Daroo <notificati...@github.com> wrote:
> Cool. Could you please authorize it for testing?
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

Seems reasonable.

On Wed, Nov 22, 2017 at 1:52 PM, Daroo <notificati...@github.com> wrote:
> It fails on the current master branch and doesn't after the patch
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

What are you actually asserting in that test, and does it reliably fail if run on the version of your code before the patch?

On Wed, Nov 22, 2017 at 1:33 PM, Daroo <notificati...@github.com> wrote:
> I've added a test. @koeninger is it something you had in mind?
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

Yeah, subscribepattern could definitely be an issue. As far as unit testing, have you tried anything along the lines of setting the cache size artificially low and then introducing new topicpartitions?

On Mon, Nov 20, 2017 at 11:43 AM, Daroo <notificati...@github.com> wrote:
> I see your point, but it's kind of difficult to size it properly when you use SubscribePattern (i.e. dynamic number of topics/partitions) consumer strategy
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19789

My main comment is that if you have a situation where there's actually contention on the size of the cache, chances are things are going to be screwed up anyway due to consumers being recreated and losing any benefit of buffering. Are you running into this issue in situations where the cache is appropriately sized given the number of topicpartitions and executors?
[GitHub] spark pull request #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsa...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/19789#discussion_r152056775

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -155,11 +178,11 @@ object CachedKafkaConsumer extends Logging {
       logInfo(s"Cache miss for $k")
       logDebug(cache.keySet.toString)
       val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
-      cache.put(k, c)
+      cache.put(k, c.acquireAndGet())
       c
     } else {
       // any given topicpartition should have a consistent key and value type
-      v.asInstanceOf[CachedKafkaConsumer[K, V]]
+      v.acquireAndGet().asInstanceOf[CachedKafkaConsumer[K, V]]

--- End diff --

Shouldn't this method call be after the cast?
[GitHub] spark issue #19274: [SPARK-22056][Streaming] Add subconcurrency for KafkaRDD...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19274

Search Jira and the mailing list; this idea has been brought up multiple times. I don't think breaking fundamental assumptions of Kafka (one consumer thread per group per partition) is a good idea.
[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19134

There's already a jira about why 0.10 doesn't have python support, https://issues-test.apache.org/jira/browse/SPARK-16534
[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19134

I think it makes sense to go ahead and put deprecation warnings in this PR as well.
[GitHub] spark issue #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStreams to ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/11863

You won't get any reasonable semantics out of auto commit, because it will commit on the driver without regard to what the executors have done.

On Aug 2, 2017 21:46, "Wallace Huang" <notificati...@github.com> wrote:
> Hey, I have a question about the setting "auto.commit.enable": could it be changed? I want to save the offsets information to a zookeeper cluster.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
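As an alternative to auto commit, the kafka-0-10 DStream integration lets the driver commit offsets itself after each batch's output succeeds. A sketch, assuming `stream` was created with `KafkaUtils.createDirectStream` (stream construction and the actual output action are elided):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDD produced directly by the stream.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write the batch's results to the sink here ...
  // Commit from the driver only after the output has completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```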
[GitHub] spark issue #18353: [SPARK-21142][SS] spark-streaming-kafka-0-10 should depe...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/18353

I thought the historical reason testutils was in main rather than test was for ease of use by other tests (e.g. python). There's even still a comment in the code to that effect. That may not matter at this point given python support for 0.10 never went anywhere, and the outlook on dstreams in general, but just bringing it up.
[GitHub] spark issue #18234: [SPARK-19185][DSTREAM] Make Kafka consumer cache configu...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/18234

LGTM, thanks Mark.
[GitHub] spark pull request #18234: [SPARK-19185][DSTREAM] Make Kafka consumer cache ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18234#discussion_r120716157

--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -91,7 +91,7 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

-The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`. If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`.

--- End diff --

Code change LGTM. I'd prefer clarifying / adding caveats to the documentation, rather than leaving it undocumented.
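The two knobs mentioned in the doc change can be set together on a job's conf; the values here are illustrative, and `cache.enabled` only exists if this PR's change lands:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Raise the per-executor consumer cache above the default of 64 entries...
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  // ...or, per the PR under review, disable consumer caching entirely.
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
```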
[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/18143

Sorry, noticed a couple more minor configuration-related things. Otherwise LGTM.
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119636878

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {

   private case class CacheKey(groupId: String, topic: String, partition: Int)

-  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null

   /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
   def init(
       initialCapacity: Int,
       maxCapacity: Int,
       loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-    if (null == cache) {
-      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
-      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-        initialCapacity, loadFactor, true) {
-        override def removeEldestEntry(
-          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
-          if (this.size > maxCapacity) {
-            try {
-              entry.getValue.consumer.close()
-            } catch {
-              case x: KafkaException =>
-                logError("Error closing oldest Kafka consumer", x)
-            }
-            true
-          } else {
-            false
+    if (cache == null) {
+      val duration =
+        SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", "30m")

--- End diff --

All of the other configs in the dstream are of the form "spark.streaming.kafka.consumer.cache.something"; not sure whether it's better to be consistent with the sql config, or with the dstream config.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119636950

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {

   private case class CacheKey(groupId: String, topic: String, partition: Int)

-  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null

   /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
   def init(
       initialCapacity: Int,
       maxCapacity: Int,
       loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {

--- End diff --

Load factor is being ignored at this point, yes?
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119636360

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
   [...same removal of the LinkedHashMap-based init as in the hunk above...]
+    if (cache == null) {
+      val duration =
+        SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", "30m")
+      val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer[_, _]] {
+        override def onRemoval(
+            n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): Unit = {
+          n.getCause match {
+            case RemovalCause.SIZE =>
+              logWarning(
+                s"Evicting consumer ${n.getKey}," +
+                  s" due to size limit reached. Capacity: $maxCapacity.")
+            case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
+          }
+          try {
+            n.getValue.close()
+          } catch {
+            case NonFatal(e) =>
+              logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+                s" evicted from cache due to ${n.getCause}", e)
+          }
+        }
+      }
+
+      cache = CacheBuilder.newBuilder()
+        .maximumSize(maxCapacity).removalListener(removalListener)

--- End diff --

initialCapacity is being passed in, but not used; this should probably set initial capacity as well.
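The Guava pattern under discussion can be sketched in isolation; `FakeConsumer` and the capacity/timeout values below are illustrative stand-ins, not the PR's actual types:

```scala
import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder, RemovalCause, RemovalListener, RemovalNotification}

// Toy stand-in for the cached Kafka consumer.
final class FakeConsumer(val id: String) {
  def close(): Unit = println(s"closed $id")
}

val removalListener = new RemovalListener[String, FakeConsumer] {
  override def onRemoval(n: RemovalNotification[String, FakeConsumer]): Unit = {
    // RemovalCause.SIZE means the entry was evicted because maximumSize was exceeded.
    if (n.getCause == RemovalCause.SIZE) {
      println(s"evicted ${n.getKey} over capacity")
    }
    n.getValue.close() // mirror the PR: close the consumer on eviction
  }
}

val cache: Cache[String, FakeConsumer] = CacheBuilder.newBuilder()
  .maximumSize(2)                          // analogous to maxCapacity
  .expireAfterAccess(30, TimeUnit.MINUTES) // analogous to the timeout config
  .removalListener(removalListener)
  .build[String, FakeConsumer]()
```

Unlike the hand-rolled LinkedHashMap, eviction here is handled by Guava and the removal listener fires for size, expiry, and explicit invalidation alike.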
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119374298

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -147,20 +155,14 @@ object CachedKafkaConsumer extends Logging {
       groupId: String,
       topic: String,
       partition: Int,
-      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
-    CachedKafkaConsumer.synchronized {
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = {
     val k = CacheKey(groupId, topic, partition)
-    val v = cache.get(k)
-    if (null == v) {
-      logInfo(s"Cache miss for $k")
-      logDebug(cache.keySet.toString)
-      val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
-      cache.put(k, c)
-      c
-    } else {
-      // any given topicpartition should have a consistent key and value type
-      v.asInstanceOf[CachedKafkaConsumer[K, V]]
-    }
+    val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
+      override def call(): CachedKafkaConsumer[K, V] = {
+        new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)

--- End diff --

I think it's worth keeping the info / debug level logging for when a cache miss has happened.
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119371920

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
   [...same removal of the LinkedHashMap-based init as in the hunk above...]
+    val duration = SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
+    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer[_, _]] {
+      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): Unit = {
+        n.getCause match {
+          case RemovalCause.SIZE =>
+            logWarning(
+              s"Evicting consumer ${n.getKey}, due to size limit reached. Capacity: $maxCapacity.")
+          case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
+        }
+        try {
+          n.getValue.close()
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+              s" evicted from cache due to ${n.getCause}", e)
+        }
+      }
+    }
+    if (cache == null) {

--- End diff --

Shouldn't the whole function be guarded by this if statement? Is there any reason to construct a removal listener otherwise?
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119371285

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
   [...same removal of the LinkedHashMap-based init as in the hunk above...]
+    val duration = SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")

--- End diff --

I don't think this should be using configuration from the spark.sql namespace.
[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/18143

Generally looks ok to me, thanks. Two questions:
- Did you do any testing on workloads to see if performance stayed the same?
- Is there a reason not to do the same thing to streaming/kafka010/CachedKafkaConsumer.scala for consistency?
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119132090

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {

   private lazy val cache = {
     val conf = SparkEnv.get.conf
-    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
-      override def removeEldestEntry(
-        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-        if (entry.getValue.inuse == false && this.size > capacity) {
-          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
-            s"removing consumer for ${entry.getKey}")
-          try {
-            entry.getValue.close()
-          } catch {
-            case e: SparkException =>
-              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
-          }
-          true
-        } else {
-          false
+    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
+    val capacity = conf.getInt(capacityConfigString, 64)
+    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
--- End diff --

It seems like the general trend towards default configuration values is to make them work for the lowest common denominator use case, in which case I'd argue for a longer (30 min?) default timeout.
[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/18143#discussion_r119131006

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {

   private lazy val cache = {
     val conf = SparkEnv.get.conf
-    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
-      override def removeEldestEntry(
-        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-        if (entry.getValue.inuse == false && this.size > capacity) {
-          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
-            s"removing consumer for ${entry.getKey}")
-          try {
-            entry.getValue.close()
-          } catch {
-            case e: SparkException =>
-              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
-          }
-          true
-        } else {
-          false
+    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
+    val capacity = conf.getInt(capacityConfigString, 64)
+    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
+      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
+        n.getCause match {
+          case RemovalCause.SIZE =>
+            logWarning(
--- End diff --

It shouldn't really be a normal operation. If capacity is smaller than the number of partitions that are regularly being assigned to a given node, it's going to kill performance due to recreating consumers every batch.
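The LRU eviction behavior discussed in this thread can be sketched without any Spark or Guava dependency. This is a minimal illustration, not Spark's actual CachedKafkaConsumer API: an access-ordered LinkedHashMap (the mechanism the pre-Guava code used) whose eldest entry is evicted once size exceeds capacity.

```scala
import java.{util => ju}

// Minimal sketch (names are illustrative) of the LRU consumer-cache pattern:
// an access-ordered LinkedHashMap that evicts its eldest entry past capacity.
object ConsumerCacheSketch {
  final case class CacheKey(groupId: String, topic: String, partition: Int)

  def newCache[V](capacity: Int): ju.LinkedHashMap[CacheKey, V] =
    new ju.LinkedHashMap[CacheKey, V](capacity, 0.75f, true) {
      override def removeEldestEntry(eldest: ju.Map.Entry[CacheKey, V]): Boolean =
        // Size-based eviction should be rare in practice: if capacity is below the
        // number of partitions regularly assigned to an executor, consumers get
        // recreated every batch and throughput suffers.
        this.size > capacity
    }
}

// Capacity 2: touching partition 0 makes partition 1 the eldest entry,
// so inserting partition 2 evicts partition 1.
val cache = ConsumerCacheSketch.newCache[String](2)
cache.put(ConsumerCacheSketch.CacheKey("g", "t", 0), "consumer0")
cache.put(ConsumerCacheSketch.CacheKey("g", "t", 1), "consumer1")
cache.get(ConsumerCacheSketch.CacheKey("g", "t", 0))
cache.put(ConsumerCacheSketch.CacheKey("g", "t", 2), "consumer2")
```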
[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17308

@marmbrus @zsxwing @tdas This needs attention from someone
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774

LGTM pending jason's comments on tests
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774

Have you read the function def clamp? A rate limit of 1 should not imply an attempt to grab 1 message even if it doesn't exist.

On Apr 27, 2017 11:01, "Sebastian Arzt" <notificati...@github.com> wrote:

> @koeninger <https://github.com/koeninger> I agree that assuming a long
> batch size is wrong, not sure whether it even matters.
> But what if for one partition there is no lag in the current batch? Then
> fetching 1 message for this partition from kafka, as you suggest, would
> fail. So here zero makes sense in my eyes. This is also the old behaviour
> if rate > 1 and lag == 0 here
> <https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala#L107>.
> Further, I think that truncating 0.99 to 0 messages per partition is also
> the right thing to do, as one cannot be sure that there is one message
> available if (secsPerBatch * limit) < 1.0. And as you say, in a future
> batch it is very likely to become greater than 1.0.
> Do you agree?
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774

@arzt It's entirely possible to have batch times less than a second, and I'm not sure I agree that the absolute number of messages allowable for a partition should ever be zero.

So to put this another way: right now effectiveRateLimitPerPartition is a Map[TopicPartition, Long], which matches the return value of the function maxMessagesPerPartition. You're wanting to change effectiveRateLimitPerPartition to a Map[TopicPartition, Double], which is probably a good idea, and should fix the bug around treating a very small rate limit as no limit. But it still needs to be converted to Map[TopicPartition, Long] before returning. Calling .toLong is probably not the right thing to do there, because 0.99 will get truncated to 0.

I think one message per partition per batch is the minimum reasonable rate limit, otherwise particular partitions may not make progress. The relative lag calculation might take care of that in future batches, but it still seems questionable, even if it's a corner case.
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774

How do you read 0.1 of a kafka message for a given partition of a given batch? Ultimately the floor for a rate limit, assuming one is set, needs to be 1 message per partition per batch, not a fraction, which is why it's a long. If you want to delay that conversion by keeping it as a double as long as possible, that makes sense, but the lines like (secsPerBatch * limit).toLong probably need attention too.
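The conversion being argued for in this thread can be sketched as a small standalone helper. This is an illustration of the idea, not the actual maxMessagesPerPartition implementation: keep per-partition rates as Double through the proportional split, then floor the final per-batch message count at one per partition, so that 0.99 doesn't truncate to 0 and every partition keeps making progress.

```scala
// Hypothetical helper: convert fractional per-partition rates (records/sec) into
// per-batch message counts, flooring at 1 so no partition stalls at 0.
object RateLimitSketch {
  def maxMessagesPerPartition(
      secsPerBatch: Double,
      ratePerPartition: Map[String, Double]): Map[String, Long] =
    ratePerPartition.map { case (tp, limit) =>
      // (secsPerBatch * limit).toLong alone would truncate 0.99 down to 0.
      tp -> math.max(1L, (secsPerBatch * limit).toLong)
    }
}
```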
[GitHub] spark pull request #17675: [SPARK-20036][DOC] Note incompatible dependencies...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/17675

[SPARK-20036][DOC] Note incompatible dependencies on org.apache.kafka artifacts

## What changes were proposed in this pull request?

Note that you shouldn't manually add dependencies on org.apache.kafka artifacts.

## How was this patch tested?

Doc only change; did a jekyll build and looked at the page.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-20036

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17675.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17675

commit bcba5bac51aea773616fda539a9a1e48604a7815
Author: cody koeninger <c...@koeninger.org>
Date: 2017-04-18T21:16:14Z

    [SPARK-20036][DOC] Document that you shouldnt add incompatible dependencies on org.apache.kafka artifacts
[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17308

Just to throw in my two cents, a change like this is definitely needed, as is made clear by the second sentence of the docs at http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html:

"The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances."
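The shared-producer idea motivated above can be sketched as follows. All names here are hypothetical and this is not Spark's implementation: since a Kafka producer is thread safe, cache one instance per configuration and share it across tasks rather than constructing a new producer per task. `create` stands in for `new KafkaProducer(config)` so the sketch carries no Kafka dependency.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical per-config producer cache: the first caller for a given config
// creates the instance; everyone else reuses it.
object ProducerCacheSketch {
  private val cache = new ConcurrentHashMap[Map[String, String], AnyRef]()

  // `create` is evaluated at most once per distinct config (computeIfAbsent is atomic).
  def getOrCreate[P <: AnyRef](config: Map[String, String])(create: => P): P =
    cache.computeIfAbsent(config, (k: Map[String, String]) => create).asInstanceOf[P]
}
```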
[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16006

@zsxwing you or anyone else have time to look at this?
[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/16006#discussion_r105720383

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -143,9 +147,16 @@ private[spark] class DirectKafkaInputDStream[K, V](

     lagPerPartition.map { case (tp, lag) =>
       val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-      tp -> (if (maxRateLimitPerPartition > 0) {
-        Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+      val effectiveRate = if (rate >= 0) rate else backpressureInitialRate
--- End diff --

This can be moved before the map, right?
[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16006

Any interest in picking this back up if you can get a committer's attention?
[GitHub] spark issue #16629: [SPARK-19185][DStream] Add more clear hint for 'Concurre...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16629

I don't think it's a problem to make disabling the cache configurable, as long as it's on by default.

I don't think the additional static constructors in kafka utils are necessary, are they?

I'm not sure it's a good idea to just blindly recommend people turn it off if they get that exception, though; it's not that simple.
[GitHub] spark issue #16569: [SPARK-19206][DOC][DStream] Fix outdated parameter descr...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16569

LGTM
[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/16006#discussion_r91118893

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -143,9 +147,14 @@ private[spark] class DirectKafkaInputDStream[K, V](

     lagPerPartition.map { case (tp, lag) =>
       val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-      tp -> (if (maxRateLimitPerPartition > 0) {
-        Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+      val effectiveRate = if (rate >= 0) rate else backpressureInitialRate
+      val estimateRate = Math.round(lag / totalLag.toFloat * effectiveRate)
+      val backpressureRate =
+        if (estimateRate > maxRateLimitPerPartition && maxRateLimitPerPartition > 0) {
+          maxRateLimitPerPartition
+        }
+        else estimateRate
--- End diff --

Per http://spark.apache.org/contributing.html, use braces around the else clause unless it's all on one line.
[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/16006#discussion_r91118458

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -67,6 +67,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     ekp
   }

+  val backpressureInitialRate: Long =
+    _ssc.sparkContext.conf.getLong("spark.streaming.backpressure.initialRate",
+      _ssc.sparkContext.conf.getDouble("spark.streaming.backpressure.pid.minRate", 100).round)
--- End diff --

Shouldn't these be using ssc instead of _ssc for consistency?
[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/16006#discussion_r91001763

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -67,6 +67,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     ekp
   }

+  val backpressureInitialRate: Long =
+    _ssc.sparkContext.conf.getLong("spark.streaming.backpressure.initialRate",
+      1 + _ssc.sparkContext.conf.getLong("spark.streaming.kafka.maxRatePerPartition", 1000L) / 10)
+
--- End diff --

Where is this divide by 10 coming from?
[GitHub] spark pull request #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Us...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/16006#discussion_r91002372

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -143,9 +147,14 @@ private[spark] class DirectKafkaInputDStream[K, V](

     lagPerPartition.map { case (tp, lag) =>
       val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-      tp -> (if (maxRateLimitPerPartition > 0) {
-        Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+      val backpressureRate =
+        if (rate >= 0) {
+          val estimateRate = Math.round(lag / totalLag.toFloat * rate)
+          if (estimateRate > maxRateLimitPerPartition && maxRateLimitPerPartition > 0) {
+            maxRateLimitPerPartition
+          } else estimateRate
+        } else math.min(backpressureInitialRate, maxRateLimitPerPartition)
--- End diff --

I don't think these units are comparable, one is records / sec, and the other is records / sec / partition.
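The unit mismatch called out above can be made concrete with a small hypothetical helper (not Spark code): a global backpressure estimate in records/sec has to be divided across partitions before it can be compared with maxRatePerPartition, which is in records/sec/partition.

```scala
// Hypothetical helper making the units explicit: globalRate is records/sec for the
// whole stream, maxRatePerPartition caps records/sec for a single partition.
object RateUnitsSketch {
  def perPartitionRate(globalRate: Long, numPartitions: Int, maxRatePerPartition: Long): Long = {
    require(numPartitions > 0, "need at least one partition")
    val share = globalRate.toDouble / numPartitions // now records/sec/partition
    val capped =
      if (maxRatePerPartition > 0) math.min(share, maxRatePerPartition.toDouble) else share
    math.max(1L, capped.toLong) // floor at 1 so every partition makes progress
  }
}
```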
[GitHub] spark issue #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false work w...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15820

Because the comment made by me and +1'ed by marmbrus is hidden at this point, I just want to re-iterate that this patch should not skip the rest of the partition in the case that a timeout happens.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88360922

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private(
     record
   }

+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    // scalastyle:off
+    // When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive):
+    // 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1`
+    //    Seek to the beginningOffset and fetch the data.
+    // 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // 3. The topic is deleted.
+    //    There is nothing to fetch, return null.
+    // 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`.
+    //    We cannot detect this case. We can still fetch data like nothing happens.
+    // 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`.
+    //    Same as 4.
+    // 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`.
+    //    There is nothing to fetch, return null.
+    // 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`.
+    //    Same as 1.
+    // 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // scalastyle:on
+    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
+    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    try {
+      if (offset != nextOffsetInFetchedData) {
+        logInfo(s"Initial fetch for $topicPartition $offset")
+        seek(offset)
+        poll(pollTimeoutMs)
+      } else if (!fetchedData.hasNext()) {
+        // The last pre-fetched data has been drained.
+        poll(pollTimeoutMs)
+      }
+      getRecordFromFetchedData(offset, untilOffset)
+    } catch {
+      case e: OffsetOutOfRangeException =>
+        logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
+        advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
+    }
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    val beginningOffset = getBeginningOffset()
+    if (beginningOffset <= offset) {
+      val latestOffset = getLatestOffset()
+      if (latestOffset <= offset) {
+        // beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+        logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " +
+          s"Skipped [$offset, $untilOffset)")
+        reset()
+        null
+      } else {
+        // beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1)
+        getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+      }
+    } else {
+      if (beginningOffset >= untilOffset) {
+        // offset <= untilOffset - 1 < beginningOffset
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)")
+        reset()
+        null
+      } else {
+        // offset < beginningOffset <= untilOffset - 1
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)")
+        getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no
[GitHub] spark issue #15849: [SPARK-18410][STREAMING] Add structured kafka example
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15849

LGTM
[GitHub] spark pull request #15849: [SPARK-18410][STREAMING] Add structured kafka exa...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15849#discussion_r87795091

--- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaStructuredKafkaWordCount
+ *   The Kafka "bootstrap.servers" configuration. A
+ *     comma-separated list of host:port.
+ *   There are three kinds of type, i.e. 'assign', 'subscribe',
+ *     'subscribePattern'.
+ *     |- Specific TopicPartitions to consume. Json string
+ *     |  {"topicA":[0,1],"topicB":[2,4]}.
+ *     |- The topic list to subscribe. A comma-separated list of
+ *     |  topics.
+ *     |- The pattern used to subscribe to topic(s).
+ *     |  Only one of "assign, "subscribe" or "subscribePattern" options can be
+ *     |  specified for Kafka source.
+ *   A list of one or more kafka topics to consume from. Please refer
--- End diff --

This isn't really a list of topics for assign or subscribePattern. I'd say use a more general description, or stick to just subscribe.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87439798

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private(
     record
   }

+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    // scalastyle:off
+    // When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive):
+    // 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1`
+    //    Seek to the beginningOffset and fetch the data.
+    // 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // 3. The topic is deleted.
+    //    There is nothing to fetch, return null.
+    // 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`.
+    //    We cannot detect this case. We can still fetch data like nothing happens.
+    // 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`.
+    //    Same as 4.
+    // 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`.
+    //    There is nothing to fetch, return null.
+    // 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`.
+    //    Same as 1.
+    // 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // scalastyle:on
+    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
+    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    try {
+      if (offset != nextOffsetInFetchedData) {
+        logInfo(s"Initial fetch for $topicPartition $offset")
+        seek(offset)
+        poll(pollTimeoutMs)
+      } else if (!fetchedData.hasNext()) {
+        // The last pre-fetched data has been drained.
+        poll(pollTimeoutMs)
+      }
+      getRecordFromFetchedData(offset, untilOffset)
+    } catch {
+      case e: OffsetOutOfRangeException =>
+        logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
+        advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
+    }
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    val beginningOffset = getBeginningOffset()
+    if (beginningOffset <= offset) {
+      val latestOffset = getLatestOffset()
+      if (latestOffset <= offset) {
+        // beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+        logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " +
+          s"Skipped [$offset, $untilOffset)")
+        reset()
+        null
+      } else {
+        // beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1)
+        getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+      }
+    } else {
+      if (beginningOffset >= untilOffset) {
+        // offset <= untilOffset - 1 < beginningOffset
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)")
+        reset()
+        null
+      } else {
+        // offset < beginningOffset <= untilOffset - 1
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)")
+        getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87438222 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- I'm clearer on why this terminates, but I think it's worth a comment, since it's a mutually recursive call without changing arguments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
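The termination concern raised in this review comment can be made concrete with a toy model: the two functions call each other with unchanged arguments, and the loop ends only because each round re-reads broker state. The sketch below uses hypothetical names (not Spark's implementation) and makes that assumption explicit with a bounded round counter:

```python
class FetchSketch:
    """Toy model of a mutually recursive get/recover pair. The retry with
    unchanged arguments terminates only if the broker state it re-reads
    eventually yields data; `max_rounds` bounds that assumption."""

    def __init__(self, log, max_rounds=3):
        self.log = log            # dict: offset -> record value
        self.max_rounds = max_rounds

    def get(self, offset, until, rounds=0):
        if offset in self.log:
            return self.log[offset]
        return self._recover(offset, until, rounds)

    def _recover(self, offset, until, rounds):
        if rounds >= self.max_rounds:
            return None           # give up instead of recursing forever
        beginning = min(self.log) if self.log else until
        if beginning <= offset:
            # Data should exist now; retry with the SAME arguments --
            # this is the call whose termination the review questions.
            return self.get(offset, until, rounds + 1)
        if beginning >= until:
            return None           # whole requested range aged out
        return self.get(beginning, until, rounds + 1)
```

When the requested offset never reappears, the same-argument retry would loop forever without the explicit bound, which is exactly why a comment (or a structural guarantee) is worth having in the real code.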
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87438788 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) --- End diff -- I think it's worth the warning explicitly stating that data has been lost
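The suggestion is that the warning should name the data loss outright rather than only reporting a failed fetch. A possible wording, sketched as a message builder (illustrative only; not the message the PR ended up using):

```python
def data_loss_warning(offset, beginning_offset, group_id, topic_partition):
    """Build a warning that states explicitly that data was lost, as the
    review asks, instead of just 'cannot fetch offset' (sketch wording)."""
    return ("Data loss detected for %s %s: offsets [%d, %d) are no longer "
            "available on the broker; skipping ahead because "
            "failOnDataLoss=false." %
            (group_id, topic_partition, offset, beginning_offset))
```

Making the loss explicit matters operationally: an operator grepping logs for "data loss" finds it, whereas "cannot fetch offset" reads like a transient error.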
[GitHub] spark issue #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false work w...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15820 Wow, looks like the new github comment interface did all kinds of weird things, apologies about that.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129126 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- - Why is this not an early return? - The arguments to the recursive function have not changed at this point, right? Why does it terminate?
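The early-return question can be illustrated by flattening the nested branches: each terminal case returns immediately, and the retry cases fall through last. The sketch below re-expresses the quoted eight-case analysis with hypothetical names; it is a classification only, not Spark's implementation:

```python
def classify_out_of_range(offset, until_offset, beginning_offset, latest_offset):
    """Decide the recovery action after an out-of-range fetch.
    Precondition (as in the quoted code): offset < until_offset.
    Upper bounds (until_offset, latest_offset) are exclusive."""
    # Cases 3/6: offset is past the end of the (possibly recreated) log.
    if beginning_offset <= offset and latest_offset <= offset:
        return "skip"
    # Cases 2/8: the whole requested range was aged out.
    if beginning_offset >= until_offset:
        return "skip"
    # Cases 1/7: restart from the first offset still available.
    if beginning_offset > offset:
        return ("retry", beginning_offset)
    # Cases 4/5: data exists at `offset`; retry with unchanged arguments.
    return ("retry", offset)
```

Written this way, the only non-terminal outcomes are the two retries, which makes it easier to see where a termination argument is still owed: the cases 4/5 retry repeats the original arguments.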
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129981 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false --- End diff -- Can this var be eliminated by just using a single try around the if / else? It's the same catch condition in either case
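The refactor being suggested: wrap both poll sites in a single try so the `outOfOffset` flag disappears and the handler performs the recovery directly. A minimal sketch with hypothetical helper names:

```python
class OffsetOutOfRangeError(Exception):
    """Stand-in for Kafka's OffsetOutOfRangeException (sketch only)."""

def fetch_once(offset, next_offset, fetched_has_next, poll, recover):
    """One try around the whole branch: the same catch applies to both
    poll sites, so no mutable flag is needed."""
    try:
        if offset != next_offset:
            poll("initial-fetch")   # seek happened, fetch from `offset`
        elif not fetched_has_next:
            poll("drained")         # pre-fetched buffer exhausted
    except OffsetOutOfRangeError:
        return recover(offset)
    return "ok"
```

In the original shape, each branch had its own try/catch that set `outOfOffset = true`; since both catches were identical, hoisting the try removes the duplication and the mutable state at once.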
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129817 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) --- End diff -- I don't think it's necessary to seek every time the fetched data is empty, in normal operation the poll should return the next offset, right?
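The point here is that a Kafka consumer tracks its position: after a buffer is drained, the position already equals the next offset, so a plain poll fetches the next batch and the extra seek is redundant. A toy model of that position tracking (a sketch, not the real client):

```python
class ToyConsumer:
    """Minimal model of consumer position tracking. Each poll returns the
    next batch at the current position and advances it, so consecutive
    polls continue from where the last one stopped -- no seek needed."""

    def __init__(self, records):
        self.records = records   # sorted list of (offset, value)
        self.position = 0
        self.seeks = 0           # count seeks to show none are required

    def seek(self, offset):
        self.seeks += 1
        self.position = offset

    def poll(self, batch=2):
        out = [r for r in self.records if r[0] >= self.position][:batch]
        if out:
            self.position = out[-1][0] + 1
        return out
```

Two back-to-back polls return consecutive batches with zero seeks, which is the "normal operation" the reviewer describes; seeking on every drain would only add a round trip.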
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129927 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) +} + } else { +// Case 1 or 7 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})") +return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} else { + if (!fetchedData.hasNext()) { +// We cannot fetch anything after `polling`. Two possible cases: +// - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now. +// - Cannot fetch any date before timeout. +// Because there is no way to distinguish, just skip the rest offsets in the current +// partition. 
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } + + val record = fetchedData.next() + if (record.offset >= untilOffset) { +// Case 2 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87127811 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) +} + } else { +// Case 1 or 7 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})") +return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} else { + if (!fetchedData.hasNext()) { +// We cannot fetch anything after `polling`. Two possible cases: +// - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now. +// - Cannot fetch any date before timeout. +// Because there is no way to distinguish, just skip the rest offsets in the current +// partition. 
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } + + val record = fetchedData.next() + if (record.offset >= untilOffset) { +// Case 2 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87130059 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- - Why isn't this an early return? - Unless I'm misreading, this is a recursive call without changing the arguments. Why is it guaranteed to terminate?
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87128373 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+    // scalastyle:on
+    if (offset >= untilOffset) {
+      // Case 2 or 8
+      // We seek to beginningOffset but beginningOffset >= untilOffset
+      reset()
+      return null
+    }
+
+    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    var outOfOffset = false
+    if (offset != nextOffsetInFetchedData) {
+      logInfo(s"Initial fetch for $topicPartition $offset")
+      seek(offset)
+      try {
+        poll(pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
+          outOfOffset = true
+      }
+    } else if (!fetchedData.hasNext()) {
+      // The last pre-fetched data has been drained.
+      seek(offset)
+      try {
+        poll(pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
+          outOfOffset = true
+      }
+    }
+    if (outOfOffset) {
+      val beginningOffset = getBeginningOffset()
+      if (beginningOffset <= offset) {
+        val latestOffset = getLatestOffset()
+        if (latestOffset <= offset) {
+          // Case 3 or 6
+          logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " +
+            s"Skipped [$offset, $untilOffset)")
+          reset()
+          return null
+        } else {
+          // Case 4 or 5
+          getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+        }
+      } else {
+        // Case 1 or 7
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})")
+        return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+      }
+    } else {
+      if (!fetchedData.hasNext()) {
+        // We cannot fetch anything after `polling`. Two possible cases:
+        // - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now.
+        // - Cannot fetch any data before timeout.
+        // Because there is no way to distinguish, just skip the rest offsets in the current
+        // partition.
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)")
+        reset()
+        return null
+      }
+
+      val record = fetchedData.next()
+      if (record.offset >= untilOffset) {
+        // Case 2
+        logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)")
+        reset()
+        return null
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129204

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---

@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private(
     record
   }

+  @tailrec
+  final def getAndIgnoreLostData(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    // scalastyle:off
+    // When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive):
+    // 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1`.
+    //    Seek to the beginningOffset and fetch the data.
+    // 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // 3. The topic is deleted.
+    //    There is nothing to fetch, return null.
+    // 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`.
+    //    We cannot detect this case. We can still fetch data like nothing happens.
+    // 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`.
+    //    Same as 4.
+    // 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`.
+    //    There is nothing to fetch, return null.
+    // 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`.
+    //    Same as 1.
+    // 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // scalastyle:on
+    if (offset >= untilOffset) {
+      // Case 2 or 8
+      // We seek to beginningOffset but beginningOffset >= untilOffset
+      reset()
+      return null
+    }
+
+    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    var outOfOffset = false
--- End diff --

Can't this var be eliminated with a single try around the following if/else?
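The suggested refactor can be sketched as follows (with hypothetical stand-ins for the consumer's seek/poll so the sketch is self-contained): a single try/catch around the whole branch replaces the duplicated blocks, and the method returns whether the fetch failed instead of mutating a var.

```scala
// Hypothetical stand-in for Kafka's exception type, defined locally so the
// sketch compiles without the Kafka client on the classpath.
class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

// One try/catch around the poll replaces the two duplicated try blocks and the
// mutable `outOfOffset` var: the result of the expression says whether the
// requested offset was out of range.
def pollReportingOutOfRange(pollTimeoutMs: Long)(poll: Long => Unit): Boolean =
  try {
    poll(pollTimeoutMs)
    false
  } catch {
    case _: OffsetOutOfRangeException => true
  }
```

The caller would then write `val outOfOffset = pollReportingOutOfRange(pollTimeoutMs)(...)`, keeping the value immutable.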
[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15132

@zsxwing @rxin The per-partition rate limit probably won't overflow, but the overall backpressure rate limit was being cast to an int, which definitely can overflow. I changed it in this latest commit; if you think that should be a separate PR instead, let me know.
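The overflow is easy to demonstrate with illustrative numbers (not Spark's actual code path): casting a Long rate above `Int.MaxValue` to `Int` silently wraps to a negative value.

```scala
// Illustrative only: a backpressure rate above Int.MaxValue (~2.1 billion
// records/sec) wraps around when truncated to Int, turning a huge positive
// rate into a negative one.
val backpressureRate: Long = 3L * 1000 * 1000 * 1000 // 3 billion records/sec
val truncated: Int = backpressureRate.toInt          // silent overflow
```

This is why keeping the overall rate as a `Long` matters even when any single partition's rate fits in an `Int`.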
[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15132#discussion_r86810750

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+abstract class PerPartitionConfig() extends Serializable {
+  /**
+   * Maximum rate (number of records per second) at which data will be read
+   * from each Kafka partition.
+   */
+  def maxRatePerPartition(topicPartition: TopicPartition): Int
--- End diff --

That's a reasonable question. 2 billion messages per second on a single thread would be pretty extreme, but I don't think it hurts us to change this to a long.
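A sketch of the widening being discussed (using `String` in place of Kafka's `TopicPartition` so it stands alone): returning `Long` removes the roughly-2-billion-records-per-second ceiling of `Int`.

```scala
// Sketch of the discussed change; `String` is a stand-in for
// org.apache.kafka.common.TopicPartition to keep this self-contained.
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: String): Long
}

// A uniform config whose rate would not fit in an Int.
val uniformRate = new PerPartitionConfig {
  def maxRatePerPartition(topicPartition: String): Long = 3000000000L
}
```

An abstract class (rather than a trait) is also what the thread settles on, since it leaves room to add methods with default implementations later without breaking binary compatibility as badly.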
[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15132

@rxin Thanks, changed to an abstract class. If you think that's sufficient future-proofing, I otherwise think this is a worthwhile change; it seems to meet a real user need.
[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15132

I sent a message to d...@spark.apache.org. If you're not already subscribed, I'd say subscribe and follow up in case there's any discussion there rather than on the PR/JIRA. Someone may want to know more about your use case, etc.
[GitHub] spark issue #15132: [SPARK-17510][STREAMING][KAFKA] config max rate on a per...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15132

Ok... so the question at this point is whether it's worth making the API change, which ultimately we'll have to track down a committer to decide. As the PR stands, it shouldn't break binary compatibility, because the class constructor it's changing is private. But future changes to PerPartitionConfig would. It's essentially a question of whether it's worth coming up with a generic additional-option scheme now, or possibly making breaking changes to an API (granted, one that's labeled experimental) later.
[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15737

Good catch, I just mistakenly changed to AsMs in one place but not both. On the test changes, do you want tests waiting up to 2 minutes * however many Kafka calls are being made? If so I can rip everything else out, just let me know.
[GitHub] spark pull request #15737: [SPARK-18212][SS][KAFKA] increase executor poll t...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15737#discussion_r86254376

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---

@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(

   private val sc = sqlContext.sparkContext

-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString
--- End diff --

My thinking was just to make it really obvious it was the same default as spark.network.timeout usually has, since I didn't find a standalone constant somewhere for that value. If you think it's clearer as ms, I don't feel strongly either way.
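The two spellings are equivalent for the default; a sketch with simplified stand-ins for SparkConf's time parsers (the real `getTimeAsSeconds`/`getTimeAsMs` accept many unit suffixes, this only handles `"<n>s"`):

```scala
// Simplified stand-ins for SparkConf.getTimeAsSeconds / getTimeAsMs,
// handling only a trailing "s" suffix so the sketch is self-contained.
def getTimeAsSeconds(value: String): Long = value.stripSuffix("s").toLong
def getTimeAsMs(value: String): Long = getTimeAsSeconds(value) * 1000

// Default poll timeout spelled via seconds * 1000, as in the diff above...
val viaSeconds = getTimeAsSeconds("120s") * 1000
// ...versus asking for milliseconds directly.
val viaMs = getTimeAsMs("120s")
```

Both yield 120000 ms; the seconds-based spelling just makes the link to spark.network.timeout's usual "120s" default visually obvious.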
[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15737

Ok, I'll update the default in both places to use spark.network.timeout and leave the test config at 10 seconds.
[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15737

I will always choose "fail in an obvious way that I can start debugging" over "start behaving poorly in non-obvious ways". For a similar reason, I thought it was a really poor idea for the original structured streaming Kafka PRs to silently swallow offset-out-of-range: something is wrong, and I want to know about it. Ultimately this particular thing is tunable, so if you really want to increase the default to however many seconds, let me know - it won't affect my ability to do my job.

On Wed, Nov 2, 2016 at 1:42 PM, Michael Armbrust <notificati...@github.com> wrote:

> If we have prefetched, the timeout should never kick in (I hope?), so we
> probably only care about the network here. If my choices are "fail the job"
> or "wait a little bit longer", I don't see why you would ever choose "fail
> the job" for transient network issues. If it's not a transient issue, do you
> really think that knowing about it 9.5 seconds earlier outweighs the cost
> of false negatives?
>
> I guess I don't understand what kind of Kafka issues you are optimizing
> for?
[GitHub] spark issue #15737: [SPARK-18212][SS][KAFKA] increase executor poll timeout
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15737

In most cases poll should be returning prefetched data from the buffer, not waiting to talk to Kafka over the network. I could see increasing it a little, but I don't think it should be increased to 10 seconds by default. That's a really long time for a 1-second-batch streaming job to silently wait, and the only thing you'll be able to observe is that your job starts running slow for some unknown reason. At least with a smaller default, any network or Kafka issues will be obvious, and you can either diagnose them or tune upwards.

On Wed, Nov 2, 2016 at 1:06 PM, Michael Armbrust <notificati...@github.com> wrote:

> Is there a reason to not change the user-facing default? It's failing our
> tests and it was failing for me for real with the current default. When we
> are polling on the executors, the offsets should always be available, so I
> don't think increasing the timeout will hurt anything.
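The tradeoff in the thread can be put in numbers (illustrative arithmetic, not code from the PR): with a 1-second batch interval, a single empty poll at a 10 s timeout can silently consume ten batch intervals, while a 512 ms timeout keeps the worst-case stall below one batch.

```scala
// Illustrative arithmetic for the discussion above: how many batch intervals
// one empty poll can silently consume at a given timeout.
def worstCaseStallInBatches(pollTimeoutMs: Long, batchIntervalMs: Long): Double =
  pollTimeoutMs.toDouble / batchIntervalMs

val batchIntervalMs = 1000L
val smallDefault = worstCaseStallInBatches(512L, batchIntervalMs)    // sub-batch stall
val largeDefault = worstCaseStallInBatches(10000L, batchIntervalMs)  // ten batches
```

This is the quantitative version of "fail obviously vs. wait silently": the large timeout hides problems inside batch latency instead of surfacing them.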
[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15702#discussion_r86203226

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---

@@ -104,85 +110,105 @@ case class StateStoreSaveExec(

   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
-    assert(returnAllStates.nonEmpty,
-      "Incorrect planning in IncrementalExecution, returnAllStates have not been set")
-    val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _
+    assert(outputMode.nonEmpty,
+      "Incorrect planning in IncrementalExecution, outputMode has not been set")
+
     child.execute().mapPartitionsWithStateStore(
       getStateId.checkpointLocation,
       operatorId = getStateId.operatorId,
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
       sqlContext.sessionState,
-      Some(sqlContext.streams.stateStoreCoordinator)
-    )(saveAndReturnFunc)
+      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+      val numOutputRows = longMetric("numOutputRows")
+      val numTotalStateRows = longMetric("numTotalStateRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+      outputMode match {
+        // Update and output all rows in the StateStore.
+        case Some(Complete) =>
+          while (iter.hasNext) {
+            val row = iter.next().asInstanceOf[UnsafeRow]
+            val key = getKey(row)
+            store.put(key.copy(), row.copy())
+            numUpdatedStateRows += 1
+          }
+          store.commit()
+          numTotalStateRows += store.numKeys()
+          store.iterator().map { case (k, v) =>
+            numOutputRows += 1
+            v.asInstanceOf[InternalRow]
+          }
+
+        // Update and output only rows being evicted from the StateStore
+        case Some(Append) =>
+          while (iter.hasNext) {
+            val row = iter.next().asInstanceOf[UnsafeRow]
+            val key = getKey(row)
+            store.put(key.copy(), row.copy())
+            numUpdatedStateRows += 1
+          }
+
+          val watermarkAttribute =
+            keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+          // If we are evicting based on a window, use the end of the window. Otherwise just
+          // use the attribute itself.
+          val evictionExpression =
+            if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+              LessThanOrEqual(
+                GetStructField(watermarkAttribute, 1),
+                Literal(eventTimeWatermark.get * 1000))
+            } else {
+              LessThanOrEqual(
+                watermarkAttribute,
+                Literal(eventTimeWatermark.get * 1000))
+            }
+
+          logInfo(s"Filtering state store on: $evictionExpression")
+          val predicate = newPredicate(evictionExpression, keyExpressions)
+          store.remove(predicate)
+
+          store.commit()
+
+          numTotalStateRows += store.numKeys()
+          store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
+            numOutputRows += 1
+            removed.value.asInstanceOf[InternalRow]
+          }
+
+        // Update and output modified rows from the StateStore.
+        case Some(Update) =>
--- End diff --

Yes, I think it's a good idea to explicitly say for each output mode whether watermarks affect emit and evict. Just so I'm clear, the intention is:

Append: affects emit, affects evict
Update: doesn't affect emit, affects evict
Complete: doesn't affect emit, no eviction

Is that right?
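The reviewer's proposed summary (stated as a question in the thread, so treat it as unconfirmed) can be written down directly:

```scala
// Unconfirmed summary from the review question above: what the event-time
// watermark affects under each structured streaming output mode.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

final case class WatermarkEffect(affectsEmit: Boolean, affectsEvict: Boolean)

def watermarkEffect(mode: OutputMode): WatermarkEffect = mode match {
  case Append   => WatermarkEffect(affectsEmit = true, affectsEvict = true)
  case Update   => WatermarkEffect(affectsEmit = false, affectsEvict = true)
  case Complete => WatermarkEffect(affectsEmit = false, affectsEvict = false)
}
```

Encoding it as a total function over a sealed trait is one way to make the per-mode semantics explicit, which is exactly what the comment asks the docs to do.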
[GitHub] spark pull request #15737: [SPARK-18212][SS][KAFKA] increase executor poll t...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/15737

[SPARK-18212][SS][KAFKA] increase executor poll timeout

## What changes were proposed in this pull request?

Increase the poll timeout to try to address a flaky test.

## How was this patch tested?

Ran existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-18212

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15737.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15737

commit a7f78d2996672a335f29acafe2ba2350541d6ba2
Author: cody koeninger <c...@koeninger.org>
Date: 2016-11-02T15:12:28Z
[SPARK-18212][SS][KAFKA] increase executor poll timeout
[GitHub] spark issue #15702: [SPARK-18124] Observed delay based Event Time Watermarks
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15702

Given the concerns Ofir raised about a single far-future event screwing up monotonic event time, do you want to document that problem even if there isn't an enforced filter for it?
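The concern is easy to reproduce with the max-based watermark rule using toy numbers: one far-future event drags the watermark ahead of every well-behaved event, so all of them are then treated as late.

```scala
// Toy illustration of the far-future-event concern:
// watermark = max(eventTime) - delayThreshold, so one bad clock can push the
// watermark past every normal event.
def watermarkMs(eventTimesMs: Seq[Long], delayThresholdMs: Long): Long =
  eventTimesMs.max - delayThresholdMs

val normalEvents = Seq(1000L, 2000L, 3000L)
val withOutlier = normalEvents :+ 9999999999L // one event far in the future

val wm = watermarkMs(withOutlier, delayThresholdMs = 2000L)
val droppedAsLate = normalEvents.filter(_ < wm)
```

Without the outlier the watermark stays at 1000 ms and nothing is dropped; with it, every normal event falls behind the watermark.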
[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15702#discussion_r86066774

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }

   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and thus can be emitted when
+   *    using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going aggregations.
+   *
+   * The current event time is computed by looking at the `MAX(eventTime)` seen in an epoch across
+   * all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
+   * of coordinating this value across partitions, the actual watermark used is only guaranteed
+   * to be at least `delayThreshold` behind the actual event time. In some cases we may still
+   * process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
+   *                       record that has been processed in the form of an interval
+   *                       (e.g. "1 minute" or "5 hours").
--- End diff --

Should this make it clear what the minimum useful granularity is (ms)?
[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15702#discussion_r86067616

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---

@@ -104,85 +110,105 @@ case class StateStoreSaveExec(

   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
-    assert(returnAllStates.nonEmpty,
-      "Incorrect planning in IncrementalExecution, returnAllStates have not been set")
-    val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _
+    assert(outputMode.nonEmpty,
+      "Incorrect planning in IncrementalExecution, outputMode has not been set")
+
     child.execute().mapPartitionsWithStateStore(
       getStateId.checkpointLocation,
       operatorId = getStateId.operatorId,
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
       sqlContext.sessionState,
-      Some(sqlContext.streams.stateStoreCoordinator)
-    )(saveAndReturnFunc)
+      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+      val numOutputRows = longMetric("numOutputRows")
+      val numTotalStateRows = longMetric("numTotalStateRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+      outputMode match {
+        // Update and output all rows in the StateStore.
+        case Some(Complete) =>
+          while (iter.hasNext) {
+            val row = iter.next().asInstanceOf[UnsafeRow]
+            val key = getKey(row)
+            store.put(key.copy(), row.copy())
+            numUpdatedStateRows += 1
+          }
+          store.commit()
+          numTotalStateRows += store.numKeys()
+          store.iterator().map { case (k, v) =>
+            numOutputRows += 1
+            v.asInstanceOf[InternalRow]
+          }
+
+        // Update and output only rows being evicted from the StateStore
+        case Some(Append) =>
+          while (iter.hasNext) {
+            val row = iter.next().asInstanceOf[UnsafeRow]
+            val key = getKey(row)
+            store.put(key.copy(), row.copy())
+            numUpdatedStateRows += 1
+          }
+
+          val watermarkAttribute =
+            keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+          // If we are evicting based on a window, use the end of the window. Otherwise just
+          // use the attribute itself.
+          val evictionExpression =
+            if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+              LessThanOrEqual(
+                GetStructField(watermarkAttribute, 1),
+                Literal(eventTimeWatermark.get * 1000))
+            } else {
+              LessThanOrEqual(
+                watermarkAttribute,
+                Literal(eventTimeWatermark.get * 1000))
+            }
+
+          logInfo(s"Filtering state store on: $evictionExpression")
+          val predicate = newPredicate(evictionExpression, keyExpressions)
+          store.remove(predicate)
+
+          store.commit()
+
+          numTotalStateRows += store.numKeys()
+          store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
+            numOutputRows += 1
+            removed.value.asInstanceOf[InternalRow]
+          }
+
+        // Update and output modified rows from the StateStore.
+        case Some(Update) =>
--- End diff --

I'm not clear on why the semantics of Update mean that watermarks shouldn't be used to remove state.
[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15702#discussion_r86066376

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }

   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and thus can be emitted when
+   *    using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going aggregations.
--- End diff --

For append, this sounds like the intention is to emit only once the watermark has passed, and drop state. But for other output modes, it's not clear from reading this what the effect of the watermark on emission and dropping state is.
[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15702#discussion_r86066082

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }

   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and thus can be emitted when
+   *    using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going aggregations.
+   *
+   * The current event time is computed by looking at the `MAX(eventTime)` seen in an epoch across
--- End diff --

- Should this be "The current watermark is computed..."?
- What is an epoch? It isn't mentioned in the docs or elsewhere in the PR.
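A minimal sketch of the rule the scaladoc describes, under two assumptions flagged in the thread: "epoch" means one trigger/batch, and the watermark never moves backwards (as the discussion of monotonic event time suggests).

```scala
// Sketch of the described rule, assuming "epoch" = one trigger/batch and a
// monotonically non-decreasing watermark: advance to max(eventTime) - delay,
// never retreat.
def advanceWatermark(current: Long, epochEventTimesMs: Seq[Long], delayMs: Long): Long =
  if (epochEventTimesMs.isEmpty) current
  else math.max(current, epochEventTimesMs.max - delayMs)

val afterEpoch1 = advanceWatermark(0L, Seq(10000L, 12000L), delayMs = 5000L) // advances
val afterEpoch2 = advanceWatermark(afterEpoch1, Seq(6000L), delayMs = 5000L) // holds
```

The second epoch's smaller max event time does not pull the watermark back, which is the property the reviewer's wording question ("watermark" vs "event time") is circling.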
[GitHub] spark issue #15715: [SPARK-18198][Doc][Streaming] Highlight code snippets
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15715 LGTM
[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15681 I'm agnostic on the value of adding the overload, if @lw-lin thinks it's more convenient for users. There are considerably fewer overloads as it stands than the old 0.8 version of KafkaUtils, so I don't think we're getting too messy/crowded yet.
[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15681 I don't think there's a reason to deprecate it. ju.Map is the lowest common denominator for kafka params: it's used by the underlying consumer, and it's what the ConsumerStrategy interface expects to return. I would definitely prefer to use it in my Scala jobs that were generating a sequence of RDDs. On Tue, Nov 1, 2016 at 9:47 AM, Sean Owen <notificati...@github.com> wrote: > We would want to change the code so that nothing calls the deprecated > method including tests. I think you can let the deprecated one call the new > one? When to remove it is a question for the future, yes.
[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15681 LGTM, thanks. If you want to open a separate PR to clean up the private doc issues you noticed, go for it; it shouldn't need another Jira imho if it isn't changing code.
[GitHub] spark issue #15681: [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() sc...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15681 I think this should just be another createRDD overload that takes a Scala map. The minor additional maintenance overhead of that extra method, as opposed to changing the existing one, isn't worth breaking clients. I'm all in favor of changes to experimental APIs when they are necessary, but this isn't necessary. That also makes most of the other changes in this PR unnecessary: you don't need to touch mima, you don't need to refactor to createRddInternal, etc.
[GitHub] spark issue #15681: [Minor][Streaming][Kafka] Kafka010 .createRDD() scala AP...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15681 Public API changes, even on an experimental module, aren't a minor thing; open a Jira. It's also better not to lump unrelated changes together.
[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15679 LGTM
[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15679 Thanks for working on this, couple minor things to fix but otherwise looks good.
[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15679#discussion_r85642029 --- Diff: docs/streaming-kafka-0-10-integration.md --- @@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio } + // The details depend on your data store, but the general idea looks like this + + // begin from the offsets committed to the database + Map<TopicPartition, Long> fromOffsets = new HashMap<>(); + for (resultSet: selectOffsetsFromYourDatabase) { + fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset")); + } + + JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.<String, String>createDirectStream( + streamingContext, + LocationStrategies.PreferConsistent(), + ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) + ); + + stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { + @Override + public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + Object results = yourCalculation(rdd); + + yourTransactionBlock { --- End diff -- I agree with Sean, this would probably be clearer if it was changed to a comment like // begin your transaction ... // end your transaction
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85641502 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala --- @@ -36,8 +36,8 @@ class StreamingQueryException private[sql]( @transient val query: StreamingQuery, val message: String, val cause: Throwable, -val startOffset: Option[Offset] = None, -val endOffset: Option[Offset] = None) +val startOffset: Option[OffsetSeq] = None, --- End diff -- Why not expose the actual json string?
[GitHub] spark issue #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15679 Were these extracted from compiled example projects, or just written up?
[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15679#discussion_r85641432 --- Diff: docs/streaming-kafka-0-10-integration.md --- @@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By stream.foreachRDD { rdd => - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed - stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) + stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics. + stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { + @Override + public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + // some time later, after outputs have completed + ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); --- End diff -- I think it's far too late to fix those issues at this point. DStreams return an RDD, not a parameterized type. KafkaUtils methods return DStreams and RDDs, not an implementation-specific type.
[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15527#discussion_r85253453 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource( --- End diff -- Shouldn't it be set to the highest available offset in the streaming metadata log, not the highest available offset in kafka?
[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15626 So it looks like this is the json format being used for kafka offsets: [{"_1":{"hash":0,"partition":0,"topic":"t"},"_2":2},{"_1":{"hash":0,"partition":1,"topic":"t"},"_2":2}] My main concerns are that it is unnecessarily verbose, and completely different from the format being used to specify per-topicpartition offsets in org.apache.spark.sql.kafka010.JsonUtils. I see no reason why those formats should be different: a user should be able to take offsets directly from the checkpoint and use them to start a job, for instance.
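To make the contrast concrete, here is a self-contained Java sketch of the compact per-topic layout used by the JsonUtils format referenced above, which encodes the same two offsets as {"t":{"0":2,"1":2}} rather than the verbose tuple form. Illustrative only: the class and method names are assumptions, and the real implementation uses Jackson rather than hand-built strings.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetJson {
    // Serialize Map<topic, Map<partition, offset>> in the compact per-topic style,
    // e.g. {"t":{"0":2,"1":2}}. TreeMaps keep the output deterministic for the demo.
    static String toJson(Map<String, Map<Integer, Long>> offsets) {
        StringBuilder sb = new StringBuilder("{");
        boolean firstTopic = true;
        for (Map.Entry<String, Map<Integer, Long>> topic : offsets.entrySet()) {
            if (!firstTopic) sb.append(",");
            firstTopic = false;
            sb.append("\"").append(topic.getKey()).append("\":{");
            boolean firstPart = true;
            for (Map.Entry<Integer, Long> part : topic.getValue().entrySet()) {
                if (!firstPart) sb.append(",");
                firstPart = false;
                sb.append("\"").append(part.getKey()).append("\":").append(part.getValue());
            }
            sb.append("}");
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        Map<Integer, Long> parts = new TreeMap<>();
        parts.put(0, 2L);
        parts.put(1, 2L);
        Map<String, Map<Integer, Long>> offsets = new TreeMap<>();
        offsets.put("t", parts);
        System.out.println(toJson(offsets)); // {"t":{"0":2,"1":2}}
    }
}
```

A single round-trippable format like this is what makes it possible to paste offsets from a checkpoint straight into a job's starting-offset configuration.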
[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15504#discussion_r84378170 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.io.Writer + +import scala.collection.mutable.{ ArrayBuffer, HashMap } +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node._ +import org.apache.kafka.common.TopicPartition + +/** + * Utilities for converting Kafka related objects to and from json. + */ +private object JsonUtils { --- End diff -- KafkaSourceOffset is a Map[TopicPartition, Long]. This PR has an API for a user to provide a Map[TopicPartition, Long]. There's an existing ticket for making the metadata log store user-readable json. This PR uses json. Are they really unrelated? Why would we want a second, different, json format for the same information?
[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15504#discussion_r84374872 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.io.Writer + +import scala.collection.mutable.{ ArrayBuffer, HashMap } +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node._ +import org.apache.kafka.common.TopicPartition + +/** + * Utilities for converting Kafka related objects to and from json. + */ +private object JsonUtils { --- End diff -- Sorry, looks like I edited while you were commenting. I was thinking towards SPARK-17829, where this same format might get used more frequently throughout a job.
[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15504#discussion_r84372134 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -232,6 +232,42 @@ private[kafka010] case class KafkaSource( override def toString(): String = s"KafkaSource[$consumerStrategy]" /** + * Set consumer position to specified offsets, making sure all assignments are set. + */ + private def fetchSpecificStartingOffsets( + partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = + withRetriesWithoutInterrupt { + // Poll to get the latest assigned partitions + consumer.poll(0) + val partitions = consumer.assignment() + consumer.pause(partitions) + assert(partitions.asScala == partitionOffsets.keySet, + "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + + partitionOffsets.foreach { + case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp)) + case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp)) + case (tp, off) => consumer.seek(tp, off) + } + val result = partitionOffsets.map { + case (tp, _) => tp -> consumer.position(tp) + } + partitionOffsets.foreach { + case (tp, off) if off != -1 && off != -2 => + if (result(tp) != off) { + reportDataLoss( --- End diff -- good call, will fix that
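For readers unfamiliar with the -1/-2 sentinels in the hunk above (-1 means latest, -2 means earliest), here is a minimal, dependency-free Java sketch of how they resolve. Illustrative only: it uses plain "topic-partition" string keys instead of TopicPartition, and the real KafkaSource resolves the sentinels by calling consumer.seekToEnd / consumer.seekToBeginning and then reading consumer.position.

```java
import java.util.HashMap;
import java.util.Map;

public class ResolveStartingOffsets {
    // Resolve -1 (latest) / -2 (earliest) sentinel offsets against known
    // per-partition bounds; explicit offsets pass through unchanged.
    static Map<String, Long> resolve(Map<String, Long> requested,
                                     Map<String, Long> earliest,
                                     Map<String, Long> latest) {
        Map<String, Long> resolved = new HashMap<>();
        for (Map.Entry<String, Long> e : requested.entrySet()) {
            long off = e.getValue();
            if (off == -1L) {
                off = latest.get(e.getKey());        // stands in for seekToEnd
            } else if (off == -2L) {
                off = earliest.get(e.getKey());      // stands in for seekToBeginning
            }
            resolved.put(e.getKey(), off);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Long> requested = new HashMap<>();
        requested.put("t-0", -1L);   // latest
        requested.put("t-1", -2L);   // earliest
        requested.put("t-2", 5L);    // explicit offset
        Map<String, Long> earliest = new HashMap<>();
        earliest.put("t-0", 0L); earliest.put("t-1", 3L); earliest.put("t-2", 0L);
        Map<String, Long> latest = new HashMap<>();
        latest.put("t-0", 100L); latest.put("t-1", 50L); latest.put("t-2", 10L);
        System.out.println(resolve(requested, earliest, latest));
    }
}
```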
[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15504#discussion_r84371466 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -317,6 +353,8 @@ private[kafka010] case class KafkaSource( try { result = Some(body) } catch { +case x: OffsetOutOfRangeException => --- End diff -- An OffsetOutOfRangeException can happen on any poll. I agree that with the way things are now, it shouldn't happen, but that has more to do with us currently having an explicitly enforced value for auto.offset.reset, not with whether data is being fetched. This change is basically paranoia about catching all nonfatal errors without respecting user settings regarding data loss.
[GitHub] spark pull request #15504: [SPARK-17812][SQL][KAFKA] Assign and specific sta...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15504#discussion_r84370308 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.io.Writer + +import scala.collection.mutable.{ ArrayBuffer, HashMap } +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node._ +import org.apache.kafka.common.TopicPartition + +/** + * Utilities for converting Kafka related objects to and from json. + */ +private object JsonUtils { --- End diff -- Regarding json4s, I haven't benchmarked this particular usage, but my practical experience has been that json4s is noticeably slower than just using jackson directly. Is this just a code conciseness concern, or is there some other reason to prefer json4s?
[GitHub] spark pull request #15570: [STREAMING][KAFKA][DOC] clarify kafka settings ne...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/15570 [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches ## What changes were proposed in this pull request? Minor doc change to mention kafka configuration for larger spark batches. ## How was this patch tested? Doc change only, confirmed via jekyll. The configuration issue was discussed / confirmed with users on the mailing list. You can merge this pull request into a Git repository by running: $ git pull https://github.com/koeninger/spark-1 kafka-doc-heartbeat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15570 commit a7b0a17edef25d2fe0ad2934072cb5019afff767 Author: cody koeninger <c...@koeninger.org> Date: 2016-10-20T17:37:12Z [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches
[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/15527 [SPARK-17813][SQL][KAFKA] Maximum data per trigger ## What changes were proposed in this pull request? maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions. This is assuming SPARK-17812 is merged first due to common changes in test utils; if that ends up not being the case I can clean this up as a separate patch. ## How was this patch tested? Added unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/koeninger/spark-1 SPARK-17813 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15527.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15527 commit c45ded7109474fcb40f03c772192eb38398f328a Author: cody koeninger <c...@koeninger.org> Date: 2016-10-14T04:23:02Z [SPARK-17812][SQL][KAFKA] parse json for topicpartitions and offsets commit 12d3988c4fcef9bbbd88ce69295d2ff3e5baa5ba Author: cody koeninger <c...@koeninger.org> Date: 2016-10-14T19:58:08Z Merge branch 'master' into SPARK-17812 commit 3120fd8ade24140777c29fc1487aa3f6e76152fb Author: cody koeninger <c...@koeninger.org> Date: 2016-10-14T21:37:35Z [SPARK-17812][SQL][KAFKA] implement specified offsets and assign commit 35bb8c3cfe77f2cb3d26f4afd3364caa6d0ec4cf Author: cody koeninger <c...@koeninger.org> Date: 2016-10-16T03:00:20Z [SPARK-17812][SQL][KAFKA] doc and test updates commit 2e53e5a3904305cb1d1b0f2325e31c9c434d16ec Author: cody koeninger <c...@koeninger.org> Date: 2016-10-16T03:16:11Z [SPARK-17812][SQL][KAFKA] style fixes commit 5e4511f0c7e84d15011a7eb8d208be13ed672b49 Author: cody koeninger <c...@koeninger.org> Date: 2016-10-16T03:52:39Z [SPARK-17812][SQL][KAFKA] additional paranoia on reset of starting offsets commit cae967cb88a7682b6794d5d2ef90a0d9a1d3ea60 Author: cody koeninger <c...@koeninger.org> Date: 2016-10-18T03:14:31Z Merge branch 'SPARK-17812' into SPARK-17813 Testing maxOffsetsPerTrigger requires the per-partition sendMessages testing added in SPARK-17812 commit 6c8d459f9795c6ff32e8bf78f8796869ca722ee3 Author: cody koeninger <c...@koeninger.org> Date: 2016-10-18T05:20:53Z [SPARK-17813][SQL][KAFKA] maxOffsetsPerTrigger proportional implementation
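The PR description above says the limit is applied "proportionally based on volume of different topicpartitions". As a rough, dependency-free Java sketch of that idea (not the actual KafkaSource implementation; the names and the truncating rounding are assumptions), the per-trigger offset budget can be split across partitions in proportion to each partition's backlog:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProportionalRateLimit {
    // Split maxOffsets across partitions proportionally to each partition's
    // backlog (offsets available but not yet processed), never exceeding it.
    static Map<String, Long> allocate(Map<String, Long> backlog, long maxOffsets) {
        long total = backlog.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Long> out = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : backlog.entrySet()) {
            long share = total == 0
                ? 0
                : (long) (maxOffsets * ((double) e.getValue() / total));
            out.put(e.getKey(), Math.min(share, e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> backlog = new LinkedHashMap<>();
        backlog.put("t-0", 75L);  // 75% of the outstanding offsets
        backlog.put("t-1", 25L);  // 25%
        System.out.println(allocate(backlog, 40L)); // {t-0=30, t-1=10}
    }
}
```

A busier partition thus gets a proportionally larger slice of each trigger, instead of a flat per-partition cap.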
[GitHub] spark issue #15407: [SPARK-17841][STREAMING][KAFKA] drain commitQueue
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15407 @rxin @tdas Right now, items to be committed can be added to the queue, but they will never actually be removed from the queue. poll() removes; iterator() does not. I updated the description of the PR.
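The poll()-versus-iterator() distinction behind this fix is easy to demonstrate with a plain JDK queue. The sketch below is illustrative only: the real commitQueue in the direct stream holds OffsetRange objects, here stand-in strings are used.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class DrainQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> commitQueue = new ConcurrentLinkedQueue<>();
        commitQueue.add("offsetRange-1");
        commitQueue.add("offsetRange-2");

        // Reading via iterator() (the pre-fix behavior) leaves elements in place.
        for (String s : commitQueue) { /* pretend to commit s */ }
        System.out.println("after iterate: " + commitQueue.size()); // 2

        // Draining with poll() actually dequeues each element.
        while (commitQueue.poll() != null) { /* pretend to commit */ }
        System.out.println("after poll: " + commitQueue.size()); // 0
    }
}
```

So a queue that is only ever iterated grows without bound, which is exactly the leak SPARK-17841 fixes by draining with poll().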