[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r215092933

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new NullPointerException(s"null topic present in the data. Use the " +
                 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
             }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --

    Yeah, then I also think it is a missing spot in Kafka. I just asked about it on the Kafka dev mailing list:
    https://lists.apache.org/thread.html/2ec3e7e2345e64ac559d98aaa28e0980f07a9778db447168e19d41d2@%3Cdev.kafka.apache.org%3E

    If the Kafka community agrees it's a missing spot, either of us can go ahead and fix it. You can take it forward if you're happy to do it.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214856258

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new NullPointerException(s"null topic present in the data. Use the " +
                 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
             }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --

    As of September 2018, `RecordHeader` is the only `Header` implementation provided by Kafka. As you can see [here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/), this approach is widely used. I think it would be more natural for `RecordHeader` to be hidden behind some builder class, but it isn't. It seems to be a missing spot.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214618743

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala ---
    @@ -44,6 +44,11 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
               5,
               DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
             rowWriter.write(6, record.timestampType.id)
    +    val keys = record.headers.toArray.map(_.key())
    --- End diff --

    It might be better to define a local value for `record.headers.toArray`, because each call creates a new array when `headers` is not empty. It also guarantees a consistent view when extracting keys and values, though we know `headers` is unlikely to be modified in the meantime.
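Below is a minimal sketch of the suggestion above. It is written in plain Scala so it runs without Kafka on the classpath. `SimpleHeader` and `FakeHeaders` are hypothetical stand-ins for Kafka's `Header` and `Headers`. `FakeHeaders.toArray` allocates on every call, which models the behavior described in the comment. Taking a single snapshot means the keys and the values both come from one array.

```scala
// Hypothetical stand-in for Kafka's Header: a string key plus a byte-array value.
final case class SimpleHeader(key: String, value: Array[Byte])

// Hypothetical stand-in for Kafka's Headers; every toArray call builds a new array,
// and `allocations` counts how many times that happened.
final class FakeHeaders(underlying: List[SimpleHeader]) {
  var allocations = 0
  def toArray: Array[SimpleHeader] = {
    allocations += 1
    underlying.toArray
  }
}

object HeaderSnapshot {
  // Take one snapshot, then derive both keys and values from that same array.
  def keysAndValues(headers: FakeHeaders): (Array[String], Array[Array[Byte]]) = {
    val snapshot = headers.toArray
    (snapshot.map(_.key), snapshot.map(_.value))
  }
}

val headers = new FakeHeaders(List(
  SimpleHeader("once", "1".getBytes("UTF-8")),
  SimpleHeader("twice", "2".getBytes("UTF-8"))))
val (keys, values) = HeaderSnapshot.keysAndValues(headers)
println(keys.mkString(","))                                  // once,twice
println(values.map(v => new String(v, "UTF-8")).mkString(",")) // 1,2
println(headers.allocations)                                 // 1
```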
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214635480

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new NullPointerException(s"null topic present in the data. Use the " +
                 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
             }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    --- End diff --

    nit: the Scala style guide that Spark follows states:
    https://github.com/databricks/scala-style-guide#spacing-and-indentation

    > For method and class constructor invocations, use 2 space indentation for its parameters and put each in each line when the parameters don't fit in two lines.

    There are multiple places where these lines can be compacted into one or two lines.
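The sketch below illustrates the nit. It uses a hypothetical `Record` case class as a stand-in for `ProducerRecord`. The partition is an `Option[Int]` rather than a nullable `Integer`, which is done only to keep the sketch in idiomatic Scala. The one-argument-per-line form from the diff and the compacted form build the same value, and the compacted form fits easily on a single line.

```scala
// Hypothetical stand-in for ProducerRecord(topic, partition, key, value).
final case class Record(topic: String, partition: Option[Int], key: Array[Byte], value: Array[Byte])

val key = "k".getBytes("UTF-8")
val value = "v".getBytes("UTF-8")

// As in the diff: one argument per line, although everything fits on one line.
val spread = Record(
  "topic",
  None,
  key,
  value
)

// Compacted, as the style nit suggests.
val compact = Record("topic", None, key, value)
```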
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214629265

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new NullPointerException(s"null topic present in the data. Use the " +
                 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
             }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    --- End diff --

    We could remove `.toArray` here, and also `.toIterable` in `headers.toIterable.asJava`, unless there's a performance difference.
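The following sketches the suggested simplification under two assumptions. A hypothetical `SimpleHeader` case class takes the place of `RecordHeader`, and plain arrays take the place of the `MapData` keys and values. Mapping over the range already produces an `IndexedSeq`. `JavaConverters` can turn that directly into a `java.util.List`, and hence a `java.lang.Iterable`, so neither `.toArray` nor `.toIterable` is needed.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Kafka's RecordHeader: a key plus a byte-array value.
final case class SimpleHeader(key: String, value: Array[Byte])

// Parallel key/value arrays, standing in for the keys and values of the headers map.
val headerKeys = Array("x", "y")
val headerValues = Array("2".getBytes("UTF-8"), "4".getBytes("UTF-8"))

// (0 until n).map(...) already yields an IndexedSeq, so no intermediate array is built
// and no .toIterable is needed before converting to a Java collection.
val headers = (0 until headerKeys.length).map(i => SimpleHeader(headerKeys(i), headerValues(i)))
val javaHeaders: java.lang.Iterable[SimpleHeader] = headers.asJava
```

This sketch does not measure whether skipping the intermediate array changes performance. As the comment notes, a performance difference would be the only reason to keep the extra conversions.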
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214638140

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
               .option("startingOffsets", s"earliest")
               .option("subscribe", topic)
               .load()
    +      .selectExpr(
    --- End diff --

    It would be better to check all the columns exhaustively here, so adding a check for headers sounds better to me than limiting the columns.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214646749

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
               .option("startingOffsets", s"earliest")
               .option("subscribe", topic)
               .load()
    +      .selectExpr(
    --- End diff --

    I just noticed your comment that the test failed without `selectExpr`. Is it related to the addition of the headers field? I think we should make sure it also works without `selectExpr`, so that we can also check `headers` with the memory sink.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214622654

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
    @@ -293,7 +294,12 @@ private[kafka010] class KafkaSource(
                     cr.partition,
                     cr.offset,
                     DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
    -            cr.timestampType.id)
    +            cr.timestampType.id,
    +            UnsafeMapData.of(
    +              UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
    --- End diff --

    Same here.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214622600

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala ---
    @@ -115,7 +116,12 @@ private[kafka010] class KafkaRelation(
                     cr.partition,
                     cr.offset,
                     DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
    -            cr.timestampType.id)
    +            cr.timestampType.id,
    +            UnsafeMapData.of(
    +              UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
    --- End diff --

    Same here.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214640616

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---
    @@ -59,14 +59,23 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
             val topic = newTopic()
             testUtils.createTopic(topic)
             val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
    +      .withColumn("headers",
    +        map(lit("x"), col("value").plus(1).cast(IntegerType).cast(StringType).cast(BinaryType),
    +          lit("y"), col("value").multiply(2).cast(IntegerType).cast(StringType).cast(BinaryType)))
             df.write
               .format("kafka")
               .option("kafka.bootstrap.servers", testUtils.brokerAddress)
               .option("topic", topic)
               .save()
             checkAnswer(
    -      createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
    -      Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
    +      createKafkaReader(topic).selectExpr(
    +        "CAST(value as STRING) value",
    +        "CAST(headers.x AS STRING)",
    +        "CAST(headers.y AS STRING)"
    +      ),
    +      Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") :: Row("4", "5", "8"
    --- End diff --

    nit: `) ::` could be placed here instead of on the next line.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214639674

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ---
    @@ -136,6 +142,19 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
             checkAnswer(df, (0 to 30).map(_.toString).toDF)
           }
    +  test("default starting and ending offsets with headers") {
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 3)
    +    testUtils.sendMessage(topic, (null, "1", Array(("once", "1"), ("twice", "2"))), Some(0))
    +    testUtils.sendMessage(topic, (null, "2", Array(("once", "2"), ("twice", "4"))), Some(1))
    +    testUtils.sendMessage(topic, (null, "3", Array(("once", "3"), ("twice", "6"))), Some(2))
    +
    +    // Implicit offset values, should default to earliest and latest
    +    val df = createDF(topic, Map.empty[String, String], None, true)
    --- End diff --

    nit: passing `includeHeaders = true` explicitly as a named argument would make this easier to read.
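Here is a small sketch of the named-argument style. The `createDF` below is a hypothetical stand-in. Only `includeHeaders` comes from the comment. The other parameter names and the string return value are assumptions, chosen only so that the example runs on its own.

```scala
// Hypothetical helper; parameter names other than includeHeaders are assumptions.
def createDF(
    topic: String,
    withOptions: Map[String, String] = Map.empty[String, String],
    brokerAddress: Option[String] = None,
    includeHeaders: Boolean = false): String =
  s"topic=$topic includeHeaders=$includeHeaders"

// Positional boolean: the reader must look up what `true` means.
val positional = createDF("t", Map.empty[String, String], None, true)

// Named argument: the intent is visible at the call site.
val named = createDF("t", Map.empty[String, String], None, includeHeaders = true)
```

Both calls pass the same arguments. Only readability at the call site differs.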
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214632570

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new NullPointerException(s"null topic present in the data. Use the " +
                 s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
             }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --

    It looks like `RecordHeader` is in the `org.apache.kafka.common.header.internals` package. Are there any alternative public methods or classes for creating a `Header`? Or is it just a missing spot in Kafka?
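Kafka's `org.apache.kafka.common.header.Header` is a public interface that declares `String key()` and `byte[] value()`. One conceivable alternative to the internal `RecordHeader` is therefore a small implementation owned by the caller. The sketch below redeclares that interface as a local trait so it compiles without kafka-clients. `SimpleRecordHeader` is a hypothetical name. Whether this is preferable to using `RecordHeader` directly is the question the thread leaves to the Kafka community.

```scala
// Local mirror of Kafka's public Header interface (String key(); byte[] value()),
// redeclared here only so the sketch compiles without kafka-clients.
trait Header {
  def key(): String
  def value(): Array[Byte]
}

// Hypothetical caller-owned implementation, used instead of the internal RecordHeader.
final class SimpleRecordHeader(k: String, v: Array[Byte]) extends Header {
  override def key(): String = k
  override def value(): Array[Byte] = v
}

val headers: Seq[Header] = Seq(
  new SimpleRecordHeader("x", "2".getBytes("UTF-8")),
  new SimpleRecordHeader("y", "4".getBytes("UTF-8")))
println(headers.map(_.key()).mkString(",")) // x,y
```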
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214345393

    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
             return result;
           }
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --

    Thank you for your kind guidance. I dropped the commit that removed those methods; it was totally wrong! :)
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214345173

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
                 s"attribute unsupported type ${t.catalogString}")
             }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +        Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --

    Oh, I misunderstood. After reviewing the code, I found that `KafkaRowWriter#createProjection` throws `IllegalStateException`, while `KafkaWriter#validateQuery` throws `AnalysisException`. I think the reason lies in the difference between the two methods: the former detects the error from the state of the `InternalRow`, while the latter detects it by analyzing the expression's schema.
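The runtime side of this distinction can be sketched with hypothetical, heavily simplified stand-ins for Catalyst's `DataType`, `StringType`, `BinaryType` and `MapType`. The check mirrors the pattern match in `createProjection`. Only a string-to-binary map with nullable values is accepted, and anything else raises `IllegalStateException`. The analysis-time `AnalysisException` path in `validateQuery` is not modeled. As in Spark, the two-argument `MapType(keyType, valueType)` defaults `valueContainsNull` to `true`. That is why the null-literal fallback in the diff passes the check.

```scala
// Hypothetical, heavily simplified stand-ins for Catalyst's data types.
sealed trait DataType
case object StringType extends DataType
case object BinaryType extends DataType
// The default mirrors Spark, where MapType(k, v) means valueContainsNull = true.
final case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean = true)
  extends DataType

// Mirrors the shape of the runtime check: map<string, binary> with nullable values only.
def checkHeadersType(dataType: DataType): Unit = dataType match {
  case MapType(StringType, BinaryType, true) => // good
  case t =>
    throw new IllegalStateException(s"headers attribute unsupported type $t")
}

checkHeadersType(MapType(StringType, BinaryType)) // accepted: valueContainsNull defaults to true

val rejected =
  try {
    checkHeadersType(MapType(StringType, StringType))
    false
  } catch {
    case _: IllegalStateException => true
  }
```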
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214198620

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
                 s"attribute unsupported type ${t.catalogString}")
             }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +        Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --

    Just a typo.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214084903

    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
             return result;
           }
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --

    Yep, the failed unit test log confirms this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214073971

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
               throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
                 s"attribute unsupported type ${t.catalogString}")
             }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +        Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --

    This exception is different from the `AnalysisException` thrown in the next class. What's the reason?
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214075761

    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
             return result;
           }
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --

    I think `shouldUseGenericArrayData` is still used in generated code; see the code here: https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633
GitHub user dongjinleekr opened a pull request:

    https://github.com/apache/spark/pull/22282

    [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

    ## What changes were proposed in this pull request?

    This update adds support for Kafka headers in Structured Streaming.

    ## How was this patch tested?

    With the following unit tests:

    - KafkaRelationSuite: "default starting and ending offsets with headers" (new)
    - KafkaSinkSuite: "batch - write to kafka" (updated)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjinleekr/spark feature/SPARK-23539

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22282

commit ddd08612ef8bdb173f974059d2dc6311e1c7d9a3
Author: Lee Dongjin
Date:   2018-08-26T13:19:52Z

    Remove unused methods from UnsafeArrayData

commit 2af13899ab052cc7b52c25b57f154b78a2c45b2a
Author: Lee Dongjin
Date:   2018-08-26T13:25:25Z

    Implement UnsafeArrayData#fromBinaryArray

commit a8e5c5c0f478a795af1236771236da2074093f3e
Author: Lee Dongjin
Date:   2018-08-27T12:22:18Z

    Implement UnsafeArrayData#fromStringArray

commit 2ca181046cf1102aed14f4957e11e4dd901ba3c7
Author: Lee Dongjin
Date:   2018-08-27T13:28:58Z

    Implement UnsafeMapData#of

commit d0d746d99d0a19ecbb2dc098589adbfd1ef0b5ae
Author: Lee Dongjin
Date:   2018-08-29T13:25:57Z

    Allow empty UnsafeArrayData: does not throw IllegalArgumentException on empty or null array argument anymore.

commit b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd
Author: Lee Dongjin
Date:   2018-08-28T07:50:59Z

    Fix invalid formatting: UnsafeArraySuite

commit f077c5d75a83df3541a95a628726e1d74af8c153
Author: Lee Dongjin
Date:   2018-08-28T07:51:30Z

    Implemenet kafka headers functionality

commit 6b4d7754d01e4211d05c83746c848fdfd873f229
Author: Lee Dongjin
Date:   2018-08-29T09:39:55Z

    Add KafkaTestUtils#{sendMessage, sendMessages(String, Array[(String, String, Array[(String, String)])], Option[Int])}

commit c7fb9819989056da4910e2cfc81af332cd603d41
Author: Lee Dongjin
Date:   2018-08-29T13:24:09Z

    Extend KafkaRelationSuite, KafkaSinkSuite to test headers functionality

commit dd2d9390478e4c69b01a7c699e28bfe923ef0db1
Author: Lee Dongjin
Date:   2018-08-30T10:50:38Z

    Minor refinements

commit 229aac85442b03736fc850cae2c3b26becaedade
Author: Lee Dongjin
Date:   2018-08-30T12:21:30Z

    Specify #selectExpr on KafkaSourceSuiteBase's 'Kafka column types' test