[2/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader

2018-01-16 Thread tdas
[SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to 
continuous Kafka data reader

## What changes were proposed in this pull request?

The Kafka reader is now interruptible and can close itself.

## How was this patch tested?

I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. 
Before the fix, my machine ran out of open file descriptors a few iterations 
in; now it works fine.

Author: Jose Torres 

Closes #20253 from jose-torres/fix-data-reader.

(cherry picked from commit 16670578519a7b787b0c63888b7d2873af12d5b9)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a441d2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a441d2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a441d2e

Branch: refs/heads/branch-2.3
Commit: 0a441d2edb0a3f6c6c7c370db8917e1c07f211e7
Parents: 08252bb
Author: Jose Torres 
Authored: Tue Jan 16 18:11:27 2018 -0800
Committer: Tathagata Das 
Committed: Tue Jan 16 18:14:03 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 260 +
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |  21 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala |  17 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   7 +-
 .../sql/kafka010/KafkaSourceProvider.scala      | 105 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     |  71 ++-
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala | 476
 .../kafka010/KafkaContinuousSourceSuite.scala   |  96
 .../sql/kafka010/KafkaContinuousTest.scala      |  94
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 539 ++-
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  25 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   8 +-
 .../execution/streaming/StreamExecution.scala   |  15 +-
 .../ContinuousDataSourceRDDIter.scala           |   4 +-
 .../continuous/ContinuousExecution.scala        |  67 ++-
 .../streaming/continuous/EpochCoordinator.scala |  21 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  26 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  36 +-
 21 files changed, 1628 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a441d2e/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 0000000..fc97797
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import 

[2/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader

2018-01-16 Thread tdas
[SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to 
continuous Kafka data reader

## What changes were proposed in this pull request?

The Kafka reader is now interruptible and can close itself.

## How was this patch tested?

I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. 
Before the fix, my machine ran out of open file descriptors a few iterations 
in; now it works fine.

Author: Jose Torres 

Closes #20253 from jose-torres/fix-data-reader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16670578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16670578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16670578

Branch: refs/heads/master
Commit: 16670578519a7b787b0c63888b7d2873af12d5b9
Parents: a9b845e
Author: Jose Torres 
Authored: Tue Jan 16 18:11:27 2018 -0800
Committer: Tathagata Das 
Committed: Tue Jan 16 18:11:27 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 260 +
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |  21 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala |  17 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   7 +-
 .../sql/kafka010/KafkaSourceProvider.scala      | 105 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     |  71 ++-
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala | 476
 .../kafka010/KafkaContinuousSourceSuite.scala   |  96
 .../sql/kafka010/KafkaContinuousTest.scala      |  94
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 539 ++-
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  25 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   8 +-
 .../execution/streaming/StreamExecution.scala   |  15 +-
 .../ContinuousDataSourceRDDIter.scala           |   4 +-
 .../continuous/ContinuousExecution.scala        |  67 ++-
 .../streaming/continuous/EpochCoordinator.scala |  21 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  26 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  36 +-
 21 files changed, 1628 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 0000000..fc97797
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.