Yes I use latest Kafka clients 0.11 to determine beginning offsets without
seek and also I use Kafka offsets commits externally.
I dont find the spark async commit  useful for our needs.

Thanks
Venkat

On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <c...@koeninger.org> wrote:

> You mentioned 0.11 version; the latest version of org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
> has an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <meven...@gmail.com> wrote:
> > Version: 2.2 with Kafka010
> >
> > Hi,
> >
> > We are running spark streaming on AWS and trying to process incoming
> > messages on Kafka topics. All was well.
> > Recently we wanted to migrate from 0.8 to 0.11 version of Spark library
> and
> > Kafka 0.11 version of server.
> >
> > With this new version of software we are facing issues with regard to 'No
> > assignment to partition for a topic and it happens intermittently'. I
> > construct four DStreams with different group.ids as suggested.
> >
> > The main source of code thats causing the issue is this one
> >
> > if (!toSeek.isEmpty) {
> >       // work around KAFKA-3370 when reset is none
> >       // poll will throw if no position, i.e. auto offset reset none and
> no
> > explicit position
> >       // but cant seek to a position before poll, because poll is what
> gets
> > subscription partitions
> >       // So, poll, suppress the first exception, then seek
> >       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
> >       val shouldSuppress = aor != null &&
> > aor.asInstanceOf[String].toUpperCase == "NONE"
> >       try {
> >         consumer.poll(0)
> >       } catch {
> >         case x: NoOffsetForPartitionException if shouldSuppress =>
> >           logWarning("Catching NoOffsetForPartitionException since " +
> >             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See
> > KAFKA-3370")
> >       }
> >       toSeek.asScala.foreach { case (topicPartition, offset) =>
> >           *consumer.seek(topicPartition, offset)*
> >       }
> >     }
> >
> > At the start of the job, I also ensure we are supplying all required
> offsets
> > correctly
> >
> > private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
> >     Map<TopicPartition, Long> offsets = new HashMap<>();
> >     List<TopicPartition> topicPartitions =
> >         consumer.partitionsFor(topic).stream().map(partitionInfo ->
> >             new TopicPartition(partitionInfo.topic(),
> > partitionInfo.partition()))
> >             .collect(Collectors.toList());
> >     Map<TopicPartition, Long> earliestOffsets =
> > consumer.beginningOffsets(topicPartitions);
> >     // pick committed offsets
> >     for (TopicPartition topicAndPartition : topicPartitions) {
> >       final OffsetAndMetadata committed =
> > consumer.committed(topicAndPartition);
> >       Long earliestOffset = earliestOffsets.get(topicAndPartition);
> >       if (committed != null && committed.offset() > earliestOffset) {
> >         logger
> >             .warn(
> >                 "Committed offset found for: {} offset:{} -> Hence adding
> > committed offset",
> >                 topicAndPartition, committed.offset());
> >         offsets.put(topicAndPartition, committed.offset());
> >       } else {
> >         logger
> >             .warn(
> >                 "New partition/stale offset found for: {} offset:{} ->
> Hence
> > adding earliest offset",
> >                 topicAndPartition, earliestOffset);
> >         offsets.put(topicAndPartition, earliestOffset);
> >       }
> >     }
> >     return offsets;
> >   }
> >
> > The actual stack trace:
> >
> > Caused by: java.lang.IllegalStateException: No current assignment for
> > partition genericEvents-1
> > 2017-11-23 10:35:24,677 -    at
> >
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> > 2017-11-23 10:35:24,677 -    at
> >
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> > 2017-11-23 10:35:24,677 -    at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> > 2017-11-23 10:35:24,678 -    at
> >
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> > 2017-11-23 10:35:24,678 -    at
> >
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,678 -    at
> > scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > 2017-11-23 10:35:24,678 -    at
> > scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > 2017-11-23 10:35:24,678 -    at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > 2017-11-23 10:35:24,678 -    at
> > scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > 2017-11-23 10:35:24,678 -    at
> >
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,679 -    at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
> > 2017-11-23 10:35:24,679 -    at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
> > 2017-11-23 10:35:24,679 -    at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> > 2017-11-23 10:35:24,679 -    at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> > 2017-11-23 10:35:24,679 -    at
> >
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
> > 2017-11-23 10:35:24,679 -    at
> >
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
> > 2017-11-23 10:35:24,679 -    at
> >
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> > 2017-11-23 10:35:24,680 -    at
> >
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> > 2017-11-23 10:35:24,680 -    at
> > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> > 2017-11-23 10:35:24,680 -    at
> > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> > 2017-11-23 10:35:24,680 -    at
> > scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> > 2017-11-23 10:35:24,680 -    at
> >
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> > 2017-11-23 10:35:24,680 -    at
> >
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> > 2017-11-23 10:35:24,680 -    at
> >
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> > 2017-11-23 10:35:24,681 -    at
> >
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> > 2017-11-23 10:35:24,681 -    at
> >
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> > 2017-11-23 10:35:24,681 -    at
> > scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> > 2017-11-23 10:35:24,681 -    at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>

Reply via email to