Here are some examples and details of the scenarios. The KafkaRDD approach is
the one most prone to polling timeouts and concurrent modification errors.

*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. They all use the same consumer group
('storage-group'), but each has its own topic, and each topic has 4
partitions. We routinely get timeout errors when polling for data, whether
we process the channels in parallel or sequentially.

*Spark Kafka setting:*
spark.streaming.kafka.consumer.poll.ms=2000
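
For reference, a minimal sketch of how this setting would be applied when
building the context (app name and master are placeholders, not our real
values):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// This is the timeout behind the "Failed to get records ... after polling
// for" errors: it bounds how long the executor-side cached consumer waits
// in poll() for the requested offsets.
val conf = new SparkConf()
  .setMaster("yarn")                                      // placeholder
  .setAppName("storage-job")                              // placeholder
  .set("spark.streaming.kafka.consumer.poll.ms", "2000")

val spark = SparkSession.builder().config(conf).getOrCreate()
val context = spark.sparkContext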

*Kafka Consumer Params:*
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [somemachine:31000]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = 
ssl.endpoint.identification.algorithm = null
max.poll.records = 1000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
group.id = storage-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
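
For completeness, this is roughly how those params are assembled in Scala (a
sketch showing only the values we set explicitly; the rest of the dump above
appears to be Kafka defaults):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams: Map[String, Object] = Map(
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG  -> "somemachine:31000",
  ConsumerConfig.GROUP_ID_CONFIG           -> "storage-group",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "earliest",
  ConsumerConfig.MAX_POLL_RECORDS_CONFIG   -> (1000: java.lang.Integer),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
    classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
    classOf[com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer]
)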

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group 'storage-group'
  val offsetRanges = getOffsets("storage-group", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
      kafkaParams.asJava,
      offsetRanges,
      PreferConsistent)
    .map(_.value)   // keep only the deserialized values
    .toDS()         // needs spark.implicits._ and an Encoder for V

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}
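
getOffsets above is our own helper. For context, a rough sketch of what it
does, assuming a short-lived KafkaConsumer that reads the group's committed
offset and the current end of each partition (the real implementation
differs in its details):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Sketch only: one OffsetRange per partition, from the group's last
// committed offset (or 0 if nothing has been committed yet) up to the
// current end of the partition.
def getOffsets(groupId: String, topic: String): Array[OffsetRange] = {
  val consumer =
    new KafkaConsumer[K, V]((kafkaParams + ("group.id" -> groupId)).asJava)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(pi => new TopicPartition(pi.topic, pi.partition))
    consumer.assign(partitions.asJava)
    consumer.seekToEnd(partitions.asJava)
    partitions.map { tp =>
      val end       = consumer.position(tp)
      val committed = Option(consumer.committed(tp)).map(_.offset).getOrElse(0L)
      OffsetRange(tp.topic, tp.partition, committed, end)
    }.toArray
  } finally {
    consumer.close()
  }
}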


*Example usage with Kafka Stream:*
This creates a stream and processes the events in each partition. At the end
of processing for each partition, we update the offsets for that partition.
This is awkward to do, but it is better than calling commitAsync on the
stream, because that only happens after the /entire/ RDD has been processed;
committing per partition minimizes duplicates when we are aiming for
exactly-once behavior. Since the executors use their own group
"spark-executor-processor-group" and the commit logic is buried in private
functions, we are unable to use the executors' cached consumers to update the
offsets. This forces us to go through multiple steps to update the Kafka
offsets ourselves.

val offsetRanges = getOffsets("processor-group", "my-topic")
// Subscribe wants the starting offsets as a Map[TopicPartition, Long]
val fromOffsets = offsetRanges
  .map(o => new TopicPartition(o.topic, o.partition) -> o.fromOffset)
  .toMap

val stream = KafkaUtils.createDirectStream[K, V](context,
  PreferConsistent,
  Subscribe[K, V](Seq("my-topic"), kafkaParams, fromOffsets))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Transform our data
  rdd.foreachPartition { events =>
    // Offset range for the partition we are currently processing
    val o = offsetRanges(TaskContext.get.partitionId)

    // Establish a consumer in the executor so we can update offsets after
    // each partition. This class is homegrown and uses the KafkaConsumer
    // to help get/set offsets.
    val consumer = new ConsumerUtils(kafkaParams)

    // do something with our data

    // Write the offsets that were updated in this partition
    consumer.setConsumerOffsets("processor-group",
      Map(TopicAndPartition(o.topic, o.partition) -> o.untilOffset))
  }
}
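
ConsumerUtils is our own wrapper. For context, a rough, hypothetical sketch
of what setConsumerOffsets amounts to (names and structure are illustrative,
not the real class): commit the given offsets under the driver-side group id
with a short-lived plain KafkaConsumer, since the executor's cached consumer
is managed by Spark and not reachable from user code.

import scala.collection.JavaConverters._
import kafka.common.TopicAndPartition
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Sketch only: commits offsets for `groupId` with a standalone consumer.
class ConsumerUtils(kafkaParams: Map[String, Object]) {
  def setConsumerOffsets(groupId: String,
                         offsets: Map[TopicAndPartition, Long]): Unit = {
    val params = kafkaParams + ("group.id" -> groupId)
    val consumer = new KafkaConsumer[AnyRef, AnyRef](params.asJava)
    try {
      val toCommit = offsets.map { case (tp, offset) =>
        new TopicPartition(tp.topic, tp.partition) -> new OffsetAndMetadata(offset)
      }
      consumer.assign(toCommit.keys.toSeq.asJava)
      consumer.commitSync(toCommit.asJava)
    } finally {
      consumer.close()
    }
  }
}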


