[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-09-12 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323996774
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ##
 @@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
 throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
   s"attribute unsupported type ${t.catalogString}")
 }
+val headersExpression = inputSchema
+  .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+  Literal(CatalystTypeConverters.convertToCatalyst(null),
 
 Review comment:
   This is how the IDE's code formatter formats it, and it also passes the mvn style checker.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-09-10 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323045234
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ##
 @@ -369,17 +371,36 @@ class KafkaTestUtils(
   topic: String,
   messages: Array[String],
   partition: Option[Int]): Seq[(String, RecordMetadata)] = {
+sendMessages(topic, messages.map(m => (m, Seq())), partition)
+  }
+
+  /** Send record to the Kafka broker with headers using specified partition */
+  def sendMessage(topic: String,
+  record: (String, Seq[(String, Array[Byte])]),
+  partition: Option[Int]): Seq[(String, RecordMetadata)] = {
+sendMessages(topic, Array(record).toSeq, partition)
+  }
+
+  /** Send the array of records to the Kafka broker with headers using specified partition */
+  def sendMessages(topic: String,
+   records: Seq[(String, Seq[(String, Array[Byte])])],
+   partition: Option[Int]): Seq[(String, RecordMetadata)] = {
 producer = new KafkaProducer[String, String](producerConfiguration)
 val offsets = try {
-  messages.map { m =>
+  records.map { case (value, header) =>
+val headers = header.map { case (k, v) =>
+  new RecordHeader(k, v).asInstanceOf[Header]
+}
 val record = partition match {
-  case Some(p) => new ProducerRecord[String, String](topic, p, null, m)
-  case None => new ProducerRecord[String, String](topic, m)
+  case Some(p) =>
+new ProducerRecord[String, String](topic, p, null, value, headers.asJava)
 
 Review comment:
   We can't; `Option#orNull` returns the Scala `Null` type, not a Java `null`. Because of that, the compiler can't resolve the appropriate constructor.
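
   (A minimal sketch of the issue, assuming `partition: Option[Int]` as in `sendMessages` — illustrative only, not the PR's code:)

   ```scala
   import org.apache.kafka.clients.producer.ProducerRecord

   // Sketch: why the pattern match on `partition` cannot be collapsed.
   def mkRecord(topic: String, value: String,
       partition: Option[Int]): ProducerRecord[String, String] = {
     // partition.orNull does not compile here, since Int is a value type:
     //   val p = partition.orNull  // error: Cannot prove that Null <:< Int.
     // Matching keeps each branch unambiguous, so the right overloaded
     // ProducerRecord constructor is selected per case:
     partition match {
       case Some(p) => new ProducerRecord[String, String](topic, p, null, value)
       case None => new ProducerRecord[String, String](topic, value)
     }
   }
   ```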





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-09-10 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r323045178
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ##
 @@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
 throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
   s"attribute unsupported type ${t.catalogString}")
 }
+val headersExpression = inputSchema
+  .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+  Literal(CatalystTypeConverters.convertToCatalyst(null),
 
 Review comment:
   I tried, but the formatter reverts the indentation to its current state.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-08-22 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r316531866
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -433,4 +441,42 @@ private[kafka010] object KafkaOffsetReader {
 StructField("timestamp", TimestampType),
 StructField("timestampType", IntegerType)
   ))
+
+  val schemaWithHeaders = {
+new StructType(schemaWithoutHeaders.fields :+ StructField("headers", headersType))
+  }
+
+  def kafkaSchema(includeHeaders: Boolean): StructType = {
+if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
+  }
+
+  def toInternalRowWithoutHeaders: Record => InternalRow =
+(cr: Record) => InternalRow(
+  cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
+  DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id
+)
+
+  def toInternalRowWithHeaders: Record => InternalRow =
+(cr: Record) => InternalRow(
+  cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
+  DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id,
+  if (cr.headers.iterator().hasNext) {
+new GenericArrayData(cr.headers.iterator().asScala
+  .map(header =>
+InternalRow(UTF8String.fromString(header.key()), header.value())
+  ).toArray)
+  } else {
+null
+  }
+)
+
+  def toUnsafeRowWithoutHeadersProjector: Record => UnsafeRow =
+(cr: Record) => UnsafeProjection.create(schemaWithoutHeaders)(toInternalRowWithoutHeaders(cr))
 
 Review comment:
   Sorry, I misunderstood your intention; the `UnsafeProjection`s are now extracted into `val`s.
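
   (Roughly, the extracted shape looks like the sketch below — names are illustrative, not the PR's code — so the projection is built once rather than per record:)

   ```scala
   import org.apache.kafka.clients.consumer.ConsumerRecord
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
   import org.apache.spark.sql.types.StructType

   // Sketch: create the UnsafeProjection once and close over it, instead of
   // calling UnsafeProjection.create inside the per-record function.
   def unsafeRowProjector(
       schema: StructType,
       toRow: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow)
       : ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow = {
     val projection = UnsafeProjection.create(schema) // built once, reused per record
     cr => projection(toRow(cr))
   }
   ```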





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-08-17 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314967556
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
 ##
 @@ -105,7 +105,17 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
   val headers = record.headers().toArray.map(header => (header.key(), header.value())).toSeq
   (value, headers)
 }
-data === rcvd
+data zip rcvd foreach { case (expected, actual) =>
 
 Review comment:
   @srowen Got it. Here is the fix.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-08-16 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314933462
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
 ##
 @@ -95,10 +100,12 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
   try {
 val range = consumer.getAvailableOffsetRange()
 val rcvd = range.earliest until range.latest map { offset =>
-  val bytes = consumer.get(offset, Long.MaxValue, 1, failOnDataLoss = false).value()
-  new String(bytes)
+  val record = consumer.get(offset, Long.MaxValue, 1, failOnDataLoss = false)
+  val value = new String(record.value(), StandardCharsets.UTF_8)
+  val headers = record.headers().toArray.map(header => (header.key(), header.value())).toSeq
+  (value, headers)
 }
-assert(rcvd == data)
+data === rcvd
 
 Review comment:
   @srowen It was just a mistake. It is now updated with a deep comparison.
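
   (The reason a plain `==`/`===` falls short here: Scala `Array` equality is reference equality, so tuples holding `Array[Byte]` never compare equal structurally. A quick sketch:)

   ```scala
   // Sketch: structural comparison fails on Array[Byte] contents.
   val a = ("k", Seq(("h", "v".getBytes)))
   val b = ("k", Seq(("h", "v".getBytes)))
   assert(a != b) // the two Array[Byte] instances differ by reference

   // A deep comparison must compare array contents explicitly:
   val deepEqual = a._1 == b._1 && (a._2 zip b._2).forall {
     case ((k1, v1), (k2, v2)) => k1 == k2 && java.util.Arrays.equals(v1, v2)
   }
   assert(deepEqual)
   ```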





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-08-16 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314926467
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ##
 @@ -88,7 +92,19 @@ private[kafka010] abstract class KafkaRowWriter(
   throw new NullPointerException(s"null topic present in the data. Use the " +
 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
 }
-val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+val record = if (projectedRow.isNullAt(3)) {
+  new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value)
+} else {
+  val headerArray = projectedRow.getArray(3)
+  val headers = (0 until headerArray.numElements()).map(
+i => {
+  val struct = headerArray.getStruct(i, 2)
+  new RecordHeader(struct.getUTF8String(0).toString, struct.getBinary(1))
+.asInstanceOf[Header]
 
 Review comment:
   @srowen Yes. Due to a limitation of the Java type system (generics are invariant), an `Iterable[RecordHeader]` is not regarded as an `Iterable[Header]`. That's why the upcast is needed.
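
   (A small sketch of the variance issue — illustrative only:)

   ```scala
   import org.apache.kafka.common.header.Header
   import org.apache.kafka.common.header.internals.RecordHeader
   import scala.collection.JavaConverters._

   // Java generics are invariant: a java.lang.Iterable[RecordHeader] is not a
   // java.lang.Iterable[Header], even though RecordHeader implements Header.
   val raw: Seq[RecordHeader] = Seq(new RecordHeader("k", "v".getBytes))

   // Does not type-check:
   //   val bad: java.lang.Iterable[Header] = raw.asJava // type mismatch

   // Upcasting each element first yields the type the producer expects:
   val ok: java.lang.Iterable[Header] = raw.map(_.asInstanceOf[Header]).asJava
   ```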





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-08-16 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r314925693
 
 

 ##
 File path: docs/structured-streaming-kafka-integration.md
 ##
 @@ -27,6 +27,8 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
 artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
 version = {{site.SPARK_VERSION_SHORT}}
 
+Please note that to use the headers functionality, your Kafka client version should be version 0.11.0.0 or up.
 
 Review comment:
   @srowen The user may downgrade the Kafka dependency to 0.10.
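
   (For example, a hypothetical sbt override pinning the client lower — in which case header support would be unavailable:)

   ```scala
   // build.sbt sketch (hypothetical version pin, not from this PR):
   dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.2"
   ```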





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-07-24 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307090135
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ##
 @@ -158,19 +157,21 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
 val topic = newTopic()
 testUtils.createTopic(topic, partitions = 3)
 testUtils.sendMessage(
-  topic, ("1", Array(("once", "1".getBytes), ("twice", "2".getBytes))), Some(0)
+  topic, ("1", Seq()), Some(0)
 )
 testUtils.sendMessage(
-  topic, ("2", Array(("once", "2".getBytes), ("twice", "4".getBytes))), Some(1)
+  topic, ("2", Seq(("a", "b".getBytes("UTF-8")), ("c", "d".getBytes("UTF-8")))), Some(1)
 
 Review comment:
   You are right, `StandardCharsets.UTF_8` is definitely better and I also prefer it. However, at present `StandardCharsets.UTF_8` and `"UTF-8"` are used interchangeably throughout the codebase. It seems like they should be cleaned up in a separate PR. @HyukjinKwon May I take it?
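
   (For reference, the two spellings behave the same here, but the constant avoids the runtime charset-name lookup — a quick sketch:)

   ```scala
   import java.nio.charset.StandardCharsets

   // getBytes("UTF-8") looks the charset up by name and can throw
   // UnsupportedEncodingException for a mistyped name; the constant cannot.
   val viaName: Array[Byte] = "1".getBytes("UTF-8")
   val viaConstant: Array[Byte] = "1".getBytes(StandardCharsets.UTF_8)
   assert(java.util.Arrays.equals(viaName, viaConstant))
   ```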





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-07-24 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307089669
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ##
 @@ -67,8 +67,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   schema: Option[StructType],
   providerName: String,
   parameters: Map[String, String]): (String, StructType) = {
-val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-validateStreamOptions(caseInsensitiveParams)
+validateStreamOptions(parameters)
 
 Review comment:
   Sorry for the confusion. These changes revert irrelevant modifications that were included in previous commits; in the Files tab, you can see they are not part of the final diff.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-07-24 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r307084987
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 ##
 @@ -184,9 +188,10 @@ class KafkaContinuousPartitionReader(
 startOffset: Long,
 kafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
-failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
+failOnDataLoss: Boolean,
+includeHeaders: Boolean) extends ContinuousPartitionReader[InternalRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
-  private val converter = new KafkaRecordToUnsafeRowConverter
 
 Review comment:
   @HeartSaVioR Great. Let's continue discussing which name would be best.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-07-24 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r306823698
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaUtilsSuite.scala
 ##
 @@ -61,4 +63,24 @@ class UnsafeMapSuite extends SparkFunSuite {
 assert(mapDataSer.valueArray().getInt(0) == 19286)
 assert(mapDataSer.getBaseObject.asInstanceOf[Array[Byte]].length == 1024)
   }
+
+  test("UnsafeMap from Kafka Headers") {
 
 Review comment:
   Since `UnsafeMap` is not used anymore, it would be better to remove both `UnsafeMap` and `KafkaUtilsSuite`. If you need this class, please open an issue and notify me.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-07-22 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r306151018
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 ##
 @@ -30,9 +30,10 @@
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.DataType;
 
 Review comment:
   Sure.





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-06-19 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295427548
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 ##
 @@ -54,7 +55,8 @@ class KafkaContinuousStream(
   extends ContinuousStream with Logging {
 
   private val pollTimeoutMs = sourceOptions.getOrElse(CONSUMER_POLL_TIMEOUT, "512").toLong
-  private val includeHeaders = sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
+  private val includeHeaders =
 
 Review comment:
   @HeartSaVioR I found why [the previous tests failed](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/106646/testReport/), but the reason is a little bit weird. It seems the keys of `sourceOptions` are all lowercased, so the `includeHeaders` option is not detected unless the key is lowercased.
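
   (A quick sketch of the behavior, with a hypothetical options map:)

   ```scala
   import java.util.Locale

   // DataSource option keys arrive lowercased, so a camelCase lookup misses.
   val sourceOptions = Map("includeheaders" -> "true")

   sourceOptions.get("includeHeaders")                          // None
   sourceOptions.get("includeHeaders".toLowerCase(Locale.ROOT)) // Some("true")

   // Hence reading the flag via the lowercased key:
   val includeHeaders =
     sourceOptions.getOrElse("includeHeaders".toLowerCase(Locale.ROOT), "false").toBoolean
   ```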





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-06-18 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295077620
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ##
 @@ -102,17 +103,35 @@ private[kafka010] class KafkaRelation(
 // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
 val executorKafkaParams =
   KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
-val rdd = new KafkaSourceRDD(
-  sqlContext.sparkContext, executorKafkaParams, offsetRanges,
-  pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
-  InternalRow(
-cr.key,
-cr.value,
-UTF8String.fromString(cr.topic),
-cr.partition,
-cr.offset,
-DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-cr.timestampType.id)
+val rdd = if (includeHeaders) {
+  new KafkaSourceRDD(
+sqlContext.sparkContext, executorKafkaParams, offsetRanges,
+pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
+  InternalRow(
 
 Review comment:
   I first took that approach, but it results in a serialization error, unlike `KafkaOffsetReader` above.
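
   (For context, a hypothetical sketch of the usual failure mode — a member closure drags the whole non-serializable enclosing instance into the task; names are illustrative, not the PR's:)

   ```scala
   import org.apache.spark.SparkContext

   // Sketch: `convert` references a field, so it closes over `this`; mapping
   // with it serializes the enclosing instance and fails with
   // org.apache.spark.SparkException: Task not serializable.
   class RelationLike(sc: SparkContext) { // not Serializable
     private val tag = new Object // reachable only via `this`
     val convert: Int => Int = x => x + tag.hashCode

     def run(): Array[Int] =
       // sc.parallelize(1 to 3).map(convert).collect() // Task not serializable
       sc.parallelize(1 to 3).map(_ + 1).collect() // an inline lambda is fine
   }
   ```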





[GitHub] [spark] dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

2019-06-18 Thread GitBox
dongjinleekr commented on a change in pull request #22282: [SPARK-23539][SS] 
Add support for Kafka headers in Structured Streaming
URL: https://github.com/apache/spark/pull/22282#discussion_r295077589
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 ##
 @@ -38,19 +38,21 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
  *  read by per-task consumers generated later.
  * @param kafkaParams   String params for per-task Kafka consumers.
  * @param sourceOptions Params which are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param metadataPath  Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
  * @param failOnDataLoss Flag indicating whether reading should fail in data loss
  *   scenarios, where some offsets after the specified initial ones can't be
  *   properly read.
+ * @param includeHeaders Flag indicating whether to include Kafka records' headers.
  */
 class KafkaContinuousStream(
 offsetReader: KafkaOffsetReader,
 kafkaParams: ju.Map[String, Object],
 sourceOptions: Map[String, String],
 metadataPath: String,
 initialOffsets: KafkaOffsetRangeLimit,
-failOnDataLoss: Boolean)
+failOnDataLoss: Boolean,
+includeHeaders: Boolean)
 
 Review comment:
   You are right, `includeHeaders` can be retrieved from `sourceOptions`, like `pollTimeoutMs` is. The same applies to `KafkaSource` and `KafkaMicroBatchStream`. So I reverted to the previous constructors.

