[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r215092933
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](
+        topic.toString,
+        null,
+        key,
+        value
+      )
+    } else {
+      val headerMap = projectedRow.getMap(3)
+      val headers = (0 until headerMap.numElements()).toArray.map(
+        i =>
+          new RecordHeader(
--- End diff --

Yeah, then I also think it is a missing spot in Kafka. I just asked about it on the Kafka dev mailing list:

https://lists.apache.org/thread.html/2ec3e7e2345e64ac559d98aaa28e0980f07a9778db447168e19d41d2@%3Cdev.kafka.apache.org%3E

If the Kafka community confirms it's a missing spot, either of us can go ahead and fix it. You can take it forward if you're happy to do so.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-04 Thread dongjinleekr
Github user dongjinleekr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214856258
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](
+        topic.toString,
+        null,
+        key,
+        value
+      )
+    } else {
+      val headerMap = projectedRow.getMap(3)
+      val headers = (0 until headerMap.numElements()).toArray.map(
+        i =>
+          new RecordHeader(
--- End diff --

As of September 2018, `RecordHeader` is the only `Header` implementation provided by Kafka. As you can see 
[here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/), this pattern is widely used. I think it would be more natural for `RecordHeader` to be hidden behind some builder class, but it isn't. It seems to be a missing spot.
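
For reference, a minimal sketch of the usual pattern (the topic, keys, and values below are illustrative):

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import scala.collection.JavaConverters._

// RecordHeader sits in the .internals package, yet it is the only Header
// implementation Kafka ships, so clients construct it directly.
val headers: Seq[Header] = Seq(
  new RecordHeader("x", "2".getBytes("UTF-8")),
  new RecordHeader("y", "4".getBytes("UTF-8")))

// ProducerRecord(topic, partition, key, value, headers); a null partition
// leaves the choice to the partitioner.
val record = new ProducerRecord[Array[Byte], Array[Byte]](
  "some-topic", null, "k".getBytes("UTF-8"), "v".getBytes("UTF-8"), headers.asJava)
```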


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214618743
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala ---
@@ -44,6 +44,11 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
       5,
       DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
     rowWriter.write(6, record.timestampType.id)
+    val keys = record.headers.toArray.map(_.key())
--- End diff --

It might be better to define a new local value for `record.headers.toArray`, because it creates a new array each time it is called when `headers` is not empty. It also guarantees a consistent view when extracting keys and values, though we know `headers` is unlikely to be modified in the meantime.
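
A minimal sketch of what I mean (`record` is a `ConsumerRecord`, as in the converter):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Materialize the headers once: Headers#toArray allocates a fresh array on
// every call, and a single local also guarantees keys and values are taken
// from the same snapshot.
def headerKeysAndValues(
    record: ConsumerRecord[Array[Byte], Array[Byte]]): (Array[String], Array[Array[Byte]]) = {
  val headerArray = record.headers.toArray
  (headerArray.map(_.key()), headerArray.map(_.value()))
}
```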


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214635480
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](
--- End diff --

nit: the Spark Scala style guide states:

https://github.com/databricks/scala-style-guide#spacing-and-indentation

> For method and class constructor invocations, use 2 space indentation for its parameters and put each in each line when the parameters don't fit in two lines.

There are multiple places where these lines could be compacted into one or two lines.
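
For instance, the first branch of the new code fits on one line once compacted; a sketch of the shape (wrapped in a helper so it stands alone):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// All four parameters fit within the line limit, so per the style guide
// they stay on a single line instead of one per line.
def toRecord(topic: String, key: Array[Byte], value: Array[Byte]) =
  new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value)
```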


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214629265
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](
+        topic.toString,
+        null,
+        key,
+        value
+      )
+    } else {
+      val headerMap = projectedRow.getMap(3)
+      val headers = (0 until headerMap.numElements()).toArray.map(
--- End diff --

We could remove `.toArray` here, and also `.toIterable` in `headers.toIterable.asJava`, unless there's a performance difference.
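
A sketch of the simplification, assuming the keys and values have already been extracted from the map:

```scala
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import scala.collection.JavaConverters._

// Mapping over the Range directly yields an IndexedSeq, so neither the
// intermediate .toArray nor the later .toIterable is needed; .asJava on
// the Seq already gives the java.lang.Iterable the producer API expects.
def buildHeaders(keys: Array[String], values: Array[Array[Byte]]): java.lang.Iterable[Header] =
  keys.indices.map(i => new RecordHeader(keys(i), values(i)): Header).asJava
```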


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214638140
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       .option("startingOffsets", s"earliest")
       .option("subscribe", topic)
       .load()
+      .selectExpr(
--- End diff --

It would be better to check the exhaustive set of columns here, so adding a check for headers sounds better to me than limiting the columns.
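
A sketch of what the exhaustive selection could look like (`df` being the loaded Kafka DataFrame):

```scala
import org.apache.spark.sql.DataFrame

// Select every column the Kafka source exposes, headers included, so the
// test stays sensitive to schema regressions instead of checking a
// hand-picked subset.
def selectAllKafkaColumns(df: DataFrame): DataFrame =
  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic",
    "partition", "offset", "timestamp", "timestampType", "headers")
```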


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214646749
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       .option("startingOffsets", s"earliest")
       .option("subscribe", topic)
       .load()
+      .selectExpr(
--- End diff --

I was just referring to your comment that the test failed without `selectExpr`. Is that related to the addition of the headers field? I guess we should make sure it also works without `selectExpr`, so that we can check `headers` with the memory sink as well.
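
A sketch of that check, assuming the un-projected DataFrame can be fed to a memory sink (the query name is illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Write the full Kafka DataFrame, headers column included, to a memory
// sink without any limiting selectExpr.
def startMemorySink(df: DataFrame): StreamingQuery = df.writeStream
  .format("memory")
  .queryName("kafkaColumnTypes") // illustrative name
  .start()
```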


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214622654
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -293,7 +294,12 @@ private[kafka010] class KafkaSource(
         cr.partition,
         cr.offset,
         DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-        cr.timestampType.id)
+        cr.timestampType.id,
+        UnsafeMapData.of(
+          UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
--- End diff --

Same here.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214622600
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala ---
@@ -115,7 +116,12 @@ private[kafka010] class KafkaRelation(
         cr.partition,
         cr.offset,
         DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-        cr.timestampType.id)
+        cr.timestampType.id,
+        UnsafeMapData.of(
+          UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
--- End diff --

Same here.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214640616
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---
@@ -59,14 +59,23 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
     val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
+      .withColumn("headers",
+        map(lit("x"), col("value").plus(1).cast(IntegerType).cast(StringType).cast(BinaryType),
+          lit("y"), col("value").multiply(2).cast(IntegerType).cast(StringType).cast(BinaryType)))
     df.write
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("topic", topic)
       .save()
     checkAnswer(
-      createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
-      Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
+      createKafkaReader(topic).selectExpr(
+        "CAST(value as STRING) value",
+        "CAST(headers.x AS STRING)",
+        "CAST(headers.y AS STRING)"
+      ),
+      Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") :: Row("4", "5", "8"
--- End diff --

nit: `) ::` could go at the end of this line instead of starting the next one.
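
With the nit applied and the row pattern from the test data continued, the expected answer would read (sketch):

```scala
import org.apache.spark.sql.Row

// "x" is value + 1 and "y" is value * 2, per the withColumn above.
val expected = Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") ::
  Row("4", "5", "8") :: Row("5", "6", "10") :: Nil
```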


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214639674
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ---
@@ -136,6 +142,19 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     checkAnswer(df, (0 to 30).map(_.toString).toDF)
   }

+  test("default starting and ending offsets with headers") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessage(topic, (null, "1", Array(("once", "1"), ("twice", "2"))), Some(0))
+    testUtils.sendMessage(topic, (null, "2", Array(("once", "2"), ("twice", "4"))), Some(1))
+    testUtils.sendMessage(topic, (null, "3", Array(("once", "3"), ("twice", "6"))), Some(2))
+
+    // Implicit offset values, should default to earliest and latest
+    val df = createDF(topic, Map.empty[String, String], None, true)
--- End diff --

nit: explicitly writing `includeHeaders = true` would be easier to read.
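
A sketch of the nit; the helper's signature below is a hypothetical stand-in for illustration (the real `createDF` is defined in this suite):

```scala
// Hypothetical stand-in for the suite's helper, just to show the call site.
def createDF(topic: String, withOptions: Map[String, String],
    brokerAddress: Option[String], includeHeaders: Boolean): Unit = ()

// The named argument makes the boolean flag self-documenting:
createDF("topic-1", Map.empty[String, String], None, includeHeaders = true)
```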


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-09-03 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214632570
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
       throw new NullPointerException(s"null topic present in the data. Use the " +
         s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
     }
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    val record = if (projectedRow.isNullAt(3)) {
+      new ProducerRecord[Array[Byte], Array[Byte]](
+        topic.toString,
+        null,
+        key,
+        value
+      )
+    } else {
+      val headerMap = projectedRow.getMap(3)
+      val headers = (0 until headerMap.numElements()).toArray.map(
+        i =>
+          new RecordHeader(
--- End diff --

Looks like `RecordHeader` is in the `org.apache.kafka.common.header.internals` package. Are there any alternative public methods/classes for creating a `Header`? Or is it just a missing spot in Kafka?


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-31 Thread dongjinleekr
Github user dongjinleekr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214345393
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
     return result;
   }

-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
-    return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
--- End diff --

Thank you for your kind guidance. I dropped the commit that removed those methods - it was totally wrong! :)


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-31 Thread dongjinleekr
Github user dongjinleekr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214345173
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
           s"attribute unsupported type ${t.catalogString}")
     }
+    val headersExpression = inputSchema
+      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
+    )
+    headersExpression.dataType match {
+      case MapType(StringType, BinaryType, true) => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
--- End diff --

Oh, I misunderstood. After reviewing the code, I found that `KafkaRowWriter#createProjection` throws `IllegalStateException` while `KafkaWriter#validateQuery` throws `AnalysisException`. I think the reason comes down to the difference between the two methods: the former detects the error from the state of the `InternalRow`, while the latter does so by analyzing the expression's schema.
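
A condensed sketch of the two paths (bodies simplified from the diffs; the package declaration is there because `AnalysisException`'s constructor is package-private to `org.apache.spark.sql`):

```scala
package org.apache.spark.sql.kafka010

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{BinaryType, DataType, MapType, StringType}

object HeaderTypeChecks {
  // KafkaWriter#validateQuery: runs at analysis time over the query's
  // schema, so a bad headers type is reported as an analysis error.
  def validateHeadersType(dt: DataType): Unit = dt match {
    case MapType(StringType, BinaryType, _) => // good
    case t => throw new AnalysisException(s"headers attribute unsupported type $t")
  }

  // KafkaRowWriter#createProjection: runs while building the projection,
  // after analysis has already passed, so a mismatch there means the
  // internal state is inconsistent.
  def checkProjectedHeadersType(dt: DataType): Unit = dt match {
    case MapType(StringType, BinaryType, true) => // good
    case t => throw new IllegalStateException(s"headers attribute unsupported type $t")
  }
}
```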


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread dongjinleekr
Github user dongjinleekr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214198620
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
           s"attribute unsupported type ${t.catalogString}")
     }
+    val headersExpression = inputSchema
+      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
+    )
+    headersExpression.dataType match {
+      case MapType(StringType, BinaryType, true) => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
--- End diff --

Just a typo.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214084903
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
     return result;
   }

-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
-    return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
--- End diff --

Yep, the failed UT log proves this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214073971
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
           s"attribute unsupported type ${t.catalogString}")
     }
+    val headersExpression = inputSchema
+      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
+    )
+    headersExpression.dataType match {
+      case MapType(StringType, BinaryType, true) => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
--- End diff --

This exception is different from the `AnalysisException` thrown in the next class.
What's the reason?


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214075761
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
     return result;
   }

-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
-    return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
--- End diff --

I think `shouldUseGenericArrayData` is still used in generated code; check the code here:

https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread dongjinleekr
GitHub user dongjinleekr opened a pull request:

https://github.com/apache/spark/pull/22282

[SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

## What changes were proposed in this pull request?

This update adds support for the Kafka headers functionality in Structured Streaming.
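
For context, a sketch of how the feature looks from the user's side (the server address, topics, and checkpoint path are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, map}

val spark = SparkSession.builder().appName("kafka-headers-sketch").getOrCreate()

// Read side: the Kafka source gains a headers column of type
// map<string, binary> alongside key, value, topic, partition, offset,
// timestamp, and timestampType.
val in = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "in-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "headers")

// Write side: a headers column of the same map type on the input DataFrame
// is attached to the outgoing producer records.
val out = in
  .withColumn("headers", map(lit("source"), lit("spark").cast("binary")))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "out-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-headers-sketch")
  .start()
```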

## How was this patch tested?

With the following unit tests:

- KafkaRelationSuite: "default starting and ending offsets with headers" (new)
- KafkaSinkSuite: "batch - write to kafka" (updated)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjinleekr/spark feature/SPARK-23539

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22282


commit ddd08612ef8bdb173f974059d2dc6311e1c7d9a3
Author: Lee Dongjin 
Date:   2018-08-26T13:19:52Z

Remove unused methods from UnsafeArrayData

commit 2af13899ab052cc7b52c25b57f154b78a2c45b2a
Author: Lee Dongjin 
Date:   2018-08-26T13:25:25Z

Implement UnsafeArrayData#fromBinaryArray

commit a8e5c5c0f478a795af1236771236da2074093f3e
Author: Lee Dongjin 
Date:   2018-08-27T12:22:18Z

Implement UnsafeArrayData#fromStringArray

commit 2ca181046cf1102aed14f4957e11e4dd901ba3c7
Author: Lee Dongjin 
Date:   2018-08-27T13:28:58Z

Implement UnsafeMapData#of

commit d0d746d99d0a19ecbb2dc098589adbfd1ef0b5ae
Author: Lee Dongjin 
Date:   2018-08-29T13:25:57Z

Allow empty UnsafeArrayData: does not throw IllegalArgumentException on empty or null array argument anymore.

commit b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd
Author: Lee Dongjin 
Date:   2018-08-28T07:50:59Z

Fix invalid formatting: UnsafeArraySuite

commit f077c5d75a83df3541a95a628726e1d74af8c153
Author: Lee Dongjin 
Date:   2018-08-28T07:51:30Z

Implemenet kafka headers functionality

commit 6b4d7754d01e4211d05c83746c848fdfd873f229
Author: Lee Dongjin 
Date:   2018-08-29T09:39:55Z

Add KafkaTestUtils#{sendMessage, sendMessages(String, Array[(String, String, Array[(String, String)])], Option[Int])}

commit c7fb9819989056da4910e2cfc81af332cd603d41
Author: Lee Dongjin 
Date:   2018-08-29T13:24:09Z

Extend KafkaRelationSuite, KafkaSinkSuite to test headers functionality

commit dd2d9390478e4c69b01a7c699e28bfe923ef0db1
Author: Lee Dongjin 
Date:   2018-08-30T10:50:38Z

Minor refinements

commit 229aac85442b03736fc850cae2c3b26becaedade
Author: Lee Dongjin 
Date:   2018-08-30T12:21:30Z

Specify #selectExpr on KafkaSourceSuiteBase's 'Kafka column types' test




---
