[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323996774

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ##
@@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
           s"attribute unsupported type ${t.catalogString}")
     }
+    val headersExpression = inputSchema
+      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+      Literal(CatalystTypeConverters.convertToCatalyst(null),

Review comment:
It's the IDE's code formatter. The code also passes the Maven style checker.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
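The hunk above uses the input's `headers` column when it exists and otherwise falls back to a typed null literal, so rows are written without headers. A minimal sketch of that lookup, assuming simplified hypothetical stand-ins `Attr` and `NullLiteral` in place of Catalyst's `Attribute` and `Literal` (the type string is illustrative only):

```scala
// Hypothetical, simplified stand-ins for Catalyst's Attribute and Literal.
sealed trait Expr
final case class Attr(name: String, dataType: String) extends Expr
final case class NullLiteral(dataType: String) extends Expr

object HeadersExpression {
  val HeadersAttributeName = "headers"
  // Illustrative type string, assumed for this sketch.
  val HeadersType = "array<struct<key:string,value:binary>>"

  // Use the "headers" column if the input has one; otherwise return a typed
  // null so that every record is produced without headers.
  def resolve(inputSchema: Seq[Attr]): Expr =
    inputSchema
      .find(_.name == HeadersAttributeName)
      .getOrElse(NullLiteral(HeadersType))
}
```

Keeping the null typed (rather than an untyped null) lets downstream code treat the "no headers column" case exactly like a row whose headers value is null.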
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323045234

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ##
@@ -369,17 +371,36 @@ class KafkaTestUtils(
       topic: String,
       messages: Array[String],
       partition: Option[Int]): Seq[(String, RecordMetadata)] = {
+    sendMessages(topic, messages.map(m => (m, Seq())), partition)
+  }
+
+  /** Send record to the Kafka broker with headers using specified partition */
+  def sendMessage(topic: String,
+      record: (String, Seq[(String, Array[Byte])]),
+      partition: Option[Int]): Seq[(String, RecordMetadata)] = {
+    sendMessages(topic, Array(record).toSeq, partition)
+  }
+
+  /** Send the array of records to the Kafka broker with headers using specified partition */
+  def sendMessages(topic: String,
+      records: Seq[(String, Seq[(String, Array[Byte])])],
+      partition: Option[Int]): Seq[(String, RecordMetadata)] = {
     producer = new KafkaProducer[String, String](producerConfiguration)
     val offsets = try {
-      messages.map { m =>
+      records.map { case (value, header) =>
+        val headers = header.map { case (k, v) =>
+          new RecordHeader(k, v).asInstanceOf[Header]
+        }
         val record = partition match {
-          case Some(p) => new ProducerRecord[String, String](topic, p, null, m)
-          case None => new ProducerRecord[String, String](topic, m)
+          case Some(p) =>
+            new ProducerRecord[String, String](topic, p, null, value, headers.asJava)

Review comment:
We can't: `Option#orNull` returns Scala's `Null` type, not Java's `null`, so the compiler can't resolve the appropriate constructor.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323045178

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ##
@@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
           s"attribute unsupported type ${t.catalogString}")
     }
+    val headersExpression = inputSchema
+      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+      Literal(CatalystTypeConverters.convertToCatalyst(null),

Review comment:
I tried, but the formatter reverts the indentation to its current state.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r316531866

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -433,4 +441,42 @@ private[kafka010] object KafkaOffsetReader {
     StructField("timestamp", TimestampType),
     StructField("timestampType", IntegerType)
   ))
+
+  val schemaWithHeaders = {
+    new StructType(schemaWithoutHeaders.fields :+ StructField("headers", headersType))
+  }
+
+  def kafkaSchema(includeHeaders: Boolean): StructType = {
+    if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
+  }
+
+  def toInternalRowWithoutHeaders: Record => InternalRow =
+    (cr: Record) => InternalRow(
+      cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
+      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id
+    )
+
+  def toInternalRowWithHeaders: Record => InternalRow =
+    (cr: Record) => InternalRow(
+      cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
+      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id,
+      if (cr.headers.iterator().hasNext) {
+        new GenericArrayData(cr.headers.iterator().asScala
+          .map(header =>
+            InternalRow(UTF8String.fromString(header.key()), header.value())
+          ).toArray)
+      } else {
+        null
+      }
+    )
+
+  def toUnsafeRowWithoutHeadersProjector: Record => UnsafeRow =
+    (cr: Record) => UnsafeProjection.create(schemaWithoutHeaders)(toInternalRowWithoutHeaders(cr))

Review comment:
Sorry, I misunderstood your intention. The `UnsafeProjection`s are now extracted into `val`s.
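In the hunk above, `toUnsafeRowWithoutHeadersProjector` calls `UnsafeProjection.create(...)` inside the lambda, so a new projection is built for every record. A stdlib-only sketch of hoisting that construction into a `val`, assuming a hypothetical `Projection` class and `makeProjection` factory that stand in for `UnsafeProjection` and `UnsafeProjection.create(schema)`; a counter shows how many projections each approach builds:

```scala
// Hypothetical stand-in for a reusable projection such as UnsafeProjection.
final class Projection extends (Seq[Any] => Seq[Any]) {
  // Identity here; a real projection would re-encode the row.
  def apply(row: Seq[Any]): Seq[Any] = row
}

object Projections {
  private var created = 0
  // Hypothetical factory standing in for UnsafeProjection.create(schema); counts instances.
  def makeProjection(): Projection = { created += 1; new Projection }
  def createdCount: Int = created
}

// Builds a new projection for every record, as in the quoted hunk.
object PerRecordConverter {
  def convert(row: Seq[Any]): Seq[Any] = Projections.makeProjection()(row)
}

// Builds the projection once, when the converter is created, and reuses it.
final class HoistedConverter {
  private val projection = Projections.makeProjection()
  def convert(row: Seq[Any]): Seq[Any] = projection(row)
}
```

Converting five rows with `HoistedConverter` builds one projection, while `PerRecordConverter` builds five.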
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314967556

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala ##
@@ -105,7 +105,17 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
         val headers = record.headers().toArray.map(header => (header.key(), header.value())).toSeq
         (value, headers)
       }
-      data === rcvd
+      data zip rcvd foreach { case (expected, actual) =>

Review comment:
@srowen Got it. Here is the fix.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314933462

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala ##
@@ -95,10 +100,12 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
     try {
       val range = consumer.getAvailableOffsetRange()
       val rcvd = range.earliest until range.latest map { offset =>
-        val bytes = consumer.get(offset, Long.MaxValue, 1, failOnDataLoss = false).value()
-        new String(bytes)
+        val record = consumer.get(offset, Long.MaxValue, 1, failOnDataLoss = false)
+        val value = new String(record.value(), StandardCharsets.UTF_8)
+        val headers = record.headers().toArray.map(header => (header.key(), header.value())).toSeq
+        (value, headers)
       }
-      assert(rcvd == data)
+      data === rcvd

Review comment:
@srowen It was just a mistake. It is now updated to use a deep comparison.
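A bare `data === rcvd` evaluates to a `Boolean` that is never checked, and plain `==` on a `Seq` of tuples compares the nested `Array[Byte]` header values by reference, so identical bytes still compare unequal. A minimal sketch of an element-wise comparison for the `(value, headers)` pairs built in the test above; `sameRecords` is a hypothetical helper, not the code in the PR:

```scala
object HeaderComparison {
  // (value, headers), where each header is a (key, raw bytes) pair, as in the test above.
  type Record = (String, Seq[(String, Array[Byte])])

  // Arrays use reference equality in Scala, so header bytes are compared
  // element-wise with java.util.Arrays.equals.
  def sameRecords(expected: Seq[Record], actual: Seq[Record]): Boolean =
    expected.length == actual.length &&
      expected.zip(actual).forall { case ((expValue, expHeaders), (actValue, actHeaders)) =>
        expValue == actValue &&
          expHeaders.length == actHeaders.length &&
          expHeaders.zip(actHeaders).forall { case ((expKey, expBytes), (actKey, actBytes)) =>
            expKey == actKey && java.util.Arrays.equals(expBytes, actBytes)
          }
      }
}
```

In a ScalaTest suite the result would still need to be wrapped in `assert(...)` to fail the test.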
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314926467

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ##
@@ -88,7 +92,19 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value)
+    } else {
+      val headerArray = projectedRow.getArray(3)
+      val headers = (0 until headerArray.numElements()).map(
+        i => {
+          val struct = headerArray.getStruct(i, 2)
+          new RecordHeader(struct.getUTF8String(0).toString, struct.getBinary(1))
+            .asInstanceOf[Header]

Review comment:
@srowen Yes. Because of a limitation of the Java type system (generics are invariant), `Iterable[RecordHeader]` is not treated as an `Iterable[Header]`. That's why the cast is needed.
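Because Java generics are invariant, a `java.util.List[RecordHeader]` cannot be passed where a `java.lang.Iterable[Header]` is expected, even though `RecordHeader` implements `Header`. A stdlib-only sketch, assuming hypothetical stand-ins for Kafka's `Header` and `RecordHeader` and a hypothetical `countHeaders` method in place of `ProducerRecord`'s constructor; it widens each element with a type ascription (`: Header`) instead of a runtime cast:

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-ins for org.apache.kafka.common.header.Header and RecordHeader.
trait Header { def key: String; def value: Array[Byte] }
final class RecordHeader(val key: String, val value: Array[Byte]) extends Header

object Headers {
  // Stand-in for a Java API that accepts java.lang.Iterable[Header].
  def countHeaders(headers: java.lang.Iterable[Header]): Int = headers.asScala.size

  // Ascribing each element as Header makes this a Seq[Header], so asJava yields
  // java.util.List[Header]; without the ascription it would be java.util.List[RecordHeader].
  def toJavaHeaders(pairs: Seq[(String, Array[Byte])]): java.util.List[Header] =
    pairs.map { case (k, v) => new RecordHeader(k, v): Header }.asJava
}
```

The PR's `.asInstanceOf[Header]` also works here; the ascription is simply checked at compile time rather than at run time.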
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314925693

## File path: docs/structured-streaming-kafka-integration.md ##
@@ -27,6 +27,8 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
     artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION_SHORT}}
+Please note that to use the headers functionality, your Kafka client version should be version 0.11.0.0 or up.

Review comment:
@srowen Users may downgrade the Kafka client dependency to 0.10.
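Record headers were introduced in Kafka 0.11.0.0 (KIP-82), so a project that downgrades the client to 0.10.x cannot use them. A sketch of a hypothetical helper that checks a client version string against that minimum; the helper and its parsing rules are assumptions for illustration, not a Spark or Kafka API:

```scala
object KafkaHeaderSupport {
  // Record headers (KIP-82) first shipped in Kafka 0.11.0.0.
  private val MinVersion = Seq(0, 11, 0, 0)

  // Hypothetical helper: compares the leading numeric components of a version
  // string such as "2.3.0" or "0.10.2.1", ignoring suffixes like "-SNAPSHOT".
  def supportsHeaders(version: String): Boolean = {
    val parts = version
      .takeWhile(c => c.isDigit || c == '.')
      .split('.')
      .filter(_.nonEmpty)
      .map(_.toInt)
      .toSeq
    val padded = parts.padTo(MinVersion.length, 0)
    // The first differing component decides; equal versions are supported.
    val firstDiff = padded.zip(MinVersion).find { case (a, b) => a != b }
    firstDiff.forall { case (a, b) => a > b }
  }
}
```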
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307090135

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ##
@@ -158,19 +157,21 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
     testUtils.sendMessage(
-      topic, ("1", Array(("once", "1".getBytes), ("twice", "2".getBytes))), Some(0)
+      topic, ("1", Seq()), Some(0)
     )
     testUtils.sendMessage(
-      topic, ("2", Array(("once", "2".getBytes), ("twice", "4".getBytes))), Some(1)
+      topic, ("2", Seq(("a", "b".getBytes("UTF-8")), ("c", "d".getBytes("UTF-8")))), Some(1)

Review comment:
You are right; `StandardCharsets.UTF_8` is definitely better, and I prefer it too. However, `StandardCharsets.UTF_8` and `"UTF-8"` are currently mixed throughout the codebase, so it seems they should be cleaned up in a separate PR. @HyukjinKwon May I take it?
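Both forms produce the same bytes; the difference is that `getBytes(StandardCharsets.UTF_8)` takes a `Charset` constant, while `getBytes("UTF-8")` looks the charset up by name and is declared to throw the checked `UnsupportedEncodingException`. A minimal sketch of the test headers above built with the constant (`TestHeaders` is a hypothetical name):

```scala
import java.nio.charset.StandardCharsets

object TestHeaders {
  // The same header pairs as in the test above, encoded with the Charset constant.
  val headers: Seq[(String, Array[Byte])] = Seq(
    ("a", "b".getBytes(StandardCharsets.UTF_8)),
    ("c", "d".getBytes(StandardCharsets.UTF_8))
  )
}
```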
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307089669

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ##
@@ -67,8 +67,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    validateStreamOptions(caseInsensitiveParams)
+    validateStreamOptions(parameters)

Review comment:
Sorry for the confusion. These changes revert unrelated modifications that were included in previous commits; in the Files tab, you can see that they are not part of the final diff.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307084987

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala ##
@@ -184,9 +188,10 @@ class KafkaContinuousPartitionReader(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
+    failOnDataLoss: Boolean,
+    includeHeaders: Boolean) extends ContinuousPartitionReader[InternalRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
-  private val converter = new KafkaRecordToUnsafeRowConverter

Review comment:
@HeartSaVioR Great. Let's keep discussing which name would be best.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r306823698

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaUtilsSuite.scala ##
@@ -61,4 +63,24 @@ class UnsafeMapSuite extends SparkFunSuite {
     assert(mapDataSer.valueArray().getInt(0) == 19286)
     assert(mapDataSer.getBaseObject.asInstanceOf[Array[Byte]].length == 1024)
   }
+
+  test("UnsafeMap from Kafka Headers") {

Review comment:
Since `UnsafeMap` is no longer used, it would be better to remove both `UnsafeMap` and `KafkaUtilsSuite`. If you need this class, please open an issue and let me know.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r306151018

## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java ##
@@ -30,9 +30,10 @@
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.DataType;

Review comment:
Sure.
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295427548

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala ##
@@ -54,7 +55,8 @@ class KafkaContinuousStream(
   extends ContinuousStream with Logging {
   private val pollTimeoutMs = sourceOptions.getOrElse(CONSUMER_POLL_TIMEOUT, "512").toLong
-  private val includeHeaders = sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
+  private val includeHeaders =

Review comment:
@HeartSaVioR I found out why [the previous tests failed](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/106646/testReport/), but the reason is a little weird. It seems the keys of `sourceOptions` are all lowercased, so the `includeHeaders` option is not detected unless the key is lowercased too.
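If the option keys have already been lowercased, a lookup with the camel-case name `includeHeaders` never matches and silently falls back to the default. A minimal sketch of the problem and the fix, assuming keys are normalized with `Locale.ROOT` as in the `caseInsensitiveParams` mapping shown in the `KafkaSourceProvider` hunk; the object and method names are hypothetical:

```scala
import java.util.Locale

object SourceOptions {
  val IncludeHeaders = "includeHeaders"

  // Normalizes option keys the same way as the caseInsensitiveParams mapping.
  def lowercaseKeys(params: Map[String, String]): Map[String, String] =
    params.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }

  // Broken: looks up the camel-case key in a map whose keys are lowercased.
  def includeHeadersNaive(options: Map[String, String]): Boolean =
    options.getOrElse(IncludeHeaders, "false").toBoolean

  // Fixed: normalize the lookup key the same way as the map's keys.
  def includeHeaders(options: Map[String, String]): Boolean =
    options.getOrElse(IncludeHeaders.toLowerCase(Locale.ROOT), "false").toBoolean
}
```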
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295077620

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala ##
@@ -102,17 +103,35 @@ private[kafka010] class KafkaRelation(
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val executorKafkaParams =
       KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
-    val rdd = new KafkaSourceRDD(
-      sqlContext.sparkContext, executorKafkaParams, offsetRanges,
-      pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
-      InternalRow(
-        cr.key,
-        cr.value,
-        UTF8String.fromString(cr.topic),
-        cr.partition,
-        cr.offset,
-        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-        cr.timestampType.id)
+    val rdd = if (includeHeaders) {
+      new KafkaSourceRDD(
+        sqlContext.sparkContext, executorKafkaParams, offsetRanges,
+        pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
+        InternalRow(

Review comment:
I took that approach first, but here it causes a serialization error, unlike in `KafkaOffsetReader` above.
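One common cause of such errors is a closure that references a member of a non-serializable enclosing class: the closure captures the whole instance, and Spark has to serialize the `map` function to ship it to executors. A stdlib-only sketch of that general effect, with hypothetical class and object names; it is not a diagnosis of the specific failure in this PR:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class that is not Serializable.
class DriverSideRelation {
  val topicPrefix = "topic-"
  // Referencing a field captures `this`, so serializing the function drags in the whole instance.
  def capturingConverter: Int => String = (i: Int) => topicPrefix + i
}

// A function in a top-level object that uses no instance state captures nothing.
object StandaloneConverters {
  val converter: Int => String = (i: Int) => "topic-" + i
}

object ClosureCheck {
  // True if Java serialization of `f` succeeds, which shipping a closure to executors requires.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```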
[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295077589

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala ##
@@ -38,19 +38,21 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
  * read by per-task consumers generated later.
  * @param kafkaParams String params for per-task Kafka consumers.
  * @param sourceOptions Params which are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
  * @param failOnDataLoss Flag indicating whether reading should fail in data loss
  *                       scenarios, where some offsets after the specified initial ones can't be
  *                       properly read.
+ * @param includeHeaders Flag indicating whether to include Kafka records' headers.
  */
 class KafkaContinuousStream(
     offsetReader: KafkaOffsetReader,
     kafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
     initialOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
+    failOnDataLoss: Boolean,
+    includeHeaders: Boolean)

Review comment:
You are right: `includeHeaders` can be retrieved from `sourceOptions`, just as `pollTimeoutMs` is. The same applies to `KafkaSource` and `KafkaMicroBatchStream`, so I reverted them to their previous constructors.