I doubt that fix will get backported to 2.3.x

Are you able to test against master?  2.4 with the fix you linked to
is likely to hit code freeze soon.

From a quick look at your code, I'm not sure why you're mapping over
an array of brokers.  It seems like that would result in different
streams with the same group id, because broker isn't part of your
group id string.
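
If you do want one stream per broker list, a minimal sketch (reusing the
names from your snippet; the sanitizing regex is just an illustration) would
be to fold the broker string into the group id so each broker set gets its
own consumer group:

// Sketch only: include the broker list in the group id so streams
// created for different broker sets don't share a consumer group.
val brokerTag = brokers.replaceAll("[^A-Za-z0-9._-]", "_")
val groupId = s"${applicationName}~${topicsToUse.mkString("~")}~${brokerTag}"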

On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Hello, Spark Users.
>
> We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> have a Spark Streaming job, and we're reading a reasonable amount of data
> from Kafka (40 GB / minute or so).  We would like to move to the Kafka
> 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to convert
> message formats.
>
> We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried to
> work around it as follows:
>
> 1. Disabled consumer caching.  This increased the total job time from ~1
> minute per batch to ~1.8 minutes per batch.  This performance penalty is
> unacceptable for our use-case.  We also saw some partitions stop receiving
> data for extended periods of time, though I was unable to get a simple
> repro of this effect.
> 2. Disabled speculation and multiple-job concurrency, and added caching
> for the stream directly after reading from Kafka, along with caching
> offsets.  This approach seems to work well for simple examples (read from
> a Kafka topic, write to another topic).  However, when we move through
> more complex logic we continue to see this type of error, despite only
> creating the stream for a given topic a single time.  We validated that
> we're creating the stream from a given topic / partition a single time by
> logging on stream creation, caching the stream and (eventually) calling
> 'runJob' to actually go and fetch the data.  Nonetheless, with multiple
> outputs we still see the ConcurrentModificationException.  (The settings
> used for (1) and (2) are sketched below.)
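>
> Roughly, the configuration for (1) and (2) looks like the following (a
> sketch with illustrative values, not our exact production configuration):
>
> import org.apache.spark.SparkConf
>
> // Sketch of the settings for workarounds (1) and (2) above.
> val conf = new SparkConf()
>   // (2) disable speculative execution and run one output job at a time
>   .set("spark.speculation", "false")
>   .set("spark.streaming.concurrentJobs", "1")
>   // (1) disable cached Kafka consumers for the 0.10 direct stream
>   .set("spark.streaming.kafka.consumer.cache.enabled", "false")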
>
> I've included some code below.  I would welcome any debugging tips for the
> workaround.  However, my main concern is to ensure that the 2.4 release
> will include a fix that works for Spark Streaming jobs in which multiple
> input topics map data to multiple outputs.  I would also like to know
> whether the fix (https://github.com/apache/spark/pull/20997) will be
> backported to Spark 2.3.x.
>
> In our code, read looks like the following:
>
> case class StreamLookupKey(topic: Set[String], brokers: String)
>
> private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
>
> // Given inputs return a direct stream.
> def createDirectStream(ssc: StreamingContext,
>                        additionalKafkaParameters: Map[String, String],
>                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
>                        topicsToUse: Array[String],
>                        applicationName: String,
>                        persist: Option[PersistenceManager],
>                        useOldestOffsets: Boolean,
>                        maxRatePerPartition: Long,
>                        batchSeconds: Int
>                       ): DStream[DecodedData] = {
>   val streams: Array[DStream[DecodedData]] =
>     brokersToUse.map(brokers => {
>       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>       val kafkaParameters: Map[String, String] =
>         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
>       logger.info(s"Kafka Params: ${kafkaParameters}")
>       val topics = topicsToUse.toSet
>       logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
>         s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
>
>       // Reuse the stream for this (topics, brokers) pair if it already exists.
>       streamMap.getOrElse(StreamLookupKey(topics, brokers),
>         createKafkaStream(ssc, applicationName, topics, brokers,
>           maxRatePerPartition, batchSeconds, kafkaParameters))
>     })
>
>   ssc.union(streams)
> }
>
> private def createKafkaStream(ssc: StreamingContext, applicationName: String,
>                               topics: Set[String], brokers: String,
>                               maxRatePerPartition: Long, batchSeconds: Int,
>                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
>   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
>     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
>   try {
>     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
>       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
>         consumerStrategy = consumerStrategy)
>
>     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
>     val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
>     streamMap += StreamLookupKey(topics, brokers) -> result
>     // Run an empty job to materialize the persisted stream so later outputs reuse the cached data.
>     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
>     result
>   } catch ErrorHandling.safelyCatch {
>     case e: Exception =>
>       logger.error("Unable to create direct stream:")
>       e.printStackTrace()
>       throw KafkaDirectStreamException(topics.toArray, brokers, e)
>   }
> }
>
> def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
>                        applicationName: String): Map[String, String] =
>   Map[String, String](
>     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
>     "enable.auto.commit" -> false.toString, // we'll commit these manually
>     "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
>     "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>     "bootstrap.servers" -> brokers,
>     "group.id" -> applicationName,
>     "session.timeout.ms" -> 240000.toString,
>     "request.timeout.ms" -> 300000.toString
>   )
>
> Write code looks like the following:
>
> def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
>                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
>   val rddToWrite =
>     if (numPartitions > 0) {
>       rdd.repartition(numPartitions)
>     } else {
>       rdd
>     }
>
>   // Get the SparkSession associated with the current thread.
>   val session = SparkSession.builder().getOrCreate()
>   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
>     StructType(Array(StructField("value", BinaryType))))
>   df.selectExpr("CAST('' AS STRING)", "value")
>     .write
>     .format("kafka")
>     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
>     .option("compression.type", "gzip")
>     .option("retries", "3")
>     .option("topic", topic)
>     .save()
> }
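>
> For context, each output calls write from its own foreachRDD over the
> cached input stream, roughly like the following (the stream name, topic
> names and encoder functions are placeholders, not our actual identifiers):
>
> // Illustrative only: two outputs reading the same cached input stream.
> decodedStream.foreachRDD(rdd => write(rdd, "topic-a", brokers, encodeA, numPartitions = 0))
> decodedStream.foreachRDD(rdd => write(rdd, "topic-b", brokers, encodeB, numPartitions = 0))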
>
> Regards,
>
> Bryan Jeffrey

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
