[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239563967 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- "Exist" is a relative concept, I suppose. I think we need to somehow allow for create-on-write functionality, even if many table providers won't want to support it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
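A minimal sketch of the behavior the javadoc describes, assuming a toy provider backed by an in-memory map. `ToyTable`, `ToyTableProvider`, `getTable`, and the `exists` flag are hypothetical names for illustration, not Spark's `TableProvider`/`Table` API. When the named table does not exist, the provider still returns a table handle with an empty schema, so a `SaveMode`-based write has something to create-on-write into.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a v2 Table: a name plus a (possibly empty) list of column names.
final class ToyTable {
  final String name;
  final List<String> schema;
  final boolean exists; // assumption: lets a writer tell "create" apart from "write into"

  ToyTable(String name, List<String> schema, boolean exists) {
    this.name = name;
    this.schema = schema;
    this.exists = exists;
  }
}

// Hypothetical provider without a real catalog; tables live in an in-memory map.
final class ToyTableProvider {
  private final Map<String, List<String>> tables = new HashMap<>();

  void register(String name, List<String> columns) {
    tables.put(name, new ArrayList<>(columns));
  }

  // Always returns a table, even for a missing name, so create-on-write through
  // DataFrameWriter + SaveMode stays possible. The schema is empty in that case.
  ToyTable getTable(String name) {
    List<String> columns = tables.get(name);
    if (columns == null) {
      return new ToyTable(name, Collections.emptyList(), false);
    }
    return new ToyTable(name, Collections.unmodifiableList(columns), true);
  }
}
```

A provider that does not want to support create-on-write could instead throw for unknown names; the sketch only shows the permissive variant the javadoc asks for.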
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user jose-torres closed the pull request at: https://github.com/apache/spark/pull/20906
[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...
Github user jose-torres closed the pull request at: https://github.com/apache/spark/pull/20752
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user jose-torres closed the pull request at: https://github.com/apache/spark/pull/20859
[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/23095#discussion_r235473858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -117,6 +117,7 @@ class ContinuousExecution( // For at least once, we can just ignore those reports and risk duplicates. commitLog.getLatest() match { case Some((latestEpochId, _)) => +updateStatusMessage(s"Getting offsets from latest epoch $latestEpochId") --- End diff -- nit: I'd mention that we're restarting the streaming query from an existing epoch in this message, to contrast with "starting new query" in the other branch
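A small sketch of the suggested wording, assuming the restart branch knows the latest committed epoch. `ContinuousStatusMessages.statusMessage` is a hypothetical helper that mirrors the two branches of `commitLog.getLatest() match`, written in Java with `Optional` standing in for Scala's `Option`; the exact message strings are illustrative, not what the PR uses.

```java
import java.util.Optional;

// Hypothetical helper mirroring the two branches of `commitLog.getLatest() match`.
final class ContinuousStatusMessages {
  static String statusMessage(Optional<Long> latestEpochId) {
    return latestEpochId
        // Restart branch: say explicitly that an existing query is being resumed.
        .map(epoch -> "Restarting streaming query from existing epoch " + epoch
            + ": getting offsets from latest epoch " + epoch)
        // Fresh-start branch: contrasts with the restart message above.
        .orElse("Starting new streaming query.");
  }
}
```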
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r235470285 --- Diff: project/MimaExcludes.scala --- @@ -149,7 +149,8 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"), --- End diff -- This list of exclusions is getting kinda silly. Is there some way to just completely exclude this package from compatibility checks until we've stabilized it?
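To illustrate the difference between listing classes one by one and excluding a whole package, here is a toy filter in Java. It is not MiMa's `ProblemFilters` API; `PackageExclusionFilter` and the plain class-name "problems" are hypothetical. A single prefix rule covers every class under `org.apache.spark.sql.sources.v2`, including classes that are added or removed later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical compatibility-check filter: drops any reported problem whose class
// name falls under one of the excluded package prefixes.
final class PackageExclusionFilter {
  private final List<String> excludedPackages = new ArrayList<>();

  PackageExclusionFilter excludePackage(String pkg) {
    // Store with a trailing dot so "...v2" does not also match "...v2beta".
    excludedPackages.add(pkg.endsWith(".") ? pkg : pkg + ".");
    return this;
  }

  boolean isExcluded(String className) {
    for (String prefix : excludedPackages) {
      if (className.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  // Returns only the problems that are not covered by an excluded package.
  List<String> remainingProblems(List<String> problemClassNames) {
    List<String> kept = new ArrayList<>();
    for (String name : problemClassNames) {
      if (!isExcluded(name)) {
        kept.add(name);
      }
    }
    return kept;
  }
}
```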
[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/23023 LGTM
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/22547 I agree that we need a shared understanding of the relationship between this work and the new catalog API. I was not under the impression that the primary purpose of v2 is to integrate catalog tables.
[GitHub] spark pull request #22671: [SPARK-25615][SQL][TEST] Improve the test runtime...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22671#discussion_r223775935 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala --- @@ -332,7 +332,9 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest { var ex: Exception = null try { ex = intercept[StreamingQueryException] { -writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))() +writer = createKafkaWriter(input.toDF(), --- End diff -- In retrospect, 1000 may have been too low; that could cause flakiness if the test runner is slow and it legitimately takes a second or two for Kafka to respond. 10k should definitely work, and I'd be fine with 1k if we're really worried about test time here.
[GitHub] spark issue #22478: [SPARK-25472][SS] Don't have legitimate stops of streams...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/22478 Lgtm pending tests On Wed, Sep 19, 2018 at 5:16 PM Shixiong Zhu wrote: > LGTM pending tests. Could you add [SS] to your title?
[GitHub] spark issue #22388: Revert [SPARK-24882][SQL] improve data source v2 API fro...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/22388 MicroBatchExecution.scala and ContinuousExecution.scala look right after the revert, although it would be helpful to understand what the diff is between this and a straight `git revert`.
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22386#discussion_r216728848 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -391,6 +393,7 @@ class ContinuousExecution( } object ContinuousExecution { + val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing" --- End diff -- nit: I think this belongs in StreamExecution, since both ContinuousExecution and MicroBatchExecution set it.
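A minimal Java sketch of the suggested placement, assuming both execution modes record the flag as a string-valued local property. The class shapes, `markProcessingMode`, and the `localProperties` map are simplified, hypothetical stand-ins for Spark's Scala classes, not their real signatures; the point is that the key is defined once on the shared parent, so neither subclass has to reference the other's companion object.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for StreamExecution: owns the shared property key.
abstract class StreamExecution {
  static final String IS_CONTINUOUS_PROCESSING = "__is_continuous_processing";

  // Hypothetical stand-in for the local properties set before jobs are run.
  final Map<String, String> localProperties = new HashMap<>();

  abstract boolean isContinuous();

  // Both execution modes set the same key, each with its own value.
  void markProcessingMode() {
    localProperties.put(IS_CONTINUOUS_PROCESSING, Boolean.toString(isContinuous()));
  }
}

final class MicroBatchExecution extends StreamExecution {
  @Override
  boolean isContinuous() {
    return false;
  }
}

final class ContinuousExecution extends StreamExecution {
  @Override
  boolean isContinuous() {
    return true;
  }
}
```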
[GitHub] spark pull request #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization...
GitHub user jose-torres opened a pull request: https://github.com/apache/spark/pull/22245 [SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests. ## What changes were proposed in this pull request? Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration. We caught most instances of this in the original PR, but this one slipped through. ## How was this patch tested? n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/jose-torres/spark fixflake Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22245.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22245 commit 93c7bd93f5dbec41a0fd4d6b5ef0bfe0bfdc235c Author: Jose Torres Date: 2018-08-27T17:03:17Z fix flake
[GitHub] spark pull request #22191: [SPARK-25204][SS] Fix race in rate source test.
GitHub user jose-torres opened a pull request: https://github.com/apache/spark/pull/22191 [SPARK-25204][SS] Fix race in rate source test. ## What changes were proposed in this pull request? Fix a race in the rate source tests. We need a better way of testing restart behavior. ## How was this patch tested? unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/jose-torres/spark racetest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22191 commit eec0ad08e390831717203f6d002e3b1218de6d36 Author: Jose Torres Date: 2018-08-22T22:10:32Z fix test
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r211739221 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala --- @@ -47,7 +47,9 @@ trait KafkaContinuousTest extends KafkaSourceTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r + case r: StreamingDataSourceV2Relation + if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig] --- End diff -- I think this logic is subtly incorrect (and is what's causing the flakiness in the continuous test). It needs to get the actual scan config being used from DataSourceV2ScanExec in the physical plan; `r.scanConfigBuilder.build()` will always produce the most up-to-date `knownPartitions` value, which may not match the config the running scan is actually using.
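The distinction can be shown with a toy model. `ToyScanConfig`, `ToyScanConfigBuilder`, and `ToyScanExec` are hypothetical, not Spark's `ScanConfigBuilder`, `KafkaContinuousScanConfig`, or `DataSourceV2ScanExec`. The builder reads the source's live partition count every time `build()` is called, while the running scan keeps the snapshot taken when it started. A test that calls `build()` again therefore sees the newest `knownPartitions`, which can disagree with what the running scan uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical immutable scan config: a snapshot of the partitions known at build time.
final class ToyScanConfig {
  final int knownPartitions;

  ToyScanConfig(int knownPartitions) {
    this.knownPartitions = knownPartitions;
  }
}

// Hypothetical builder that consults live source state on every build() call.
final class ToyScanConfigBuilder {
  private final AtomicInteger livePartitions;

  ToyScanConfigBuilder(AtomicInteger livePartitions) {
    this.livePartitions = livePartitions;
  }

  ToyScanConfig build() {
    return new ToyScanConfig(livePartitions.get());
  }
}

// Hypothetical stand-in for the physical scan node: it holds the config it started with.
final class ToyScanExec {
  final ToyScanConfig config;

  ToyScanExec(ToyScanConfig config) {
    this.config = config;
  }
}
```

Reading `config` from the scan node reports what is actually executing; rebuilding from the builder reports what the next scan would use.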
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r211639298 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -24,16 +24,17 @@ import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; /** - * An interface that defines how to scan the data from data source for continuous streaming + * An interface that defines how to load the data from data source for continuous streaming * processing. * - * The execution engine will create an instance of this interface at the start of a streaming query, - * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for - * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true. - * The {@link ScanConfig} will be used to create input partitions and reader factory to process data - * for its duration. At the end {@link #stop()} will be called when the streaming execution is - * completed. Note that a single query may have multiple executions due to restart or failure - * recovery. + * The execution engine will get an instance of this interface from a data source provider + * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a + * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of + * {@link ScanConfig} for the duration of the streaming query or until + * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create + * input partitions and reader factory to scan data for its duration. At the end {@link #stop()} + * will be called when the streaming execution is completed. Note that a single query may have + * multiple executions due to restart or failure recovery. --- End diff -- I would also add this documentation on the relevant methods. 
So getContinuousReadSupport and getMicroBatchReadSupport would say something like "Spark will call this method at the beginning of each streaming query to get a ReadSupport", and newScanConfigBuilder would say something like "Spark will get a ScanConfig once for each data scanning job".
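A rough sketch of the call order these docs describe, assuming simplified toy interfaces rather than Spark's actual `ContinuousReadSupport` signatures. `ToyContinuousReadSupport`, `ToyContinuousEngine.runQuery`, string configs, and numeric per-epoch offsets are all hypothetical. The read support is obtained once per query execution (here it is passed in), a scan config is created for each scanning job and kept until `needsReconfiguration` returns true, and `stop()` runs once at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of a continuous read support.
interface ToyContinuousReadSupport {
  String newScanConfig(long startOffset);

  boolean needsReconfiguration(String scanConfig);

  void stop();
}

final class ToyContinuousEngine {
  // Hypothetical driver loop over `epochs` epochs; records which config each epoch used.
  static List<String> runQuery(ToyContinuousReadSupport support, long startOffset, int epochs) {
    List<String> configsUsed = new ArrayList<>();
    try {
      // One config per scanning job; it stays in use until reconfiguration is needed.
      String config = support.newScanConfig(startOffset);
      for (int epoch = 0; epoch < epochs; epoch++) {
        configsUsed.add(config);
        if (support.needsReconfiguration(config)) {
          // End the current job and start a new one from the next epoch's offset.
          config = support.newScanConfig(startOffset + epoch + 1);
        }
      }
    } finally {
      support.stop(); // called once when the execution ends
    }
    return configsUsed;
  }
}
```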
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 Sure, but I'm not a committer so I can't make that happen. @cloud-fan
[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/22009 There's a reasonable chance that the "Error adding data: Could not find index of the source to which data was added" flakiness in the Kafka suite was caused by this PR. Let me know if you need help debugging.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209708483 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java --- @@ -21,33 +21,39 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport; import org.apache.spark.sql.types.StructType; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data writing ability and save the data to the data source. + * provide data writing ability for batch processing. + * + * This interface is used when end users want to use a data source implementation directly, e.g. + * {@code Dataset.write.format(...).option(...).save()}. */ @InterfaceStability.Evolving -public interface WriteSupport extends DataSourceV2 { +public interface BatchWriteSupportProvider extends DataSourceV2 { /** - * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data + * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data * sources can return None if there is no writing needed to be done according to the save mode. * * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. * - * @param writeUUID A unique string for the writing job. It's possible that there are many writing - * jobs running at the same time, and the returned {@link DataSourceWriter} can - * use this job id to distinguish itself from other jobs. + * @param queryId A unique string for the writing query. It's possible that there are many + *writing queries running at the same time, and the returned + *{@link BatchWriteSupport} can use this id to distinguish itself from others. * @param schema the schema of the data to be written. 
* @param mode the save mode which determines what to do when the data are already in this data * source, please refer to {@link SaveMode} for more details. * @param options the options for the returned data source writer, which is an immutable *case-insensitive string-to-string map. - * @return a writer to append data to this data source + * @return a write support to write data to this data source. */ - Optional<DataSourceWriter> createWriter( - String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options); + Optional<BatchWriteSupport> createBatchWriteSupport( + String queryId, + StructType schema, + SaveMode mode, --- End diff -- To clarify, your proposal is that we should block the completion of DataSourceV2 until the new logical plans are in place?
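A sketch of the `Optional` contract in the javadoc above ("return None if there is no writing needed to be done according to the save mode"), assuming a toy provider whose only state is whether the target already holds data. `ToyWriteMode` mirrors the four `SaveMode` values; `ToyWriteSupport`, `ToyWriteSupportProvider`, and `createWriteSupport` are hypothetical and much simpler than `BatchWriteSupportProvider.createBatchWriteSupport`.

```java
import java.util.Optional;

// Mirrors the four values of Spark's SaveMode enum.
enum ToyWriteMode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

// Hypothetical write handle; a real one would create data writers for the job.
final class ToyWriteSupport {
  final String queryId;
  final boolean truncateFirst;

  ToyWriteSupport(String queryId, boolean truncateFirst) {
    this.queryId = queryId;
    this.truncateFirst = truncateFirst;
  }
}

final class ToyWriteSupportProvider {
  private final boolean targetHasData;

  ToyWriteSupportProvider(boolean targetHasData) {
    this.targetHasData = targetHasData;
  }

  // Returns empty when the save mode means nothing should be written. Per the javadoc,
  // throwing at this point fails the action before any Spark job is submitted.
  Optional<ToyWriteSupport> createWriteSupport(String queryId, ToyWriteMode mode) {
    switch (mode) {
      case APPEND:
        return Optional.of(new ToyWriteSupport(queryId, false));
      case OVERWRITE:
        return Optional.of(new ToyWriteSupport(queryId, true));
      case ERROR_IF_EXISTS:
        if (targetHasData) {
          throw new IllegalStateException("Target already contains data");
        }
        return Optional.of(new ToyWriteSupport(queryId, false));
      case IGNORE:
        if (targetHasData) {
          return Optional.empty(); // nothing to do: existing data is left untouched
        }
        return Optional.of(new ToyWriteSupport(queryId, false));
      default:
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }
  }
}
```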
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209702134 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); --- End diff -- There are two consumer groups in streaming: 1. The one at the driver, which determines what offsets are available to scan. 2. The one distributed across the executors which actually performs the scan. This method is used to commit certain offsets in group 1, based on the offsets which have been logged as processed by group 2. In microbatch mode, this happens to work with ScanConfig, because there is one ScanConfig for each offset log entry. In continuous mode there is one ScanConfig corresponding to an indefinite number of offset log entries, so ScanConfig does not provide the information required to commit any particular entry.
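A toy model of the driver-side half of that split, assuming numeric offsets. `DriverOffsetTracker` is hypothetical, not a Spark class: the driver learns which offsets are available, the executors (not modeled) scan ranges, and once the offset log records a range as processed the driver calls `commit(end)` so the source can drop everything up to `end`. Because commit takes an offset rather than a `ScanConfig`, it behaves the same whether one config covers one log entry (micro-batch) or many (continuous).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical driver-side view of a source: tracks offsets that became available
// and discards them once Spark reports them as committed.
final class DriverOffsetTracker {
  private final Deque<Long> retained = new ArrayDeque<>();
  private long lastCommitted = -1L;

  // Driver discovers a newly available offset (e.g. by polling the external system).
  void offsetAvailable(long offset) {
    retained.addLast(offset);
  }

  // Spark has processed all data at offsets <= end and will only ask for later offsets.
  void commit(long end) {
    if (end < lastCommitted) {
      throw new IllegalArgumentException("commit went backwards: " + end + " < " + lastCommitted);
    }
    while (!retained.isEmpty() && retained.peekFirst() <= end) {
      retained.removeFirst();
    }
    lastCommitted = end;
  }

  int retainedCount() {
    return retained.size();
  }
}
```

In the usage below, a single `commit(3)` plays the role of one micro-batch log entry, and the later `commit(4)`/`commit(7)` calls play the role of several continuous epochs served by one long-lived config.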
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209301920 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- Sorry, to be clear, this method was in the original streaming design doc https://docs.google.com/document/d/1VzxEuvpLfuHKL6vJO9qJ6ug0x9J_gLoLSH_vJL3-Cho which was sent out 2 months ago. If the reworking in this PR has made you realize there's a better way to do things, I think we should absolutely consider the alternative. But we can't just remove random methods without a complete proposal for what should replace them.
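For reference, here is the kind of round trip `deserializeOffset` supports: an offset is serialized to JSON in the checkpoint and parsed back on restart, with `IllegalArgumentException` for input that is not a valid offset for this source. `ToyLongOffset` and its `{"offset":N}` format are hypothetical. A real source would define its own JSON layout and would likely use a JSON library rather than a regex.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical single-number offset with a tiny JSON encoding: {"offset":N}.
final class ToyLongOffset {
  private static final Pattern FORMAT =
      Pattern.compile("\\{\\s*\"offset\"\\s*:\\s*(-?\\d+)\\s*\\}");

  final long value;

  ToyLongOffset(long value) {
    this.value = value;
  }

  String json() {
    return "{\"offset\":" + value + "}";
  }

  // Counterpart of deserializeOffset(String json): rejects strings that do not
  // encode a valid offset for this source.
  static ToyLongOffset deserialize(String json) {
    Matcher m = FORMAT.matcher(json.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a valid offset: " + json);
    }
    try {
      return new ToyLongOffset(Long.parseLong(m.group(1)));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Offset out of range: " + json, e);
    }
  }
}
```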
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209094853 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); --- End diff -- I'm not sure what you mean by "scan state" here. The thing that is scanned needs to know what offsets are available for scanning, which requires holding a Kafka consumer to read that information.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209059908 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); --- End diff -- Not in the current implementation.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209041048 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * micro-batch mode. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. 
All other methods in {@link MicroBatchReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end); + + /** + * Returns the most recent offset available. + */ + Offset latestOffset(Offset start); --- End diff -- (Note that this is not just the Kafka reader - file streams have a similar option.)
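The option in question is presumably a per-trigger cap such as Kafka's `maxOffsetsPerTrigger` or the file source's `maxFilesPerTrigger`. Under that assumption, `latestOffset` needs `start` because "the end of the next batch" can only be computed relative to where that batch begins. A minimal sketch with a hypothetical `RateLimitedSource` that uses plain `long` offsets instead of Spark's `Offset` type:

```java
// Hypothetical micro-batch source with a per-trigger cap, using plain long offsets.
final class RateLimitedSource {
  private final long maxOffsetsPerTrigger;
  private long available; // highest offset currently present in the external system

  RateLimitedSource(long maxOffsetsPerTrigger, long available) {
    if (maxOffsetsPerTrigger <= 0) {
      throw new IllegalArgumentException("maxOffsetsPerTrigger must be positive");
    }
    this.maxOffsetsPerTrigger = maxOffsetsPerTrigger;
    this.available = available;
  }

  // New records arrive in the external system.
  void append(long newRecords) {
    available += newRecords;
  }

  // End offset for the next batch: as far as the data goes, but never more than
  // maxOffsetsPerTrigger past `start`. Without `start`, the cap could not be applied.
  long latestOffset(long start) {
    return Math.min(available, start + maxOffsetsPerTrigger);
  }
}
```

A source without any such cap would simply ignore `start`, which is why the argument looks unnecessary from the point of view of most implementations.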
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209040246 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * micro-batch mode. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. 
All other methods in {@link MicroBatchReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end); + + /** + * Returns the most recent offset available. + */ + Offset latestOffset(Offset start); --- End diff -- I agree in principle, but I don't know of any way to special-case it without adding it to the API.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208983568 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java --- @@ -0,0 +1,73 @@ +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability for continuous stream processing. + * + * This interface is used when end users want to use a data source implementation directly, e.g. + * {@code SparkSession.readStream.format(...).option(...).load()}. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupportProvider extends DataSourceV2 { + + /** + * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
+ * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. --- End diff -- nit: This comment has been carried over to a lot of different places as we evolve the API. It's still true but I don't think it's really applicable here.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208984614 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java --- @@ -23,8 +23,9 @@ * The base interface for data source v2. Implementations must have a public, 0-arg constructor. * * Note that this is an empty interface. Data source implementations should mix-in at least one of - * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just - * a dummy data source which is un-readable/writable. + * the provider interfaces like {@link BatchReadSupportProvider} or --- End diff -- nit: I would say something more like "Data source implementations must mix in interfaces such as {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider} to provide read or write functionality."
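The suggested wording says an implementation only becomes readable or writable once it mixes in a provider interface. Below is a minimal Java sketch of that marker-plus-mix-in pattern. `DataSourceV2Marker`, `BatchReadProvider`, `BatchWriteProvider` and `CapabilityCheck` are hypothetical names, not Spark's classes. The `instanceof` check only illustrates one way an engine could discover which capabilities a source exposes.

```java
// Hypothetical stand-ins for the empty marker interface and its mix-ins.
// These are NOT Spark's real types; they only sketch the pattern.
interface DataSourceV2Marker {}

interface BatchReadProvider extends DataSourceV2Marker {
    String createBatchRead();
}

interface BatchWriteProvider extends DataSourceV2Marker {
    String createBatchWrite();
}

// Mixes in read support only.
class ReadOnlySource implements BatchReadProvider {
    public String createBatchRead() { return "read"; }
}

// Mixes in both capabilities.
class ReadWriteSource implements BatchReadProvider, BatchWriteProvider {
    public String createBatchRead() { return "read"; }
    public String createBatchWrite() { return "write"; }
}

// Mixes in nothing: compiles, but is a dummy source that can't be read or written.
class DummySource implements DataSourceV2Marker {}

final class CapabilityCheck {
    private CapabilityCheck() {}

    // Reports which capabilities a source exposes by inspecting its mix-ins.
    static String describe(DataSourceV2Marker source) {
        boolean canRead = source instanceof BatchReadProvider;
        boolean canWrite = source instanceof BatchWriteProvider;
        if (!canRead && !canWrite) {
            return "unusable";
        }
        return (canRead ? "read" : "") + (canRead && canWrite ? "+" : "") + (canWrite ? "write" : "");
    }
}
```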
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208763503 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java --- @@ -27,10 +27,10 @@ @InterfaceStability.Evolving public interface SessionConfigSupport extends DataSourceV2 { -/** - * Key prefix of the session configs to propagate. Spark will extract all session configs that - * starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -> yyy` - * into `xxx -> yyy`, and propagate them to all data source operations in this session. - */ -String keyPrefix(); + /** + * Key prefix of the session configs to propagate. Spark will extract all session configs that + * starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -> yyy` --- End diff -- Is `datasource` a placeholder here, or would Kafka configs now be e.g. `spark.datasource.kafka.[...]`?
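The quoted Javadoc describes a prefix-stripping rule for session configs. The sketch below applies that rule to a plain `Map`. The `SessionConfigs` class is hypothetical and is not Spark's implementation. Whether the literal `spark.datasource.` prefix is used is exactly what the comment asks, so it is only an assumption here.

```java
import java.util.Map;
import java.util.TreeMap;

final class SessionConfigs {
    private SessionConfigs() {}

    // Sketch of the rule in the quoted Javadoc: keep entries whose key starts with
    // "spark.datasource.<keyPrefix>." and strip that prefix, turning
    // "spark.datasource.<keyPrefix>.xxx -> yyy" into "xxx -> yyy".
    // Illustration only; the literal "spark.datasource." prefix is an assumption.
    static Map<String, String> extract(Map<String, String> sessionConf, String keyPrefix) {
        String prefix = "spark.datasource." + keyPrefix + ".";
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : sessionConf.entrySet()) {
            String key = e.getKey();
            // Require at least one character after the prefix.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

This sketch requires a trailing dot after the key prefix. That keeps a source with key prefix `kafka` from picking up keys meant for a hypothetical `kafkaX` source.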
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208984707 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java --- @@ -0,0 +1,73 @@ +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability for micro-batch stream processing. + * + * This interface is used when end users want to use a data source implementation directly, e.g. + * {@code SparkSession.readStream.format(...).option(...).load()}. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupportProvider extends DataSourceV2 { + + /** + * Creates a {@link MicroBatchReadSupport} to scan the data from this streaming data source.
+ * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. --- End diff -- nit: same comment as in the continuous class.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209019530 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); --- End diff -- I'd +1 passing a ScanConfig. I agree that all the existing sources are just going to pull out the offset, but "Spark is finished with this scan" is a cleaner semantic than "Spark is finished with the scan such that it goes up to this offset".
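The quoted `commit` contract says Spark will not ask again for offsets at or below `end`. The `initialOffset()` docs also note that a restarted query resumes from the checkpoint. Below is a sketch of how a source could act on both. It assumes a hypothetical in-memory `BufferedSource` keyed by plain `long` offsets. The comment above argues for passing a `ScanConfig` instead; the offset form is kept here only because it is what the quoted diff declares.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory source using plain long offsets instead of Spark's Offset type.
final class BufferedSource {
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    private final long initialOffset;

    BufferedSource(long initialOffset) {
        this.initialOffset = initialOffset;
    }

    void append(long offset, String record) {
        buffer.put(offset, record);
    }

    // A restarted query resumes from the checkpointed offset, so the source must not
    // assume reading always begins at initialOffset.
    long resolveStart(Optional<Long> checkpointed) {
        return checkpointed.orElse(initialOffset);
    }

    // Spark is done with everything at or below `end` and will only request offsets
    // greater than `end`, so those records can be dropped. headMap(end, true) is a
    // live view of keys <= end; clearing it removes them from the buffer.
    void commit(long end) {
        buffer.headMap(end, true).clear();
    }

    int buffered() {
        return buffer.size();
    }
}
```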
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208986233 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java --- @@ -0,0 +1,47 @@ +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface that defines how to scan the data from data source for batch processing. --- End diff -- I think the class docs here should include a high-level overview of the path from here to a Spark job.
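The comment asks for a high-level overview of the read path. Here is one possible sketch of that path in call order:

1. Build a scan config.
2. Plan the input partitions.
3. Create a serializable reader factory.
4. Create and drain one reader per partition.

Every type here (`SketchBatchRead`, `SketchReaderFactory`, `RangeRead`, `LocalDriver` and the rest) is a hypothetical simplification loosely modeled on names in this PR. The final loop runs in one JVM rather than as a Spark job.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the read path (not Spark's real types).
interface SketchPartition extends Serializable {}

interface SketchReader extends AutoCloseable {
    boolean next();
    int get();

    @Override
    void close(); // narrowed: throws no checked exception
}

interface SketchReaderFactory extends Serializable {
    SketchReader createReader(SketchPartition partition);
}

interface SketchScanConfig {}

interface SketchBatchRead {
    SketchScanConfig newScanConfig();                                // driver: pushdown etc.
    List<SketchPartition> planInputPartitions(SketchScanConfig cfg); // driver: one per task
    SketchReaderFactory createReaderFactory(SketchScanConfig cfg);   // driver: shipped to executors
}

// Hypothetical source producing the integers [0, 5) in two partitions.
final class RangeRead implements SketchBatchRead {
    static final class RangePartition implements SketchPartition {
        final int start;
        final int end;
        RangePartition(int start, int end) { this.start = start; this.end = end; }
    }

    public SketchScanConfig newScanConfig() { return new SketchScanConfig() {}; }

    public List<SketchPartition> planInputPartitions(SketchScanConfig cfg) {
        List<SketchPartition> parts = new ArrayList<>();
        parts.add(new RangePartition(0, 3));
        parts.add(new RangePartition(3, 5));
        return parts;
    }

    public SketchReaderFactory createReaderFactory(SketchScanConfig cfg) {
        return partition -> {
            RangePartition p = (RangePartition) partition; // cast to the source's own type
            return new SketchReader() {
                private int current = p.start - 1;
                public boolean next() { return ++current < p.end; }
                public int get() { return current; }
                public void close() {}
            };
        };
    }
}

final class LocalDriver {
    private LocalDriver() {}

    // Runs the steps in order inside one JVM; in Spark the per-partition loop body
    // would run as separate tasks on executors.
    static List<Integer> run(SketchBatchRead read) {
        SketchScanConfig cfg = read.newScanConfig();
        SketchReaderFactory factory = read.createReaderFactory(cfg);
        List<Integer> rows = new ArrayList<>();
        for (SketchPartition partition : read.planInputPartitions(cfg)) {
            try (SketchReader reader = factory.createReader(partition)) {
                while (reader.next()) {
                    rows.add(reader.get());
                }
            }
        }
        return rows;
    }
}
```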
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208987767 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java --- @@ -22,18 +22,16 @@ import org.apache.spark.annotation.InterfaceStability; /** - * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is - * responsible for creating the actual data reader of one RDD partition. - * The relationship between {@link InputPartition} and {@link InputPartitionReader} - * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}. + * An input partition returned by {@link ReadSupport#planInputPartitions(ScanConfig)}, which + * represents a data split that should be processed by one Spark task. --- End diff -- I'm not sure we need to talk about "data split" - I don't think people will try to implement data sources without knowing what a partition is in Spark. I'd suggest saying "A serializable representation of an input partition...", to make it clear that this should just contain metadata required to identify what the partition is and not the actual data.
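The suggestion is that an input partition should be a serializable description of the data rather than the data itself. The sketch below defines a hypothetical `FileRangePartition` that holds only a path and a byte range. It then round-trips that object through Java serialization as a rough stand-in for shipping it from the driver to an executor. The class names and fields are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical partition descriptor: only the metadata needed to locate the data
// (a file path and byte range), never the rows themselves.
final class FileRangePartition implements Serializable {
    private static final long serialVersionUID = 1L;
    final String path;
    final long start;
    final long length;

    FileRangePartition(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

final class JavaSerde {
    private JavaSerde() {}

    // Serializes and deserializes a value with Java serialization, loosely analogous
    // to a partition descriptor travelling from the driver to an executor.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```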
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209013149 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java --- @@ -0,0 +1,68 @@ +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +/** + * A factory used to create {@link PartitionReader} instances. + */ +@InterfaceStability.Evolving +public interface PartitionReaderFactory extends Serializable { + + /** + * Returns a row-based partition reader to read data from the given {@link InputPartition}. + * + * Implementations probably need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times.
+ */ + PartitionReader createReader(InputPartition partition); + + /** + * Returns a columnar partition reader to read data from the given {@link InputPartition}. + * + * Implementations probably need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. + */ + default PartitionReader createColumnarReader(InputPartition partition) { +throw new UnsupportedOperationException("Cannot create columnar reader."); + } + + /** + * Returns true if the given {@link InputPartition} should be read by Spark in a columnar way. + * This means, implementations must also implement {@link #createColumnarReader(InputPartition)} + * for the input partitions that this method returns true. + * + * As of Spark 2.4, Spark can only read all input partition in a columnar way, or none of them. + * Data source can't mix columnar and row-based partitions. This will be relaxed in future --- End diff -- nit: "may be relaxed"; we shouldn't guarantee it.
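The quoted Javadoc says that Spark 2.4 reads either all partitions in a columnar way or none of them. Below is a sketch that enforces that rule over a list of partitions. `ColumnarCheck` is a hypothetical helper, and the caller-supplied predicate stands in for a per-partition columnar check.

```java
import java.util.List;
import java.util.function.Predicate;

final class ColumnarCheck {
    private ColumnarCheck() {}

    // Sketch of the all-or-nothing restriction: returns true if every partition is
    // columnar, false if none is, and rejects a mix. P is a hypothetical partition type.
    static <P> boolean readColumnar(List<P> partitions, Predicate<P> supportsColumnar) {
        long columnar = partitions.stream().filter(supportsColumnar).count();
        if (columnar == 0) {
            return false;
        }
        if (columnar == partitions.size()) {
            return true;
        }
        throw new IllegalStateException(
            "Cannot mix columnar and row-based partitions: " + columnar + " of " + partitions.size());
    }
}
```

The nit says the restriction only *may* be relaxed later. If it is, the exception branch is the part that would change.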
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209018297 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,79 @@ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface that defines how to scan the data from data source for continuous streaming + * processing. + * + * The execution engine will create an instance of this interface at the start of a streaming query, + * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for + * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true. + * The {@link ScanConfig} will be used to create input partitions and reader factory to process data + * for its duration.
At the end {@link #stop()} will be called when the streaming execution is + * completed. Note that a single query may have multiple executions due to restart or failure + * recovery. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * to do operators pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} --- End diff -- No, I think we just need a new `ScanConfig`. (But this PR is already very large and that will require execution layer changes, so I'd suggest filing a followup for that.) 
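The quoted `ContinuousReadSupport` docs describe two per-epoch tasks: merging per-partition offsets into one global offset, and checking whether reconfiguration is needed. The sketch below makes these assumptions:

- `PartOffset`, `GlobalOffset` and `OffsetMerger` are hypothetical types.
- An offset is a map from partition ID to a `long`.
- A changed partition set is the only reconfiguration trigger considered.
- When a partition reports twice, the larger offset is kept. This is a choice made for the sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical per-partition offset reported by one continuous reader.
final class PartOffset {
    final int partition;
    final long offset;

    PartOffset(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical global offset: partition id -> latest offset seen.
final class GlobalOffset {
    final Map<Integer, Long> offsets;

    GlobalOffset(Map<Integer, Long> offsets) {
        this.offsets = Collections.unmodifiableMap(offsets);
    }
}

final class OffsetMerger {
    private OffsetMerger() {}

    // Folds per-partition offsets into one global offset, keeping the max per partition.
    static GlobalOffset merge(PartOffset[] parts) {
        Map<Integer, Long> merged = new TreeMap<>();
        for (PartOffset p : parts) {
            merged.merge(p.partition, p.offset, Long::max);
        }
        return new GlobalOffset(merged);
    }

    // A needsReconfiguration-style check: the partitions known to the current scan no
    // longer match the partitions the underlying system currently has.
    static boolean partitionsChanged(GlobalOffset current, Set<Integer> sourcePartitions) {
        return !current.offsets.keySet().equals(sourcePartitions);
    }
}
```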
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208982830 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java --- @@ -18,19 +18,22 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.DataSourceRegister; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; +import org.apache.spark.sql.sources.v2.reader.BatchReadSupport; import org.apache.spark.sql.types.StructType; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data reading ability and scan the data from the data source. + * provide data reading ability for batch processing. + * + * This interface is used when end users want to use a data source implementation directly, e.g. + * {@code SparkSession.read.format(...).option(...).load()}. */ @InterfaceStability.Evolving -public interface ReadSupport extends DataSourceV2 { +public interface BatchReadSupportProvider extends DataSourceV2 { /** - * Creates a {@link DataSourceReader} to scan the data from this data source. + * Creates a {@link BatchReadSupport} to scan the data from this data source. --- End diff -- nit: ... from this data source with a user specified schema.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209021329 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java --- @@ -0,0 +1,58 @@ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import java.io.Serializable; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.writer.DataWriter; + +/** + * A factory of {@link DataWriter} returned by + * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating + * and initializing the actual data writer at executor side. + * + * Note that, the writer factory will be serialized and sent to executors, then the data writer + * will be created on executors and do the actual writing. So this interface must be + * serializable and {@link DataWriter} doesn't need to be. + */ +@InterfaceStability.Evolving +public interface StreamingDataWriterFactory extends Serializable { + + /** + * Returns a data writer to do the actual writing work.
Note that, Spark will reuse the same data + * object instance when sending data to the data writer, for better performance. Data writers + * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a + * list. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + * + * @param partitionId A unique id of the RDD partition that the returned writer will process. + *Usually Spark processes many RDD partitions at the same time, + *implementations should use the partition id to distinguish writers for + *different partitions. + * @param taskId A unique identifier for a task that is performing the write of the partition + * data. Spark may run multiple tasks for the same partition (due to speculation + * or task failures, for example). --- End diff -- Is it the ID of the task or the ID of one particular attempt of the task? (The target audience here is people who know a reasonable amount about Spark - I think we should just say TaskContext.taskAttemptId() if that's what this is.)
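The quoted docs warn that Spark reuses the same data object across calls, so a writer that buffers rows must copy them. The sketch below contrasts buffering with and without a defensive copy, using a hypothetical `ReusableRow` in place of Spark's row type. The `part-<partitionId>-<taskId>` naming is also hypothetical, and it does not settle the question above about whether `taskId` is an attempt ID.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable row that the caller reuses for every record it sends.
final class ReusableRow {
    final int[] values;

    ReusableRow(int width) {
        values = new int[width];
    }

    ReusableRow copy() {
        ReusableRow r = new ReusableRow(values.length);
        System.arraycopy(values, 0, r.values, 0, values.length);
        return r;
    }
}

// Hypothetical buffering writer. Because the caller mutates and resends the same row
// object, keeping a reference without copying means every buffered entry ends up
// showing the last record written.
final class BufferingWriter {
    final int partitionId;
    final long taskId;
    final List<ReusableRow> buffered = new ArrayList<>();

    BufferingWriter(int partitionId, long taskId) {
        this.partitionId = partitionId;
        this.taskId = taskId;
    }

    void write(ReusableRow row, boolean defensiveCopy) {
        buffered.add(defensiveCopy ? row.copy() : row);
    }

    // Hypothetical output name that keeps concurrent writers for the same partition
    // (e.g. speculative tasks) from clobbering each other.
    String outputName() {
        return "part-" + partitionId + "-" + taskId;
    }
}
```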
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 No more suggestions, the PR looks fine to me.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208642760 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java --- @@ -0,0 +1,49 @@ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * micro-batch mode. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan.
All other methods in {@link MicroBatchReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end); + + /** + * Returns the most recent offset available. + */ + Offset latestOffset(Offset start); --- End diff -- There's a weak form of rate control implemented by simply having sources lie about what the latest offset is. For example, you might set maxOffsetsPerTrigger = 100, and then the Kafka source will pretend that only 100 more offsets exist even if there are really more available. Unfortunately, we're going to need to continue to support such options at least until the next major version after we have better rate limiting, so I don't think this can be removed from the source API right now.
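The "weak form of rate control" described above can be sketched as a source capping the latest offset it reports relative to `start`. This also explains why the quoted `latestOffset` takes a `start` argument. The sketch assumes a single partition with `long` offsets. The Kafka source tracks offsets per topic partition, and `RateLimit.cappedLatest` is a hypothetical helper, not Kafka or Spark code.

```java
final class RateLimit {
    private RateLimit() {}

    // Reports a latest offset at most maxOffsetsPerTrigger past start, even if the
    // underlying system has more data available. Single-partition sketch.
    static long cappedLatest(long start, long actualLatest, long maxOffsetsPerTrigger) {
        if (maxOffsetsPerTrigger <= 0) {
            throw new IllegalArgumentException("maxOffsetsPerTrigger must be positive");
        }
        if (actualLatest - start <= maxOffsetsPerTrigger) {
            return actualLatest; // less than one trigger's worth of new data
        }
        return start + maxOffsetsPerTrigger; // "pretend" only this much exists
    }
}
```

Take a start of 1000, with 5000 offsets actually available and `maxOffsetsPerTrigger = 100`. The sketch reports 1100, which mirrors the example in the comment.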
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208641014 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- As I said, I'm fine with defining arbitrary JSON strings as the single non-customizable offset type, if you think that would be better. (I think they would have to be strings, because making a JSON object the type would mean packaging some JSON library into the API.) I don't think it would ever be correct to have an Offset class which doesn't trivially reduce to a key-value map.
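The comment argues that any valid offset reduces to a key-value map, which makes a JSON object string a natural single serialized form. Below is a sketch of such an offset. It assumes a hypothetical `MapOffset` with flat string keys and `long` values. The hand-rolled parser only handles that flat shape and keys without quotes or escapes; it exists to keep the example dependency-free. A real implementation would use a JSON library.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical offset that is just a flat key-value map, serialized as a JSON object
// string such as {"p0":10,"p1":42}.
final class MapOffset {
    final Map<String, Long> values;

    MapOffset(Map<String, Long> values) {
        this.values = new TreeMap<>(values); // sorted keys give a stable JSON form
    }

    String json() {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Long> e : values.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
        }
        return sb.append('}').toString();
    }

    // Mirrors the quoted contract: throw IllegalArgumentException for JSON that does
    // not encode a valid offset.
    static MapOffset deserialize(String json) {
        String s = json.trim();
        if (!s.startsWith("{") || !s.endsWith("}")) {
            throw new IllegalArgumentException("Not a JSON object: " + json);
        }
        Map<String, Long> values = new TreeMap<>();
        String body = s.substring(1, s.length() - 1).trim();
        if (body.isEmpty()) {
            return new MapOffset(values);
        }
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":", 2);
            String key = kv[0].trim();
            if (kv.length != 2 || key.length() < 2 || !key.startsWith("\"") || !key.endsWith("\"")) {
                throw new IllegalArgumentException("Malformed entry: " + pair);
            }
            try {
                values.put(key.substring(1, key.length() - 1), Long.parseLong(kv[1].trim()));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Offset is not a long: " + pair, e);
            }
        }
        return new MapOffset(values);
    }
}
```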
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208425737 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- I guess my core point is, we should stick with the existing serialization mechanism unless there's some kind of serialization we need to do which only a byte array can express. The serialization mechanism reaches deep into the execution layer, so coupling it with a connector API revamp is awkward.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208425199 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- I think I understand what you're saying. I could get behind a proposal to simply define "arbitrary JSON string" as the one and only offset type, with each connector responsible for writing and parsing JSON however it'd like. All the existing offsets are trivial case classes anyway; it'd be a bit of a migration, but nothing architecturally difficult to handle. I don't see how a `toBytes` method would help the problem. Neither arbitrary byte arrays nor arbitrary JSON strings let Spark know what type it's supposed to instantiate.
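A minimal Java sketch of that alternative, assuming a hypothetical connector interface in which the engine only ever handles the connector's JSON string and never a typed offset object. `StringOffsetSource` and `CounterSource` are illustrative names, not Spark APIs.

```java
import java.util.Objects;

// Hypothetical: under this proposal the engine never sees a typed offset,
// only the connector-produced JSON string.
interface StringOffsetSource {
    // Latest available offset, already serialized by the connector.
    String latestOffsetJson();

    // Number of rows in the range (startJson, endJson]; parsing is the connector's job.
    long rowsBetween(String startJson, String endJson);
}

// A trivial connector whose offsets are plain counters encoded as {"pos":N}.
final class CounterSource implements StringOffsetSource {
    private long produced;

    void produce(long n) { produced += n; }

    @Override
    public String latestOffsetJson() { return "{\"pos\":" + produced + "}"; }

    @Override
    public long rowsBetween(String startJson, String endJson) {
        return parsePos(endJson) - parsePos(startJson);
    }

    // Connector-private parsing; a real connector would use a JSON library.
    static long parsePos(String json) {
        Objects.requireNonNull(json, "json");
        String prefix = "{\"pos\":";
        if (!json.startsWith(prefix) || !json.endsWith("}")) {
            throw new IllegalArgumentException("Not a CounterSource offset: " + json);
        }
        return Long.parseLong(json.substring(prefix.length(), json.length() - 1));
    }
}
```

Because the engine never instantiates a typed offset under this design, the question of which class to build goes away; the cost is that every connector owns its own parsing and validation.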
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208392865 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- Currently, there are two representations of any given offset: a connector-defined JVM object and a serialized JSON string. Spark can't build the JVM object itself because it doesn't know what the right type is. If you know of some clean way for a connector to declare "here is the type of my offsets", we should do that instead, but I only know how to do it through reflection magic more confusing than the status quo. I'd hesitate to introduce a third representation unless there's some concrete use case where JSON serialization won't work well.
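The two representations described above can be sketched as follows. These are simplified stand-ins, not Spark's actual classes: `SketchOffset` models an offset object that can render itself as JSON, and the hypothetical `PositionReader.deserializeOffset` is the connector hook that rebuilds the typed object from a checkpointed string, since the engine cannot know which class to instantiate.

```java
import java.util.Objects;

// Simplified stand-in for a connector-defined offset: the JVM object
// carries a JSON rendering, and equality is defined on that rendering.
abstract class SketchOffset {
    abstract String json();

    @Override
    public boolean equals(Object other) {
        return other instanceof SketchOffset && json().equals(((SketchOffset) other).json());
    }

    @Override
    public int hashCode() { return json().hashCode(); }
}

// Connector-specific offset type; only the connector knows it exists.
final class PositionOffset extends SketchOffset {
    final long position;

    PositionOffset(long position) { this.position = position; }

    @Override
    String json() { return Long.toString(position); }
}

// Connector-side hook the engine would call when it reads a checkpoint.
final class PositionReader {
    SketchOffset deserializeOffset(String json) {
        Objects.requireNonNull(json, "json");
        try {
            return new PositionOffset(Long.parseLong(json.trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not a valid offset for this reader: " + json, e);
        }
    }
}
```

Keeping equality on the JSON form means an offset produced live and the same offset rebuilt from a checkpoint compare equal.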
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208391449 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. 
All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- Makes sense to me.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208373424 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the --- End diff -- Streaming-centric sources won't always have the initial offset be the oldest offset. In the Kafka source, for instance, the default is actually to start from the newest offset.
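A minimal sketch of that point, assuming a hypothetical log-backed source with a Kafka-style choice between starting at the earliest or the latest available offset. `LogSource` and its `Start` enum are illustrative names only. With `LATEST`, the initial offset is the newest position, not the oldest one.

```java
// Hypothetical source whose log currently holds offsets [earliest, latest].
final class LogSource {
    enum Start { EARLIEST, LATEST }

    private final long earliestAvailable;
    private final long latestAvailable;
    private final Start start;

    LogSource(long earliestAvailable, long latestAvailable, Start start) {
        this.earliestAvailable = earliestAvailable;
        this.latestAvailable = latestAvailable;
        this.start = start;
    }

    // Where a brand-new query begins. A restarted query would instead resume
    // from its checkpointed offset and not use this value.
    long initialOffset() {
        return start == Start.EARLIEST ? earliestAvailable : latestAvailable;
    }
}
```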
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208372469 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. 
+ * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- The offsets are ultimately exposed as JSON inside the JSON representation of StreamingQueryProgress. It's important for visibility and debuggability that progress events contain human-readable representations of offsets.
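For illustration, here is a sketch of a per-partition offset rendered in the shape the Kafka source uses for its offsets (a topic mapped to partition-to-offset pairs, such as `{"events":{"0":23,"1":42}}`), so it reads naturally inside a progress event. The `TopicOffsets` class is hypothetical; a real connector would use a JSON library rather than hand-built strings.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical offset for one topic: partition id -> next offset to read.
final class TopicOffsets {
    private final String topic;
    private final TreeMap<Integer, Long> byPartition = new TreeMap<>();

    TopicOffsets(String topic) { this.topic = topic; }

    TopicOffsets put(int partition, long offset) {
        byPartition.put(partition, offset);
        return this;
    }

    // Human-readable JSON. Partitions are emitted in sorted order so the same
    // offset always yields the same string. The topic name is not escaped.
    String json() {
        StringJoiner parts = new StringJoiner(",", "{", "}");
        for (Map.Entry<Integer, Long> e : byPartition.entrySet()) {
            parts.add("\"" + e.getKey() + "\":" + e.getValue());
        }
        return "{\"" + topic + "\":" + parts + "}";
    }
}
```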
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208370493 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. 
All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- The motivation for this method is things like Kafka source repartitioning. If a topic gets partitions added to it (or a subscription pattern gets topics added to it), Spark needs to schedule a new job which will scan the new partitions/topics, even though the Spark-side scan hasn't changed.
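A minimal sketch of that motivation, assuming a hypothetical reader that snapshots the partition set it planned its scan against and compares it with what the external system currently reports. `PartitionAwareReader` is illustrative, not a Spark or Kafka API; the `Supplier` stands in for whatever metadata call the connector would make.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical continuous reader: the scan was planned against a fixed set
// of partitions, and the engine polls needsReconfiguration() every epoch.
final class PartitionAwareReader {
    private final Supplier<Set<Integer>> currentPartitions;
    private final Set<Integer> plannedPartitions;

    PartitionAwareReader(Supplier<Set<Integer>> currentPartitions) {
        this.currentPartitions = currentPartitions;
        // Snapshot the partitions at planning time.
        this.plannedPartitions = new HashSet<>(currentPartitions.get());
    }

    // True when partitions were added or removed since planning, so the
    // query must be restarted with a freshly planned reader.
    boolean needsReconfiguration() {
        return !plannedPartitions.equals(currentPartitions.get());
    }
}
```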
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r207978414 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" - override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = { -synchronized { - startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset] - endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset] -} - } - override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong) - override def getStartOffset: OffsetV2 = synchronized { -if (startOffset.offset == -1) null else startOffset + override def initialOffset: OffsetV2 = LongOffset(-1) + + override def latestOffset(start: OffsetV2): OffsetV2 = { +if (currentOffset.offset == -1) null else currentOffset --- End diff -- Yes, I agree. The V1 API didn't require sources to implement a "this is the beginning of the stream, read everything" offset, but that was a mistake we should make sure to remedy here. A followup PR makes sense, because there's some stream execution logic that can be greatly simplified when all sources have a real initial offset.
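As a sketch of what a real initial offset buys, here is a hypothetical in-memory source loosely modeled on the diff above: `initialOffset()` is a sentinel (-1) meaning "before the first record", and `latestOffset()` falls back to that sentinel instead of returning null, so a fresh query can always read the range (initial, latest]. `InMemorySource` is an illustrative name, not Spark's `MemoryStream`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory source with a real "beginning of stream" offset.
final class InMemorySource {
    static final long INITIAL_OFFSET = -1L; // position before the first record

    private final List<String> records = new ArrayList<>();

    void add(String record) { records.add(record); }

    long initialOffset() { return INITIAL_OFFSET; }

    // Offset of the last record added, or the initial offset if none yet,
    // so callers never have to handle a missing (null) offset.
    long latestOffset() { return records.size() - 1L; }

    // Records in the half-open range (start, end].
    List<String> read(long start, long end) {
        return new ArrayList<>(records.subList((int) (start + 1), (int) (end + 1)));
    }
}
```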
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r207961663 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -179,3 +192,24 @@ class InternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncod override def abort(): Unit = rowWriter.abort() } + +/** + * Collects commit progress on writers. +*/ +trait StreamWriterProgressCollector { --- End diff -- This is a kinda weird interface. Can't we just create a StreamWriterCommitProgress(numOutputRows) instead of round-tripping through here?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 If the individual connectors aren't doing the counting, I don't see a good reason to put the data inside WriterCommitMessage instead of just leaving StreamWriterCommitProgress as its own separate interface.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 I don't think so. The offsets for the file source need to be consumer owned, because they need to work with files that were generated outside of Spark.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 For file streams, the offsets are just indices into a log the source keeps of which files it's seen. So a file sink doesn't have any access to those offsets.
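A simplified model of that bookkeeping, assuming a hypothetical `SeenFilesLog` in which each entry lists the files first discovered in one batch and the source's offset is just the index of the latest entry. Nothing in the offset refers to anything a sink wrote, which is why a sink cannot see or produce it. This is an illustration, not Spark's file source implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical: each log entry lists the files first discovered in one batch;
// the source's offset is the index of the latest entry (-1 when empty).
final class SeenFilesLog {
    private final List<List<String>> batches = new ArrayList<>();
    private final Set<String> seen = new HashSet<>();

    long latestOffset() { return batches.size() - 1L; }

    // Appends an entry for files not seen before; returns the new latest offset.
    long recordListing(List<String> listed) {
        List<String> fresh = new ArrayList<>();
        for (String path : listed) {
            if (seen.add(path)) {
                fresh.add(path);
            }
        }
        if (!fresh.isEmpty()) {
            batches.add(fresh);
        }
        return latestOffset();
    }

    // All files recorded in log entries (start, end].
    List<String> filesBetween(long start, long end) {
        List<String> out = new ArrayList<>();
        for (long i = start + 1; i <= end; i++) {
            out.addAll(batches.get((int) i));
        }
        return out;
    }
}
```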
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 Minimum and maximum offset in the sink wouldn't make sense for most sources. There aren't any meaningful values to report for e.g. writing out Parquet files. It'd make sense to put them inside just the Kafka WriterCommitMessage, but then I don't think that requires API support.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 Sure.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21919 I like the idea of doing this, but I don't think it really belongs as part of the WriterCommitMessage interface. Every connector shouldn't have to independently count its rows; the execution framework should do the counting automatically, and send an independent StreamWriterCommitProgress to the driver along with each WriterCommitMessage. Note that we'll probably want to extend StreamWriterCommitProgress soon to carry metrics for continuous processing.
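A minimal sketch of that split, assuming simplified stand-ins for the writer interfaces: the framework wraps the connector's writer, counts rows itself, and returns a separate progress object next to the connector's untouched commit message. `RowWriter`, `CommitMessage`, `CommitProgress`, `TaskCommit`, and `CountingWriter` are hypothetical names, not the Spark API.

```java
// Simplified stand-ins for the connector-facing writer API.
interface CommitMessage {}

interface RowWriter<T> {
    void write(T row);
    CommitMessage commit();
}

// Framework-owned progress, sent to the driver alongside the commit message.
final class CommitProgress {
    final long numOutputRows;
    CommitProgress(long numOutputRows) { this.numOutputRows = numOutputRows; }
}

// What the framework would ship to the driver for one task.
final class TaskCommit {
    final CommitMessage message;
    final CommitProgress progress;
    TaskCommit(CommitMessage message, CommitProgress progress) {
        this.message = message;
        this.progress = progress;
    }
}

// Wraps any connector writer; the connector never counts rows itself.
final class CountingWriter<T> {
    private final RowWriter<T> delegate;
    private long rows;

    CountingWriter(RowWriter<T> delegate) { this.delegate = delegate; }

    void write(T row) {
        delegate.write(row);
        rows++;
    }

    TaskCommit commit() {
        return new TaskCommit(delegate.commit(), new CommitProgress(rows));
    }
}
```

Keeping the count outside the connector's message means the message type stays connector-defined, while the progress type can grow later (for example, to carry continuous-processing metrics) without touching any connector.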
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21199 The change looks broadly good (and important) to me. I'll defer to @HeartSaVioR wrt the in-depth review; let me know if there are any specific parts I should take a look at.
[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21948 lgtm
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21946 Wouldn't the redo of the API that we're discussing obsolete this?
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206325719 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- Oh wait, this is the same thing we talked about in the initial round of review. 
I think "throw an error when developing the connector so you can make sure your metrics work right" would still be a good example.
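One way a source might use that hook, sketched with hypothetical names because the callback's signature is cut off in the diff above: a `strict` flag, intended for connector development, makes invalid metrics fail fast, while the default records the problem and keeps the query running. `MetricsValidationPolicy` and `onInvalidMetrics` are illustrative, not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler for custom metrics the engine rejected as invalid.
final class MetricsValidationPolicy {
    private final boolean strict; // e.g. enabled in the connector's own tests
    private final List<String> warnings = new ArrayList<>();

    MetricsValidationPolicy(boolean strict) { this.strict = strict; }

    void onInvalidMetrics(String metricsJson, RuntimeException cause) {
        if (strict) {
            // Surface the bug immediately while the connector is being developed.
            throw new IllegalStateException("Invalid custom metrics: " + metricsJson, cause);
        }
        // Otherwise keep the query running and remember what went wrong.
        warnings.add("Ignoring invalid custom metrics: " + metricsJson);
    }

    List<String> warnings() { return warnings; }
}
```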
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206322694 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- I thought this was a bit convoluted at first, but on reflection I can understand why this additional flexibility is valuable. 
I think it'd be worth writing an example here of what a source might want to do other than ignore the invalid metrics.
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21921 lgtm
[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21118#discussion_r204108549 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy { val filterCondition = postScanFilters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) - val withProjection = if (withFilter.output != project) { -ProjectExec(project, withFilter) - } else { -withFilter - } - - withProjection :: Nil + // always add the projection, which will produce unsafe rows required by some operators + ProjectExec(project, withFilter) :: Nil case r: StreamingDataSourceV2Relation => - DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil + // ensure there is a projection, which will produce unsafe rows required by some operators + ProjectExec(r.output, --- End diff -- Continuous processing will still be experimental in the 2.4 release, so I'm not tremendously concerned about this. We should eventually change the scan to produce rows in whatever way is most efficient in the final API.
[GitHub] spark issue #21817: [SPARK-24861][SS][test] create corrected temp directorie...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21817 lgtm
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21733 We could still save the value of the option to offsetSeqMetadata and error if it's changed. The value of using an option would just be that there's no global default; a power user can set the option for the queries they think would benefit without affecting all the other queries which get run. I agree it would be nice to just have some safe path allowing us to always use the new strategy. Absent that, there's an unfortunate tradeoff of reduced memory footprint vs added complexity. I think we ultimately need a committer to decide whether that's worth it.
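A sketch of that safeguard, with hypothetical names: the first run records the chosen value in the query's persisted metadata (a plain map standing in for `offsetSeqMetadata`), and a restart that asks for a different value fails instead of silently mixing state formats. The key name `stateFormatVersion` is illustrative only.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical check run when a streaming query starts or restarts.
final class StateFormatOption {
    static final String KEY = "stateFormatVersion"; // illustrative key name

    // Returns the value the query must use: the persisted one if present,
    // otherwise the requested one, which is then persisted.
    static String resolve(Map<String, String> persistedMetadata, String requested) {
        String persisted = persistedMetadata.get(KEY);
        if (persisted == null) {
            persistedMetadata.put(KEY, requested);
            return requested;
        }
        if (!Objects.equals(persisted, requested)) {
            throw new IllegalStateException(
                "Option " + KEY + " was " + persisted + " when the query started; "
                    + "cannot change it to " + requested + " on restart");
        }
        return persisted;
    }
}
```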
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201792030 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -239,8 +241,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var valueSchema: StructType = _ @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ + @volatile private var numberOfVersionsToRetainInMemory: Int = _ - private lazy val loadedMaps = new mutable.HashMap[Long, MapType] + private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse) --- End diff -- Yeah, I was wondering about that. Makes sense.
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201793040 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -64,6 +64,63 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } + test("retaining only latest configured size of versions in memory") { --- End diff -- Sorry I didn't catch this earlier. We should ideally have tests that directly validate the specific behaviors we're documenting in the conf: * '2' will read from cache in the direct failure case * '1' will read from cache in the happy path but not if there's a failure * '0' will never populate or read from the cache
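The three documented behaviors can each be pinned down with a small targeted check. The sketch below uses a hypothetical, simplified `VersionCache` rather than `HDFSBackedStateStoreProvider`, and assumes the "failure case" means a batch commits version N+1 and is then retried from version N.

```java
import java.util.TreeMap;

// Hypothetical, simplified stand-in for the provider's in-memory map of loaded versions.
// Only the retention rule is modeled: keep at most `retain` of the newest committed versions.
final class VersionCache {
  private final int retain;
  private final TreeMap<Long, String> loaded = new TreeMap<>();
  private int cacheHits = 0;

  VersionCache(int retain) {
    this.retain = retain;
  }

  /** Records the state committed as `version`, evicting the oldest versions beyond `retain`. */
  void commit(long version, String state) {
    if (retain == 0) return; // '0': never populate the cache
    loaded.put(version, state);
    while (loaded.size() > retain) {
      loaded.pollFirstEntry(); // natural order, so the first entry is the oldest version
    }
  }

  /** Loads `version`, counting whether it came from memory. */
  String load(long version) {
    String state = loaded.get(version);
    if (state != null) {
      cacheHits++;
      return state;
    }
    return "state@" + version; // stands in for reading snapshot/delta files
  }

  int cacheHits() {
    return cacheHits;
  }
}

final class RetentionScenarios {
  /** Happy path: the next batch loads the version the previous batch just committed. */
  static int happyPathHits(int retain) {
    VersionCache cache = new VersionCache(retain);
    cache.commit(1, "state@1");
    cache.load(1);
    return cache.cacheHits();
  }

  /** Failure path (assumed): version 2 is committed, the batch fails, the retry reloads 1. */
  static int failureRetryHits(int retain) {
    VersionCache cache = new VersionCache(retain);
    cache.commit(1, "state@1");
    cache.commit(2, "state@2");
    cache.load(1);
    return cache.cacheHits();
  }
}
```

With this model, `retain = 2` hits the cache in both scenarios, `retain = 1` only on the happy path, and `retain = 0` never.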
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r201791805 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -825,6 +825,16 @@ object SQLConf { .intConf .createWithDefault(100) + val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = --- End diff -- Can this be a query option instead of a SparkConf, then? I worry it will be very hard to reason about the current scenario, where the conf defines how all states are stored - except that some streams started with a different value will silently override it.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21469 Sure, I don't mind if we remove that metric.
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21622 lgtm
[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201407516 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming.state; + +import java.util.Comparator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. + * + * As TreeMap does, this implementation sorts elements in natural order, and cuts off + * smaller elements to retain at most bigger N elements. + * + * You can provide reversed order of comparator to retain smaller elements instead. + * + * This class is not thread-safe, so synchronization would be needed to use this concurrently. + * + * @param key type + * @param value type + */ +public final class BoundedSortedMap extends TreeMap { --- End diff -- This is a very general (and complicated) interface to use in only one place. Can we just have the state store handle its cleanup logic directly? 
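One way to keep the cleanup inside the state store, sketched with hypothetical names: a plain `TreeMap` ordered newest-first (mirroring `Ordering[Long].reverse`) that trims its oldest entries after each insert, with no general-purpose subclass involved.

```java
import java.util.Collections;
import java.util.TreeMap;

// Hypothetical sketch: the store trims its own map instead of relying on a general-purpose
// BoundedSortedMap subclass. Keys are ordered newest-first, like Ordering[Long].reverse.
final class LoadedMaps<V> {
  private final int maxVersionsToRetain;
  private final TreeMap<Long, V> byVersionDesc = new TreeMap<>(Collections.<Long>reverseOrder());

  LoadedMaps(int maxVersionsToRetain) {
    this.maxVersionsToRetain = maxVersionsToRetain;
  }

  void put(long version, V map) {
    byVersionDesc.put(version, map);
    // Descending order puts the oldest version last, so cleanup is just trimming the tail.
    while (byVersionDesc.size() > maxVersionsToRetain) {
      byVersionDesc.pollLastEntry();
    }
  }

  V get(long version) {
    return byVersionDesc.get(version);
  }

  /** Newest retained version, or null when nothing is cached. */
  Long latestVersion() {
    return byVersionDesc.isEmpty() ? null : byVersionDesc.firstKey();
  }
}
```

The eviction rule lives in one short loop next to the only code that needs it, which is the trade-off being suggested.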
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21721 Looks fine to me with a MemorySink example. I don't think a formal discussion is super necessary - the major advantage of the mixin model is to let us add things like this without impacting the broader API.
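To illustrate the mixin point with hypothetical interface names (these are not the actual Spark interfaces): an optional capability lives in its own interface, and the engine only probes for it with a runtime type check, so sources that don't implement it are untouched.

```java
// Hypothetical sketch of the mixin pattern; these are not the actual Spark interfaces.
interface StreamSource {
  String description();
}

/** Optional capability: only sources that choose to report custom metrics implement it. */
interface ReportsCustomMetrics {
  String customMetricsJson();
}

final class ProgressReporter {
  /** Builds a tiny progress JSON string (no escaping; illustration only). */
  static String report(StreamSource source) {
    StringBuilder json = new StringBuilder("{\"description\":\"" + source.description() + "\"");
    // The engine probes for the mixin at runtime, so the base interface never has to change.
    if (source instanceof ReportsCustomMetrics) {
      json.append(",\"customMetrics\":")
          .append(((ReportsCustomMetrics) source).customMetricsJson());
    }
    return json.append('}').toString();
  }
}

/** Example source that opts in to the mixin. */
final class MemoryLikeSource implements StreamSource, ReportsCustomMetrics {
  @Override
  public String description() {
    return "memory";
  }

  @Override
  public String customMetricsJson() {
    return "{\"numRows\":3}";
  }
}
```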
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r201402523 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- I'd suggest handling the custom metrics for Kafka outside the scope of this PR. Maybe we should have a default maxOffsets, but given that we don't, I'm worried about adding a metric that's misleading in the default case.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r201397799 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -825,6 +825,16 @@ object SQLConf { .intConf .createWithDefault(100) + val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = --- End diff -- I get worried when I see things described as "advanced features". What will go wrong if a user who's insufficiently advanced tries to use it?
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21305 (The last test failure is a known flaky test; I've been trying, so far unsuccessfully, to find a fix for it.)
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200538008 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- Is "lag" here just the difference (at the time a batch ends) between the last offset Spark knows about and the last offset Spark has processed? I'm not sure this is super useful to know. If maxOffsets isn't set, it's always going to be 0, no matter how far Spark gets behind the Kafka cluster.
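A rough model of the metric being questioned, assuming lag is computed per topic-partition as "latest known offset minus processed offset" and using a hypothetical `maxOffsetsPerBatch` parameter for the rate limit: with no limit the batch always ends at the latest known offset, so the reported lag is 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-topic-partition "lag" metric; not the Kafka source's code.
final class PartitionLag {
  /** lag = latest offset known at batch end - offset processed up to, per topic-partition. */
  static Map<String, Long> lag(Map<String, Long> latestKnown, Map<String, Long> processedUpTo) {
    Map<String, Long> result = new HashMap<>();
    for (Map.Entry<String, Long> e : latestKnown.entrySet()) {
      long processed = processedUpTo.getOrDefault(e.getKey(), 0L);
      result.put(e.getKey(), e.getValue() - processed);
    }
    return result;
  }

  /**
   * Simplified batch planning: with no rate limit (null), the batch ends at the latest known
   * offset, so the lag computed above is always 0 however far behind the cluster Spark is.
   */
  static long planBatchEnd(long start, long latestKnown, Long maxOffsetsPerBatch) {
    if (maxOffsetsPerBatch == null) return latestKnown;
    return Math.min(latestKnown, start + maxOffsetsPerBatch);
  }
}
```

Only when a rate limit caps the batch does the metric become non-zero, which is why it can mislead in the default configuration.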
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537533 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -379,3 +384,16 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( } } } + +// Currently reports per topic-partition lag. --- End diff -- nit: please use Javadoc style for top-level comments
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537276 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsCustomMetrics.java --- @@ -0,0 +1,30 @@ +/* --- End diff -- This should probably be in v2/reader/streaming.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -178,12 +180,18 @@ class SourceProgress protected[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } -("description" -> JString(description)) ~ +val jsonVal = ("description" -> JString(description)) ~ ("startOffset" -> tryParse(startOffset)) ~ ("endOffset" -> tryParse(endOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + +if (customMetrics != null) { + jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) --- End diff -- Is there any way to get an error to the user if their custom metrics fail to parse? I'm not entirely sure that's the right thing to do, but I worry that it'll be hard to develop against this API if we just silently drop malformed metrics.
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r200454033 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -805,6 +806,75 @@ class StreamSuite extends StreamTest { } } + test("streaming limit without state") { --- End diff -- resolved
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r200454145 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -354,6 +355,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + /** + * Used to plan the streaming global limit operator. --- End diff -- As discussed offline, please add a comment explaining why we need the ReturnAnswer handling.
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r199288285 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala --- @@ -0,0 +1,96 @@ +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.streaming.state.StateStoreOps +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType} +import org.apache.spark.util.CompletionIterator + +/** + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit + * rows are returned.
+ */ +case class StreamingLimitExec( +streamLimit: Long, +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None) + extends UnaryExecNode with StateStoreWriter { + + private val keySchema = StructType(Array(StructField("key", NullType))) + private val valueSchema = StructType(Array(StructField("value", LongType))) + + override protected def doExecute(): RDD[InternalRow] = { +metrics // force lazy init at driver + +child.execute().mapPartitionsWithStateStore( + getStateInfo, + keySchema, + valueSchema, + indexOrdinal = None, + sqlContext.sessionState, + Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) => + val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null))) + val numOutputRows = longMetric("numOutputRows") + val numUpdatedStateRows = longMetric("numUpdatedStateRows") + val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") + val commitTimeMs = longMetric("commitTimeMs") + val updatesStartTimeNs = System.nanoTime + + val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L) + var rowCount = startCount + + val result = iter.filter { r => +val x = rowCount < streamLimit --- End diff -- Oh, I missed that distribution. Makes sense then.
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r199284336 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala --- @@ -0,0 +1,96 @@ +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.streaming.state.StateStoreOps +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType} +import org.apache.spark.util.CompletionIterator + +/** + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit + * rows are returned.
+ */ +case class StreamingLimitExec( +streamLimit: Long, +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None) + extends UnaryExecNode with StateStoreWriter { + + private val keySchema = StructType(Array(StructField("key", NullType))) + private val valueSchema = StructType(Array(StructField("value", LongType))) + + override protected def doExecute(): RDD[InternalRow] = { +metrics // force lazy init at driver + +child.execute().mapPartitionsWithStateStore( + getStateInfo, + keySchema, + valueSchema, + indexOrdinal = None, + sqlContext.sessionState, + Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) => + val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null))) + val numOutputRows = longMetric("numOutputRows") + val numUpdatedStateRows = longMetric("numUpdatedStateRows") + val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") + val commitTimeMs = longMetric("commitTimeMs") + val updatesStartTimeNs = System.nanoTime + + val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L) + var rowCount = startCount + + val result = iter.filter { r => +val x = rowCount < streamLimit --- End diff -- Isn't this going to result in `streamLimit` records in each partition? I would expect we'd need something like the Global/LocalLimit split.
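The concern can be reproduced with a small model, assuming each partition keeps its own persisted count (as one state store per partition would): filtering per partition lets every partition emit up to `streamLimit` rows, while first gathering everything into a single partition enforces a global limit. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the limit operator's filter; not StreamingLimitExec itself.
final class StreamingLimitSketch {
  private final long streamLimit;
  private long persistedCount = 0L; // plays the role of the count kept in one state store

  StreamingLimitSketch(long streamLimit) {
    this.streamLimit = streamLimit;
  }

  /** Passes rows through until the persisted count reaches the limit. */
  List<Integer> apply(List<Integer> rows) {
    List<Integer> out = new ArrayList<>();
    for (Integer row : rows) {
      if (persistedCount < streamLimit) {
        out.add(row);
        persistedCount++;
      }
    }
    return out;
  }

  /** Each partition has its own store and count, so each can emit up to streamLimit rows. */
  static int totalWithPerPartitionState(long limit, List<List<Integer>> partitions) {
    int total = 0;
    for (List<Integer> partition : partitions) {
      total += new StreamingLimitSketch(limit).apply(partition).size();
    }
    return total;
  }

  /** Gathering all rows into one partition first (an AllTuples-style requirement) fixes it. */
  static int totalWithSinglePartition(long limit, List<List<Integer>> partitions) {
    List<Integer> merged = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      merged.addAll(partition);
    }
    return new StreamingLimitSketch(limit).apply(merged).size();
  }
}
```

With two partitions of three rows each and a limit of 2, the per-partition version emits 4 rows and the single-partition version emits 2.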
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r199284559 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -805,6 +806,75 @@ class StreamSuite extends StreamTest { } } + test("streaming limit without state") { --- End diff -- Related to my above comment, I think all of these tests end up only testing a single input partition.
[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21662#discussion_r199283820 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala --- @@ -70,35 +68,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.allData, 1 to 9) } - test("directly add data in Append output mode with row limit") { --- End diff -- Nit: I'd prefer doing the revert as a separate PR.
[GitHub] spark pull request #20351: [SPARK-23014][SS] Fully remove V1 memory sink.
Github user jose-torres closed the pull request at: https://github.com/apache/spark/pull/20351
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198571824 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing.
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- E
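A heavily simplified model of the threading structure in this RDD, with hypothetical names and a bounded `BlockingQueue` standing in for the RPC shuffle: one writer thread per upstream partition, a shared queue drained by the task thread, and a `ThreadLocal` epoch so each writer advances its own epoch independently. Unlike the real RDD, the writers here stop after a fixed number of epochs so the sketch terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of continuous coalescing; not Spark code.
final class CoalesceSketch {
  // Each writer thread tracks its own epoch. A plain ThreadLocal is never copied into
  // other threads, so writers cannot disturb the reader's (or each other's) epoch.
  private static final ThreadLocal<Long> CURRENT_EPOCH = ThreadLocal.withInitial(() -> 0L);

  /** A row tagged with its upstream partition and the writer's epoch when it was sent. */
  static final class Tagged {
    final int partition;
    final long epoch;

    Tagged(int partition, long epoch) {
      this.partition = partition;
      this.epoch = epoch;
    }
  }

  static List<Tagged> run(int upstreamPartitions, int epochsPerWriter) {
    BlockingQueue<Tagged> queue = new ArrayBlockingQueue<>(1024);
    // A fixed pool with one new thread per submitted writer, so each starts at epoch 0.
    ExecutorService pool = Executors.newFixedThreadPool(upstreamPartitions);
    try {
      for (int p = 0; p < upstreamPartitions; p++) {
        final int partition = p;
        pool.execute(() -> {
          for (int i = 0; i < epochsPerWriter; i++) {
            long epoch = CURRENT_EPOCH.get();
            try {
              queue.put(new Tagged(partition, epoch)); // blocks if the reader falls behind
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            CURRENT_EPOCH.set(epoch + 1); // advances only this writer's epoch
          }
        });
      }
      // The task thread plays the reader and drains everything the writers produce.
      List<Tagged> received = new ArrayList<>();
      for (int i = 0; i < upstreamPartitions * epochsPerWriter; i++) {
        received.add(queue.take());
      }
      return received;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("reader interrupted", e);
    } finally {
      pool.shutdownNow(); // like stopping the writer threads when the task completes
    }
  }

  /** The calling thread's epoch, which the writer threads never touch. */
  static long callerEpoch() {
    return CURRENT_EPOCH.get();
  }
}
```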
[GitHub] spark issue #21560: [SPARK-24386][SS] coalesce(1) aggregates in continuous p...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21560 Sorry, that wasn't meant to be a complete push. Added the tests now.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198571496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- (same comment as above applies here - we don't have partitioning information in analysis)
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198337615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -51,7 +51,7 @@ class ContinuousDataSourceRDD( sc: SparkContext, dataQueueSize: Int, epochPollIntervalMs: Long, -@transient private val readerFactories: Seq[InputPartition[UnsafeRow]]) +private val readerFactories: Seq[InputPartition[UnsafeRow]]) --- End diff -- We list the partitions when computing the coalesce RDD. Should we instead be packing the partitions into the partitions of the coalesce RDD? I'd assumed it was valid to expect that rdd.partitions would work on executors, but maybe it's not.
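The `@transient` question follows from ordinary JVM serialization, since Scala's `@transient` marks the field with the JVM `transient` modifier. A minimal illustration with a hypothetical class (not Spark's RDD): after a serialize/deserialize round trip, roughly what shipping an object to an executor involves, the transient field comes back `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Hypothetical class illustrating plain JVM serialization semantics; not Spark's RDD.
final class ShippedObject implements Serializable {
  private static final long serialVersionUID = 1L;

  transient List<String> transientPartitions; // like `@transient private val` in Scala
  List<String> shippedPartitions;             // like the field after dropping @transient

  ShippedObject(List<String> partitions) {
    this.transientPartitions = partitions;
    this.shippedPartitions = partitions;
  }

  /** Serializes then deserializes, roughly what sending an object to an executor involves. */
  static ShippedObject roundTrip(ShippedObject obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ShippedObject) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Dropping `transient` makes the data available on the receiving side at the cost of shipping it with every task; packing it into the partition objects instead is the alternative raised above.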
[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21617 Well, "clear" is relative. Since we're trying to provide functionality in the DataFrame API, it's perfectly alright for the RDD graph to end up looking a bit weird. It seems feasible to do something like: * Have a stream reader RDD write side output to some special shuffle partition (set of partitions?) which the main query knows not to read. * Have a stream writer RDD with two heterogeneous sets of partitions: one to write the main query to the sink, and another to apply the specified action to the side output. I agree that watermarks should be applied immediately after the data reader - other streaming systems generally require this, and Spark does not seem to be getting any benefits from having a more general watermark concept. I haven't had time to push for this change, but I think it's known that the current Spark watermark model is flawed - I'd support fixing it for sure.
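A minimal sketch of applying the watermark right after the reader and routing late rows to a side output instead of dropping them. All names are hypothetical, and the watermark is assumed to be the maximum event time seen in earlier batches minus an allowed delay.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: apply the watermark immediately after reading and route rows that are
// already behind it to a side output rather than silently dropping them.
final class WatermarkAtSource {
  /** Result of one batch: rows for the main query and late rows for the side output. */
  static final class Split {
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();
  }

  /**
   * @param eventTimesMs event timestamps read in this batch
   * @param maxEventTimeSoFarMs max event time observed in earlier batches
   * @param allowedDelayMs how far behind the max event time a row may be and still count
   */
  static Split split(List<Long> eventTimesMs, long maxEventTimeSoFarMs, long allowedDelayMs) {
    long watermark = maxEventTimeSoFarMs - allowedDelayMs;
    Split result = new Split();
    for (long t : eventTimesMs) {
      if (t < watermark) {
        result.late.add(t); // would go to the special side-output partition(s)
      } else {
        result.onTime.add(t); // continues into the main query
      }
    }
    return result;
  }
}
```

For example, with a max event time of 1000 ms and a 100 ms delay, the watermark is 900 ms, so a row at 100 ms goes to the side output while rows at 950 ms and 1000 ms continue.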
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197936350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- Oh wait, I see what you mean. Repartition(5, ...) would never be matched by this rule, since it only applies to Aggregate.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197935989 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- I don't think there's any particular reason we need to. There's no reason we couldn't execute multiple repartitions if the optimizer isn't smart enough to combine them.
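A simplified model of the rule in this diff, using hypothetical plan classes rather than Catalyst's: an `Aggregate` passes only if a `Repartition` with one partition and no shuffle (what `coalesce(1)` produces) appears somewhere in its subtree. Because analysis has no partitioning information, the rule can only recognize an explicit coalesce, not a source that happens to have a single partition.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified logical plan nodes; these are not Catalyst classes.
abstract class Plan {
  final List<Plan> children;

  Plan(Plan... children) {
    this.children = Arrays.asList(children);
  }

  /** True if this node or any descendant matches, similar in spirit to find(...).isDefined. */
  boolean exists(Predicate<Plan> predicate) {
    if (predicate.test(this)) return true;
    for (Plan child : children) {
      if (child.exists(predicate)) return true;
    }
    return false;
  }
}

final class Source extends Plan {}

final class Repartition extends Plan {
  final int numPartitions;
  final boolean shuffle;

  Repartition(int numPartitions, boolean shuffle, Plan child) {
    super(child);
    this.numPartitions = numPartitions;
    this.shuffle = shuffle;
  }
}

final class Aggregate extends Plan {
  Aggregate(Plan child) {
    super(child);
  }
}

final class ContinuousChecker {
  /** Matches what coalesce(1) produces: one partition, no shuffle. */
  static boolean isSinglePartitionCoalesce(Plan node) {
    if (!(node instanceof Repartition)) return false;
    Repartition r = (Repartition) node;
    return r.numPartitions == 1 && !r.shuffle;
  }

  /** Rejects any Aggregate that has no coalesce(1) somewhere beneath it. */
  static void check(Plan node) {
    if (node instanceof Aggregate && !node.exists(ContinuousChecker::isSinglePartitionCoalesce)) {
      throw new UnsupportedOperationException(
          "Aggregate in continuous processing requires coalesce(1) beneath it");
    }
    for (Plan child : node.children) {
      check(child);
    }
  }
}
```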
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197933535 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) --- End diff -- This is just a default argument to make tests less wordy. I can remove it if you think that's best, but it doesn't impose a restriction.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r197933175

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

I've made some changes to try to restrict the assumption that the number of partitions is 1 to two places:

* ContinuousCoalesceExec
* The output partitioner in ContinuousCoalesceRDD, since it's not obvious to me what the right strategy to get this would be in the general case. If you have ideas I'm open to removing this too.
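With a single output partition, any hash-style partitioner degenerates to sending every row to partition 0. That is why hard-coding it is harmless for coalesce(1), and also why it is the piece that would need a real strategy in the general case. The sketch below is a minimal model of this. It assumes the non-negative-modulo scheme that Spark's HashPartitioner is based on. SimpleHashPartitioner is a hypothetical name, not a Spark class.

```scala
// Hypothetical model of hash partitioning: key.hashCode modulo numPartitions,
// shifted into [0, numPartitions) when the raw remainder is negative.
final class SimpleHashPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "numPartitions must be positive")

  def getPartition(key: Any): Int = key match {
    case null => 0
    case k =>
      val raw = k.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }
}
```

With numPartitions = 1 the remainder is always 0, so the partitioner carries no information. With more partitions, negative hash codes still map into range: -7 goes to partition 1 when numPartitions = 4.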
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r197930245

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
--- End diff --

Repartition would normally imply distributed execution, which isn't happening here.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r197929943

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val runnables = prev.partitions.map { prevSplit =>
+        new Runnable() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threadPool.shutdownNow()
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+      runnables.foreach(threadPool.execute)
+    }
+
+    readerRDD.compute(readerRDD.partitions(split.index), context)
--- E
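The comment in the writer loop relies on the current epoch being held in a non-inheritable thread local. The sketch below demonstrates that property with a hypothetical EpochCounter standing in for EpochTracker; it is not Spark's implementation. A thread that never initializes the value sees the initial value rather than the task thread's value. Increments made in writer threads also do not leak back to the main thread.

```scala
// Hypothetical per-thread epoch counter backed by a plain (non-inheritable) ThreadLocal.
object EpochCounter {
  private val current = new ThreadLocal[Long] {
    override def initialValue(): Long = -1L
  }
  def initialize(start: Long): Unit = current.set(start)
  def increment(): Unit = current.set(current.get() + 1)
  def get: Long = current.get()
}

object EpochDemo {
  // Starts `writers` threads; thread i initializes its own epoch to `start` and
  // increments it steps * (i + 1) times. Returns each thread's final epoch and the
  // calling thread's epoch afterwards.
  def run(start: Long, writers: Int, steps: Int): (List[Long], Long) = {
    EpochCounter.initialize(start)
    val results = new Array[Long](writers)
    val threads = (0 until writers).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          EpochCounter.initialize(start)
          (1 to steps * (i + 1)).foreach(_ => EpochCounter.increment())
          results(i) = EpochCounter.get
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join() makes the writes to `results` visible here
    (results.toList, EpochCounter.get)
  }

  // Reads the epoch from a fresh thread that never calls initialize.
  def readFromFreshThread(): Long = {
    var seen = 0L
    val t = new Thread(new Runnable {
      override def run(): Unit = { seen = EpochCounter.get }
    })
    t.start()
    t.join()
    seen
  }
}
```

For example, EpochDemo.run(10L, 3, 2) leaves the writer threads at epochs 12, 14 and 16 while the calling thread stays at 10.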
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r197929262

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
          _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) =>
     case node if node.nodeName == "StreamingRelationV2" =>
+    case Repartition(1, false, _) =>
+    case node: Aggregate =>
+      val aboveSinglePartitionCoalesce = node.find {
+        case Repartition(1, false, _) => true
+        case _ => false
+      }.isDefined
+
+      if (!aboveSinglePartitionCoalesce) {
--- End diff --

I agree that it wouldn't be needed, but partitioning information is not always available during analysis. So I don't think we can write the more granular check suggested here.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r197928934

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
     split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
--- End diff --

I don't know, I'm unfamiliar with this method. @tdas
[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...
Github user jose-torres commented on the issue:
https://github.com/apache/spark/pull/21617

LGTM, but note that the rows being counted here are the rows persisted into the state store, which aren't necessarily the input rows. So the side-channel described in the JIRA would be orthogonal to this.
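The toy model below illustrates the distinction. It is hypothetical and does not use the state store API. A count-by-key aggregation keeps one state row per distinct key, so the number of rows persisted in state can be smaller than the number of input rows processed.

```scala
import scala.collection.mutable

// Hypothetical model: a streaming count-by-key where `state` plays the role of the
// state store (one row per distinct key) and `inputRows` counts every input row.
object StateRowsDemo {
  def process(batches: Seq[Seq[String]]): (Int, Int) = {
    val state = mutable.Map.empty[String, Long]
    var inputRows = 0
    for (batch <- batches; key <- batch) {
      inputRows += 1
      state(key) = state.getOrElse(key, 0L) + 1
    }
    (inputRows, state.size) // (input rows seen, rows held in state)
  }
}
```

Two batches containing a, b, a and then a, c give 5 input rows but only 3 state rows.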
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r196930368

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+    private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

We need to be able to generate the full list of partitions from within a single task in order for coalesce to work.
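Dropping @transient matters because Java serialization skips @transient fields. Such a field comes back as null after the object is deserialized, so a task could not rebuild the partition list from it. The sketch below shows this with hypothetical holder classes standing in for the RDD. roundTrip mimics shipping an object to a task via Java serialization. It resolves classes with the defining class loader so it also works in script or REPL settings.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical holders: one drops its list on serialization, the other keeps it.
class TransientHolder(@transient val factories: Seq[String]) extends Serializable
class RegularHolder(val factories: Seq[String]) extends Serializable

object SerDemo {
  // Serializes and deserializes `value` with plain Java serialization.
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After a round trip, TransientHolder.factories is null, while RegularHolder.factories still holds the full list.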
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r196924994

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+      threads.foreach(_.start())
+    }
+
+    reader.compute(reader.partitions(split.index), context)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] = Seq(0)
--- End diff --

Yeah, I confused myself when looking at the normal coalesce RDD. The default dependency handling is correct here.
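The sketch below models the start/interrupt lifecycle in the quoted compute(). Background writer threads loop until they are interrupted, and a completion hook interrupts them. It is a simplification under stated assumptions. The real loop checks the TaskContext (context.isInterrupted() and context.isCompleted()) rather than the thread's own interrupt flag, and the interrupt is issued from a task completion listener. Both are replaced here by plain JDK calls in a hypothetical WriterLoopDemo.

```scala
import java.util.concurrent.atomic.AtomicLong

object WriterLoopDemo {
  // Starts `numWriters` looping threads, lets them run for `runMillis`, then interrupts
  // and joins them. Returns (total loop iterations, whether every thread has exited).
  def runAndStop(numWriters: Int, runMillis: Long): (Long, Boolean) = {
    val iterations = new AtomicLong(0)
    val threads = (0 until numWriters).map { _ =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          while (!Thread.currentThread().isInterrupted()) {
            iterations.incrementAndGet() // stands in for writing one epoch
            try Thread.sleep(1)
            catch {
              // sleep() clears the interrupt flag when it throws; restore it so the loop exits.
              case _: InterruptedException => Thread.currentThread().interrupt()
            }
          }
        }
      })
      t.setDaemon(true)
      t
    }
    threads.foreach(_.start())
    Thread.sleep(runMillis)
    // Equivalent of the completion listener: interrupt every writer thread.
    threads.foreach(_.interrupt())
    threads.foreach(_.join(5000))
    (iterations.get(), threads.forall(t => !t.isAlive()))
  }
}
```

The thread-pool variant in the later revision follows the same pattern. There, ExecutorService.shutdownNow() delivers the interrupts to the pool's worker threads instead of calling interrupt() on each thread directly.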