[2/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader
[SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader ## What changes were proposed in this pull request? The Kafka reader is now interruptible and can close itself. ## How was this patch tested? I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. Before the fix, my machine ran out of open file descriptors a few iterations in; now it works fine. Author: Jose TorresCloses #20253 from jose-torres/fix-data-reader. (cherry picked from commit 16670578519a7b787b0c63888b7d2873af12d5b9) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a441d2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a441d2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a441d2e Branch: refs/heads/branch-2.3 Commit: 0a441d2edb0a3f6c6c7c370db8917e1c07f211e7 Parents: 08252bb Author: Jose Torres Authored: Tue Jan 16 18:11:27 2018 -0800 Committer: Tathagata Das Committed: Tue Jan 16 18:14:03 2018 -0800 -- .../sql/kafka010/KafkaContinuousReader.scala| 260 + .../sql/kafka010/KafkaContinuousWriter.scala| 119 .../spark/sql/kafka010/KafkaOffsetReader.scala | 21 +- .../apache/spark/sql/kafka010/KafkaSource.scala | 17 +- .../spark/sql/kafka010/KafkaSourceOffset.scala | 7 +- .../sql/kafka010/KafkaSourceProvider.scala | 105 +++- .../spark/sql/kafka010/KafkaWriteTask.scala | 71 ++- .../apache/spark/sql/kafka010/KafkaWriter.scala | 5 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 476 .../kafka010/KafkaContinuousSourceSuite.scala | 96 .../sql/kafka010/KafkaContinuousTest.scala | 94 .../spark/sql/kafka010/KafkaSourceSuite.scala | 539 ++- .../org/apache/spark/sql/DataFrameReader.scala | 32 +- .../org/apache/spark/sql/DataFrameWriter.scala | 25 +- .../datasources/v2/WriteToDataSourceV2.scala| 8 +- .../execution/streaming/StreamExecution.scala | 15 +- .../ContinuousDataSourceRDDIter.scala | 4 +- .../continuous/ContinuousExecution.scala| 67 ++- .../streaming/continuous/EpochCoordinator.scala | 21 +- .../spark/sql/streaming/DataStreamWriter.scala | 26 +- .../apache/spark/sql/streaming/StreamTest.scala | 36 +- 21 files changed, 1628 insertions(+), 416 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a441d2e/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala new file mode 100644 index 000..fc97797 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeoutException + +import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import
[2/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader
[SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader ## What changes were proposed in this pull request? The Kafka reader is now interruptible and can close itself. ## How was this patch tested? I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. Before the fix, my machine ran out of open file descriptors a few iterations in; now it works fine. Author: Jose TorresCloses #20253 from jose-torres/fix-data-reader. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16670578 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16670578 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16670578 Branch: refs/heads/master Commit: 16670578519a7b787b0c63888b7d2873af12d5b9 Parents: a9b845e Author: Jose Torres Authored: Tue Jan 16 18:11:27 2018 -0800 Committer: Tathagata Das Committed: Tue Jan 16 18:11:27 2018 -0800 -- .../sql/kafka010/KafkaContinuousReader.scala| 260 + .../sql/kafka010/KafkaContinuousWriter.scala| 119 .../spark/sql/kafka010/KafkaOffsetReader.scala | 21 +- .../apache/spark/sql/kafka010/KafkaSource.scala | 17 +- .../spark/sql/kafka010/KafkaSourceOffset.scala | 7 +- .../sql/kafka010/KafkaSourceProvider.scala | 105 +++- .../spark/sql/kafka010/KafkaWriteTask.scala | 71 ++- .../apache/spark/sql/kafka010/KafkaWriter.scala | 5 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 476 .../kafka010/KafkaContinuousSourceSuite.scala | 96 .../sql/kafka010/KafkaContinuousTest.scala | 94 .../spark/sql/kafka010/KafkaSourceSuite.scala | 539 ++- .../org/apache/spark/sql/DataFrameReader.scala | 32 +- .../org/apache/spark/sql/DataFrameWriter.scala | 25 +- .../datasources/v2/WriteToDataSourceV2.scala| 8 +- .../execution/streaming/StreamExecution.scala | 15 +- .../ContinuousDataSourceRDDIter.scala | 4 +- .../continuous/ContinuousExecution.scala| 67 ++- .../streaming/continuous/EpochCoordinator.scala | 21 +- .../spark/sql/streaming/DataStreamWriter.scala | 26 +- .../apache/spark/sql/streaming/StreamTest.scala | 36 +- 21 files changed, 1628 insertions(+), 416 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala new file mode 100644 index 000..fc97797 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeoutException + +import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ContinuousReader]] for data from kafka.