Yes, I use the latest Kafka client (0.11) to determine the beginning offsets without seeking, and I also commit Kafka offsets externally. I don't find Spark's async commit useful for our needs.
Thanks,
Venkat

On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <c...@koeninger.org> wrote:

> You mentioned the 0.11 version; the latest version of the org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
> has an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <meven...@gmail.com> wrote:
> > Version: 2.2 with Kafka010
> >
> > Hi,
> >
> > We are running Spark Streaming on AWS and processing incoming
> > messages from Kafka topics. All was well.
> > Recently we wanted to migrate from the 0.8 to the 0.11 version of the
> > Spark library and to version 0.11 of the Kafka server.
> >
> > With this new version of the software we are intermittently getting a
> > "No current assignment for partition" error for a topic. I construct
> > four DStreams with different group.ids, as suggested.
> >
> > The main piece of code that's causing the issue is this one:
> >
> > if (!toSeek.isEmpty) {
> >   // work around KAFKA-3370 when reset is none
> >   // poll will throw if no position, i.e. auto offset reset none and no explicit position
> >   // but can't seek to a position before poll, because poll is what gets subscription partitions
> >   // So, poll, suppress the first exception, then seek
> >   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
> >   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
> >   try {
> >     consumer.poll(0)
> >   } catch {
> >     case x: NoOffsetForPartitionException if shouldSuppress =>
> >       logWarning("Catching NoOffsetForPartitionException since " +
> >         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
> >   }
> >   toSeek.asScala.foreach { case (topicPartition, offset) =>
> >     consumer.seek(topicPartition, offset) // <- the call that throws (see stack trace below)
> >   }
> > }
> >
> > At the start of the job, I also make sure we supply all the required
> > offsets correctly:
> >
> > private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
> >   Map<TopicPartition, Long> offsets = new HashMap<>();
> >   List<TopicPartition> topicPartitions =
> >       consumer.partitionsFor(topic).stream()
> >           .map(partitionInfo ->
> >               new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
> >           .collect(Collectors.toList());
> >   Map<TopicPartition, Long> earliestOffsets =
> >       consumer.beginningOffsets(topicPartitions);
> >   // pick committed offsets
> >   for (TopicPartition topicAndPartition : topicPartitions) {
> >     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
> >     Long earliestOffset = earliestOffsets.get(topicAndPartition);
> >     if (committed != null && committed.offset() > earliestOffset) {
> >       logger.warn(
> >           "Committed offset found for: {} offset:{} -> Hence adding committed offset",
> >           topicAndPartition, committed.offset());
> >       offsets.put(topicAndPartition, committed.offset());
> >     } else {
> >       logger.warn(
> >           "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
> >           topicAndPartition, earliestOffset);
> >       offsets.put(topicAndPartition, earliestOffset);
> >     }
> >   }
> >   return offsets;
> > }
> >
> > The actual stack trace:
> >
> > Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,678 - at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > 2017-11-23 10:35:24,678 - at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > 2017-11-23 10:35:24,678 - at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > 2017-11-23 10:35:24,678 - at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,679 - at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
> > 2017-11-23 10:35:24,679 - at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
> > 2017-11-23 10:35:24,679 - at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> > 2017-11-23 10:35:24,679 - at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> > 2017-11-23 10:35:24,679 - at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
> > 2017-11-23 10:35:24,679 - at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
> > 2017-11-23 10:35:24,679 - at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> > 2017-11-23 10:35:24,680 - at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> > 2017-11-23 10:35:24,681 - at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> > 2017-11-23 10:35:24,681 - at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> > 2017-11-23 10:35:24,681 - at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> > 2017-11-23 10:35:24,681 - at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
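For reference, here is a minimal, stdlib-only sketch of the start-offset rule used in getCommittedOffsets above. It takes the committed offset only when one exists and is ahead of the earliest retained offset, and otherwise falls back to the earliest offset. The class and method names (StartOffsets, chooseStartOffset, chooseAll) are hypothetical. Plain "topic-partition" strings stand in for Kafka's TopicPartition so the example runs without the kafka-clients jar. It never touches a consumer, so it says nothing about why the later seek fails.

```java
import java.util.HashMap;
import java.util.Map;

public class StartOffsets {

    // Same rule as getCommittedOffsets: use the committed offset only if it
    // exists and is strictly ahead of the earliest retained offset;
    // otherwise (no commit, or a stale commit) start from the earliest offset.
    static long chooseStartOffset(Long committed, long earliest) {
        if (committed != null && committed > earliest) {
            return committed;
        }
        return earliest;
    }

    // Applies the rule to every partition that has an earliest offset.
    // Keys are hypothetical "topic-partition" strings standing in for
    // org.apache.kafka.common.TopicPartition.
    static Map<String, Long> chooseAll(Map<String, Long> committed,
                                       Map<String, Long> earliest) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : earliest.entrySet()) {
            String partition = e.getKey();
            result.put(partition, chooseStartOffset(committed.get(partition), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> earliest = new HashMap<>();
        earliest.put("genericEvents-0", 100L);
        earliest.put("genericEvents-1", 50L);
        earliest.put("genericEvents-2", 0L);

        Map<String, Long> committed = new HashMap<>();
        committed.put("genericEvents-0", 250L); // committed offset still retained
        committed.put("genericEvents-1", 10L);  // stale: older than the earliest retained offset
        // genericEvents-2 has no committed offset (new partition)

        Map<String, Long> start = chooseAll(committed, earliest);
        System.out.println(start.get("genericEvents-0")); // prints 250
        System.out.println(start.get("genericEvents-1")); // prints 50
        System.out.println(start.get("genericEvents-2")); // prints 0
    }
}
```

Keeping the rule separate from the consumer calls that feed it (partitionsFor, beginningOffsets, committed) makes it easy to unit-test on its own.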