Here are some examples and details of the scenarios. The KafkaRDD is the most prone to polling timeouts and concurrent-modification errors.
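For reference, here is roughly how the pieces are wired together (a sketch only; kafkaParams mirrors the consumer config dump below, and ClientEventJsonOptionDeserializer is our homegrown value deserializer):

    import org.apache.kafka.common.serialization.StringDeserializer
    import com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer

    // Consumer params passed to KafkaUtils in the examples below (sketch;
    // the full dump further down is what the consumer actually logs).
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "somemachine:31000",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[ClientEventJsonOptionDeserializer],
      "group.id"           -> "storage-group",
      "auto.offset.reset"  -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "max.poll.records"   -> (1000: java.lang.Integer)
    )

    // The poll timeout we keep hitting, set on the SparkConf:
    sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "2000")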
*Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They all use the same consumer group ('storage-group'), but each has its own topic, and each topic has 4 partitions. We routinely get timeout errors when polling for data. This occurs whether we process in parallel or sequentially.

*Spark Kafka setting:*

    spark.streaming.kafka.consumer.poll.ms=2000

*Kafka Consumer Params:*

    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [somemachine:31000]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 1000
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
    group.id = storage-group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = earliest

*Example usage with KafkaRDD:*

    val channels = Seq("channel1", "channel2")

    channels.toParArray.foreach { channel =>
      val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

      // Get offsets for the given topic and the consumer group 'storage-group'
      val offsetRanges = getOffsets("storage-group", channel)

      val ds = KafkaUtils.createRDD[K, V](context, kafkaParams.asJava,
        offsetRanges, PreferConsistent).toDS[V]

      // Do some aggregations
      ds.agg(...)
      // Save the data
      ds.write.mode(SaveMode.Append).parquet(somePath)
      // Save offsets using a KafkaConsumer
      consumer.commitSync(newOffsets.asJava)
      consumer.close()
    }
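getOffsets is a homegrown helper. A minimal sketch of what it does for the KafkaRDD case, assuming each range runs from the group's last committed offset up to the current log end (the stream example below would instead need a variant returning a Map[TopicPartition, java.lang.Long] for Subscribe):

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange

    // Hypothetical sketch, not our exact code: one OffsetRange per partition,
    // from the group's committed offset to the current end of the log.
    def getOffsets(groupId: String, topic: String): Array[OffsetRange] = {
      val consumer =
        new KafkaConsumer[K, V]((kafkaParams + ("group.id" -> groupId)).asJava)
      try {
        val partitions = consumer.partitionsFor(topic).asScala
          .map(p => new TopicPartition(p.topic, p.partition))
        consumer.assign(partitions.asJava)
        consumer.seekToEnd(partitions.asJava)    // jump to the log end...
        partitions.map { tp =>
          val end = consumer.position(tp)        // ...and read the end offset back
          val committed = Option(consumer.committed(tp)).map(_.offset).getOrElse(0L)
          OffsetRange(tp, committed, end)
        }.toArray
      } finally consumer.close()
    }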
*Example usage with Kafka Stream:* This creates a stream and processes events in each partition. At the end of processing for each partition, we update the offsets for that partition. This is challenging to do, but it is better than calling commitAsync on the stream, because that occurs only after the /entire/ RDD has been processed. This method minimizes duplicates in an exactly-once environment. Since the executors use their own custom group "spark-executor-processor-group" and the commit is buried in private functions, we are unable to use the executors' cached consumer to update the offsets. This requires us to go through multiple steps to update the Kafka offsets accordingly.

    val offsetRanges = getOffsets("processor-group", "my-topic")

    val stream = KafkaUtils.createDirectStream[K, V](context, PreferConsistent,
      Subscribe[K, V](Seq("my-topic").asJavaCollection, kafkaParams, offsetRanges))

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Transform our data
      rdd.foreachPartition { events =>
        // Look up this partition's offset range via the task's partition id
        val tp = offsetRanges(TaskContext.get.partitionId)

        // Establish a consumer in the executor so we can update offsets after
        // each partition. This class is homegrown and uses the KafkaConsumer
        // to help get/set offsets
        val consumer = new ConsumerUtils(kafkaParams)

        // do something with our data

        // Write the offsets that were updated in this partition
        consumer.setConsumerOffsets("processor-group",
          Map(TopicAndPartition(tp.topic, tp.partition) -> tp.untilOffset))
      }
    }
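For completeness, a sketch of what ConsumerUtils.setConsumerOffsets could look like, assuming it just wraps a short-lived KafkaConsumer and commitSync with explicit offsets (this can work because 'processor-group' itself has no active members; the executors join 'spark-executor-processor-group' instead):

    import scala.collection.JavaConverters._
    import kafka.common.TopicAndPartition
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition

    // Hypothetical sketch of the homegrown helper: commit explicit offsets
    // for a group from the executor via a short-lived KafkaConsumer.
    class ConsumerUtils(kafkaParams: Map[String, Object]) {
      def setConsumerOffsets(groupId: String,
                             offsets: Map[TopicAndPartition, Long]): Unit = {
        val consumer = new KafkaConsumer[String, AnyRef](
          (kafkaParams + ("group.id" -> groupId)).asJava)
        try {
          consumer.commitSync(offsets.map { case (tap, offset) =>
            new TopicPartition(tap.topic, tap.partition) -> new OffsetAndMetadata(offset)
          }.asJava)
        } finally consumer.close()
      }
    }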