[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r165117379 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala --- @@ -75,6 +76,52 @@ case class StreamingExecutionRelation( ) } +// We have to pack in the V1 data source as a shim, for the case when a source implements +// continuous processing (which is always V2) but only has V1 microbatch support. We don't +// know at read time whether the query is continuous or not, so we need to be able to --- End diff -- The trigger isn't specified at the point where the dataframe is created.
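To make the ordering concrete — a minimal sketch, assuming an active `spark` session, a source (the built-in rate source here) that has both microbatch and continuous support, and the `Trigger.Continuous` factory added in this patch; the sink choice is incidental:

```scala
import org.apache.spark.sql.streaming.Trigger

// The streaming relation (and any V1 shim packed into it) is built here,
// before anything says which execution mode will be used.
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .select("value")

// The trigger only appears when the query is started on the write side;
// Trigger.Continuous selects continuous processing, anything else microbatch.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```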
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r165055237 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala --- @@ -75,6 +76,52 @@ case class StreamingExecutionRelation( ) } +// We have to pack in the V1 data source as a shim, for the case when a source implements +// continuous processing (which is always V2) but only has V1 microbatch support. We don't +// know at read time whether the query is continuous or not, so we need to be able to --- End diff -- can't we know it from the specified trigger?
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19984
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158547367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage} +import org.apache.spark.util.RpcUtils + +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable + +// Driver epoch trigger message +/** + * Atomically increment the current epoch and get the new value. + */ +private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage + +// Init messages +/** + * Set the reader and writer partition counts. Tasks may not be started until the coordinator + * has acknowledged these messages. + */ +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage + +// Partition task messages +/** + * Get the current epoch. + */ +private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage +/** + * Commit a partition at the specified epoch with the given message. + */ +private[sql] case class CommitPartitionEpoch( +partitionId: Int, +epoch: Long, +message: WriterCommitMessage) extends EpochCoordinatorMessage +/** + * Report that a partition is ending the specified epoch at the specified offset. + */ +private[sql] case class ReportPartitionOffset( +partitionId: Int, +epoch: Long, +offset: PartitionOffset) extends EpochCoordinatorMessage + + +/** Helper object used to create reference to [[EpochCoordinator]]. */ +private[sql] object EpochCoordinatorRef extends Logging { + private def endpointName(runId: String) = s"EpochCoordinator-$runId" + + /** + * Create a reference to a new [[EpochCoordinator]]. 
+ */ + def create( + writer: ContinuousWriter, + reader: ContinuousReader, + startEpoch: Long, + queryId: String, + runId: String, + session: SparkSession, + env: SparkEnv): RpcEndpointRef = synchronized { +val coordinator = new EpochCoordinator(writer, reader, startEpoch, queryId, session, env.rpcEnv) +val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator) +logInfo("Registered EpochCoordinator endpoint") +ref + } + + def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized { +val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), env.conf, env.rpcEnv) +logDebug("Retrieved existing EpochCoordinator endpoint") +rpcEndpointRef + } +} + +/** + * Handles three major epoch coordination tasks for continuous processing: + * + * * Maintains a local epoch counter (the "driver epoch"), incremented by IncrementAndGetEpoch + * and pollable from executors by GetCurrentEpoch. Note that this epoch is *not* immediately + * reflected anywhere in ContinuousExecution. + * * Collates ReportPartitionOffset messages, and forwards to ContinuousExecution when all + * readers have ended a given epoch. + * * Collates CommitPartitionEpoch messages, and forwards to ContinuousExecution when all readers + * have
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158399908 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data 
source $sourceName does not support continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: InterruptedException if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158398822 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + 
s"Data source $sourceName does not support continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: InterruptedException if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158389368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data 
source $sourceName does not support continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: InterruptedException if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158391581 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data 
source $sourceName does not support continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: InterruptedException if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158389177 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data 
source $sourceName does not support continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: InterruptedException if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158388472 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L --- End diff -- nit: unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158361942 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging { logError(s"Writer for partition ${context.partitionId()} aborted.") }) } + + def runContinuous( + writeTask: DataWriterFactory[InternalRow], + context: TaskContext, + iter: Iterator[InternalRow]): WriterCommitMessage = { +val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY) +val currentMsg: WriterCommitMessage = null +var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + +do { + // write the data and commit this writer. + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { +try { + iter.foreach(dataWriter.write) + logInfo(s"Writer for partition ${context.partitionId()} is committing.") + val msg = dataWriter.commit() + logInfo(s"Writer for partition ${context.partitionId()} committed.") + EpochCoordinatorRef.get(runId, SparkEnv.get).send( --- End diff -- nit: `EpochCoordinatorRef.get` is not cheap. You can store it outside the loop.
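In code, the nit amounts to resolving the endpoint once before entering the per-epoch loop. A sketch against the names in the quoted diff (`dataWriter`, `iter` and `context` are as declared there; the loop termination is elided in the quote and shown here as task interruption):

```scala
val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
// Resolve the coordinator endpoint once; the underlying driver-ref lookup is
// too expensive to repeat on every epoch commit.
val epochCoordinator = EpochCoordinatorRef.get(runId, SparkEnv.get)
var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

do {
  iter.foreach(dataWriter.write)
  val msg = dataWriter.commit()
  // Reuse the cached reference instead of calling EpochCoordinatorRef.get here.
  epochCoordinator.send(
    CommitPartitionEpoch(context.partitionId(), currentEpoch, msg))
  currentEpoch += 1
} while (!context.isInterrupted())
```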
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158382180 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger; --- End diff -- nit: move this below `import org.apache.spark.annotation.InterfaceStability;`
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158391247 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage} +import org.apache.spark.util.RpcUtils + +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable + +// Driver epoch trigger message +/** + * Atomically increment the current epoch and get the new value. + */ +private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage + +// Init messages +/** + * Set the reader and writer partition counts. Tasks may not be started until the coordinator + * has acknowledged these messages. + */ +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage + +// Partition task messages +/** + * Get the current epoch. + */ +private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage +/** + * Commit a partition at the specified epoch with the given message. + */ +private[sql] case class CommitPartitionEpoch( +partitionId: Int, +epoch: Long, +message: WriterCommitMessage) extends EpochCoordinatorMessage +/** + * Report that a partition is ending the specified epoch at the specified offset. + */ +private[sql] case class ReportPartitionOffset( +partitionId: Int, +epoch: Long, +offset: PartitionOffset) extends EpochCoordinatorMessage + + +/** Helper object used to create reference to [[EpochCoordinator]]. */ +private[sql] object EpochCoordinatorRef extends Logging { + private def endpointName(runId: String) = s"EpochCoordinator-$runId" + + /** + * Create a reference to a new [[EpochCoordinator]]. 
+ */ + def create( + writer: ContinuousWriter, + reader: ContinuousReader, + startEpoch: Long, + queryId: String, + runId: String, + session: SparkSession, + env: SparkEnv): RpcEndpointRef = synchronized { +val coordinator = new EpochCoordinator(writer, reader, startEpoch, queryId, session, env.rpcEnv) +val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator) +logInfo("Registered EpochCoordinator endpoint") +ref + } + + def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized { +val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), env.conf, env.rpcEnv) +logDebug("Retrieved existing EpochCoordinator endpoint") +rpcEndpointRef + } +} + +/** + * Handles three major epoch coordination tasks for continuous processing: + * + * * Maintains a local epoch counter (the "driver epoch"), incremented by IncrementAndGetEpoch + * and pollable from executors by GetCurrentEpoch. Note that this epoch is *not* immediately + * reflected anywhere in ContinuousExecution. + * * Collates ReportPartitionOffset messages, and forwards to ContinuousExecution when all + * readers have ended a given epoch. + * * Collates CommitPartitionEpoch messages, and forwards to ContinuousExecution when all readers + * have
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158397203 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.continuous + +import java.io.{File, InterruptedIOException, IOException, UncheckedIOException} +import java.nio.channels.ClosedByInterruptException +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit} + +import scala.reflect.ClassTag +import scala.util.control.ControlThrowable + +import com.google.common.util.concurrent.UncheckedExecutionException +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec} +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.sql.test.TestSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +class ContinuousSuiteBase extends StreamTest { + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = { +query match { + case s: ContinuousExecution => +assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") +val reader = s.lastExecution.executedPlan.collectFirst { + case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r +}.get + +val deltaMs = numTriggers * 1000 + 300 +while (System.currentTimeMillis < reader.creationTime + deltaMs) { + Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis) +} +} + } + + // A continuous trigger that will only fire the initial time for the duration of a test. + // This allows clean testing with manual epoch advancement. + protected val longContinuousTrigger = Trigger.Continuous("1 hour") +} + +class ContinuousSuite extends ContinuousSuiteBase { + import testImplicits._ + + test("basic rate source") { +val df = spark.readStream + .format("rate") + .option("numPartitions", "5") + .option("rowsPerSecond", "5") + .load() + .select('value) + +testStream(df, useV2Sink = true)( + StartStream(longContinuousTrigger), + AwaitEpoch(0), + Execute(waitForRateSourceTriggers(_, 2)), + IncrementEpoch(), + CheckAnswer(scala.Range(0, 10): _*), --- End diff -- let's not do an exact match check here. ---
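The concern is that the rate source keeps producing rows after the two triggers waited for above, so the committed output can legitimately be a superset of 0–9. A relaxed containment check, roughly (illustrative only; `collectedValues` is a hypothetical stand-in for however the test reads the memory sink back):

```scala
// Treat the expected values as a lower bound rather than an exact result.
val expected = (0L until 10L).toSet
val produced: Set[Long] = collectedValues()  // hypothetical accessor over the sink's rows
assert(expected.subsetOf(produced), s"missing values: ${expected.diff(produced)}")
```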
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158159270 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) --- End diff -- I think the coupling is correct here. ProcessingTime represents the rate of progress through the query's fenceposts, which applies here as well as it does in the microbatch case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
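From the API side the coupling is visible in how the two triggers are specified: both carry an interval and both end up driving a ProcessingTimeExecutor; only what a tick means differs. A sketch, assuming a streaming DataFrame `df` and the `Trigger.Continuous` factory from this patch (the rest of the write pipeline is elided):

```scala
import org.apache.spark.sql.streaming.Trigger

// Microbatch: each executor tick starts the next batch.
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

// Continuous: each tick of the same executor advances the epoch fencepost,
// i.e. how often offsets are checkpointed, while data is processed continuously.
df.writeStream.trigger(Trigger.Continuous("1 second"))
```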
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158158855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + +import scala.collection.JavaConverters._ + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.streaming.ProcessingTime +import org.apache.spark.util.{SystemClock, ThreadUtils} + +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + override protected def getPartitions: Array[Partition] = { +readTasks.asScala.zipWithIndex.map { + case (readTask, index) => new DataSourceRDDPartition(index, readTask) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader() + +val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY) + +// This queue contains two types of messages: +// * (null, null) representing an epoch boundary. +// * (row, off) containing a data row and its corresponding PartitionOffset. 
+val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) + +val epochPollFailed = new AtomicBoolean(false) +val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + s"epoch-poll--${runId}--${context.partitionId()}") +val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed) +epochPollExecutor.scheduleWithFixedDelay( + epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + +// Important sequencing - we must get start offset before the data reader thread begins +val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + +val dataReaderFailed = new AtomicBoolean(false) +val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) +dataReaderThread.setDaemon(true) +dataReaderThread.start() + +context.addTaskCompletionListener(_ => { + reader.close() + dataReaderThread.interrupt() + epochPollExecutor.shutdown() +}) + +val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get) +new Iterator[UnsafeRow] { + private var currentRow: UnsafeRow = _ + private var currentOffset: PartitionOffset = startOffset + private var currentEpoch = + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + override def hasNext(): Boolean = { +if (dataReaderFailed.get()) { + throw new SparkException("data read failed", dataReaderThread.failureReason) +} +if (epochPollFailed.get()) { + throw
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158156114 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: Throwable if state.get().equals(RECONFIGURING) => --- End diff -- The sequencing is: - The pre-existing method stopSources() marks the ContinuousReader objects as stopped and cleans up any resources they may be holding. This doesn't affect query execution, and stopSources already swallows any non-fatal exception thrown by a stop() implementation. - The reconfiguring state is set. - The job group for this run is cancelled,
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158117033 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -303,7 +299,7 @@ abstract class StreamExecution( e, committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString) -logError(s"Query $prettyIdString terminated with error", e) +// logError(s"Query $prettyIdString terminated with error", e) --- End diff -- nit: restore `logError`
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158137370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + +import scala.collection.JavaConverters._ + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.streaming.ProcessingTime +import org.apache.spark.util.{SystemClock, ThreadUtils} + +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + override protected def getPartitions: Array[Partition] = { +readTasks.asScala.zipWithIndex.map { + case (readTask, index) => new DataSourceRDDPartition(index, readTask) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader() + +val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY) + +// This queue contains two types of messages: +// * (null, null) representing an epoch boundary. +// * (row, off) containing a data row and its corresponding PartitionOffset. 
+val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) + +val epochPollFailed = new AtomicBoolean(false) +val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + s"epoch-poll--${runId}--${context.partitionId()}") +val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed) +epochPollExecutor.scheduleWithFixedDelay( + epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + +// Important sequencing - we must get start offset before the data reader thread begins +val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + +val dataReaderFailed = new AtomicBoolean(false) +val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) +dataReaderThread.setDaemon(true) +dataReaderThread.start() + +context.addTaskCompletionListener(_ => { + reader.close() + dataReaderThread.interrupt() + epochPollExecutor.shutdown() +}) + +val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get) +new Iterator[UnsafeRow] { + private var currentRow: UnsafeRow = _ + private var currentOffset: PartitionOffset = startOffset + private var currentEpoch = + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + override def hasNext(): Boolean = { +if (dataReaderFailed.get()) { + throw new SparkException("data read failed", dataReaderThread.failureReason) +} +if (epochPollFailed.get()) { + throw new
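The review comment above is cut off, but the queue protocol in the quoted code is worth a sketch. Assuming the names from the quote (`queue`, `currentOffset`, `currentEpoch`, `epochEndpoint`, `context`) and the coordinator's `ReportPartitionOffset` message, the consumer side might look like the following; this is a sketch of the protocol, not the PR's actual iterator:

```scala
// Sketch: drain one message from the queue described above.
// (null, null) is the epoch-boundary sentinel; everything else is a data row
// paired with the PartitionOffset that produced it.
private def pollNext(): Option[UnsafeRow] = {
  queue.take() match {
    case (null, null) =>
      // Epoch boundary: report how far this partition got in the finished
      // epoch, then start counting against the next one.
      epochEndpoint.send(
        ReportPartitionOffset(context.partitionId(), currentEpoch, currentOffset))
      currentEpoch += 1
      None // boundary messages carry no row
    case (row, offset) =>
      currentOffset = offset
      Some(row)
  }
}
```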
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158120930 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.")
+}
 + }
 +
 + private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
 + }
 +
 + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+do {
 + try {
+runContinuous(sparkSessionForStream)
 + } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) => --- End diff -- There is a race condition here. Since you stop the source first, this thread may throw the exception before `state` is set to `RECONFIGURING`.
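A sketch of the reordering this implies: make the `RECONFIGURING` transition visible before anything that can cause the query thread to throw, so the catch guard in `runActivatedStream` cannot observe a stale state. The method name is hypothetical; `stopSources()` is the helper described in the earlier comment, and `cancelJobGroup` is the standard SparkContext call:

```scala
// Sketch only: flip the state first, then tear down, so any exception the
// query thread sees from this point on is attributed to reconfiguration
// rather than a genuine failure.
private def triggerReconfiguration(): Unit = {
  state.set(RECONFIGURING)
  stopSources()
  sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
```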
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158120028 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.") +} + } + + private val triggerExecutor = trigger match { +case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) +case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") + } + + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +do { + try { +runContinuous(sparkSessionForStream) + } catch { +case _: Throwable if state.get().equals(RECONFIGURING) => + // swallow exception and run again + state.set(ACTIVE) + } +} while (state.get() == ACTIVE) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function must be called + * before any processing occurs and
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158119082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -266,6 +266,21 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } + /** + * Removes all log entries later than thresholdBatchId (exclusive). + */ + def purgeAfter(thresholdBatchId: Long): Unit = { +val batchIds = fileManager.list(metadataPath, batchFilesFilter) + .map(f => pathToBatchId(f.getPath)) + +for (batchId <- batchIds if batchId > thresholdBatchId) { + print(s"A purging\n") --- End diff -- nit: remove this
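With the debug `print` removed, the method would presumably read as follows. Note the loop body after the guard is an assumption modeled on the existing `purge` method, since the quote cuts off at the `print`:

```scala
/**
 * Removes all log entries later than thresholdBatchId (exclusive).
 */
def purgeAfter(thresholdBatchId: Long): Unit = {
  val batchIds = fileManager.list(metadataPath, batchFilesFilter)
    .map(f => pathToBatchId(f.getPath))

  for (batchId <- batchIds if batchId > thresholdBatchId) {
    val path = batchIdToPath(batchId)
    fileManager.delete(path)
    logTrace(s"Removed metadata log file: $path")
  }
}
```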
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158121081 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.")
+}
 + }
 +
 + private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
 + }
 +
 + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+do {
 + try {
+runContinuous(sparkSessionForStream)
 + } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) => --- End diff -- `_: Throwable` => `NonFatal(e)`
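For reference, the suggested pattern; `scala.util.control.NonFatal` deliberately excludes `VirtualMachineError` and friends, so fatal failures still propagate:

```scala
import scala.util.control.NonFatal

do {
  try {
    runContinuous(sparkSessionForStream)
  } catch {
    // Only swallow non-fatal exceptions, and only while reconfiguring;
    // OutOfMemoryError and other fatal errors still escape.
    case NonFatal(_) if state.get().equals(RECONFIGURING) =>
      state.set(ACTIVE)
  }
} while (state.get() == ACTIVE)
```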
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158122336 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.")
+}
 + }
 +
 + private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) --- End diff -- I would use `ThreadUtils.newDaemonSingleThreadScheduledExecutor` rather than `ProcessingTimeExecutor`. `ProcessingTimeExecutor` is designed for ProcessingTimeTrigger, so it's weird to use it here; in addition, we may make changes to `ProcessingTimeExecutor` in the future that could break continuous execution.
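A hedged sketch of the suggested alternative: drive the epoch advances from a dedicated daemon scheduler at the continuous trigger's interval, instead of reusing `ProcessingTimeExecutor`. The executor factory and the `IncrementAndGetEpoch` message exist in the quoted code; the wiring below (field and method names, and the assumption that `epochEndpoint` is the coordinator's RpcEndpointRef) is illustrative:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

// Illustrative only: a daemon scheduler owned by this query, rather than
// ProcessingTimeExecutor, which is designed for the ProcessingTime trigger.
private val epochAdvanceExecutor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"epoch-advance--$runId")

private def startEpochAdvance(intervalMs: Long): Unit = {
  epochAdvanceExecutor.scheduleWithFixedDelay(
    new Runnable {
      // Assumes epochEndpoint is the EpochCoordinator's RpcEndpointRef.
      override def run(): Unit = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
    },
    0, intervalMs, TimeUnit.MILLISECONDS)
}
```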
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158116511 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage} +import org.apache.spark.util.RpcUtils + +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable + +// Driver epoch trigger message +/** + * Atomically increment the current epoch and get the new value. + */ +private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage --- End diff -- nit: `case class` -> `case object` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
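The nit applied; a field-less message has no reason to be instantiated per send, and `case object` gives a single serializable instance. The same change fits `GetCurrentEpoch` below, flagged in a later comment in this thread:

```scala
/**
 * Atomically increment the current epoch and get the new value.
 */
private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage

/**
 * Get the current epoch.
 */
private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
```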
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158121770 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{Clock, Utils} + +class ContinuousExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: ContinuousWriteSupport, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +extraOptions: Map[String, String], +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty + override protected def sources: Seq[BaseStreamingSource] = continuousSources + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in StreamExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() +analyzedPlan.transform { + case r @ StreamingRelationV2( + source: ContinuousReadSupport, _, extraReaderOptions, output, _) => +toExecutionRelationMap.getOrElseUpdate(r, { + ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) +}) + case StreamingRelationV2(_, sourceName, _, _, _) => +throw new AnalysisException( + s"Data source $sourceName does not support 
continuous processing.")
+}
 + }
 +
 + private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
 + }
 +
 + override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+do {
 + try {
+runContinuous(sparkSessionForStream)
 + } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) => --- End diff -- On second thought, it's dangerous to swallow the exception here. If the source has a bug in `Source.stop`, this will hide it and the query will keep running. I would expect a better mechanism for asking the source to stop without throwing exceptions.
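One possible shape for such a mechanism, purely as a sketch of the idea under discussion and not anything in this PR (the flag and method names are hypothetical):

```scala
// Hypothetical sketch: record that the stop was deliberate instead of
// signalling it through an exception, so bugs in Source.stop are never
// silently swallowed by the reconfigure path.
@volatile private var stopRequested = false

private def requestSourceStop(): Unit = {
  stopRequested = true   // readers and the query loop can poll this flag
  stopSources()          // any exception thrown here can now be logged loudly,
                         // because it no longer doubles as the stop signal
}
```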
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158116796 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage} +import org.apache.spark.util.RpcUtils + +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable + +// Driver epoch trigger message +/** + * Atomically increment the current epoch and get the new value. + */ +private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage + +// Init messages +/** + * Set the reader and writer partition counts. Tasks may not be started until the coordinator + * has acknowledged these messages. + */ +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage + +// Partition task messages +/** + * Get the current epoch. + */ +private[sql] case class GetCurrentEpoch() extends EpochCoordinatorMessage --- End diff -- nit: `case class` -> `case object` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157278184 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1035,6 +1035,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = +buildConf("spark.sql.streaming.continuous.executorQueueSize") +.internal() +.doc("The size (measured in number of rows) of the queue used in continuous execution to" + + " buffer the results of a ContinuousDataReader.") +.intConf --- End diff -- Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
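A standalone sketch of why an Int conf is arguably sufficient here: the configured value ultimately becomes an ArrayBlockingQueue capacity, which is a Java int, so a Long conf would need a checked narrowing at the call site anyway. The system-property read below is a hypothetical stand-in for SQLConf, not the real conf machinery.

import java.util.concurrent.ArrayBlockingQueue

object QueueSizeConfSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical conf read; 1024 matches the default in the diff.
    val raw = sys.props.getOrElse("spark.sql.streaming.continuous.executorQueueSize", "1024")

    // As an Int conf, the value plugs straight into the queue constructor.
    val intSized = new ArrayBlockingQueue[String](raw.toInt)

    // As a Long conf, the same call site needs an explicit, checked narrowing,
    // because ArrayBlockingQueue capacities are ints regardless.
    val asLong = raw.toLong
    require(asLong > 0 && asLong <= Int.MaxValue, "queue size must fit in a positive Int")
    val longSized = new ArrayBlockingQueue[String](asLong.toInt)

    println(s"capacities: ${intSized.remainingCapacity()} and ${longSized.remainingCapacity()}")
  }
}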
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157135337 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) --- End diff -- `Trigger.Continuous(10, TimeUnit.SECONDS)` instead of `ProcessingTime.create(10, TimeUnit.SECONDS)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
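For reference, a hedged sketch of the suggested call in context, assuming a Spark 2.3+ build where this PR is merged and a source/sink pair with continuous support (the rate source and console sink are used here only as stand-ins):

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("continuous-trigger")
      .getOrCreate()

    val df = spark.readStream.format("rate").load()

    val query = df.writeStream
      .format("console")
      .trigger(Trigger.Continuous(10, TimeUnit.SECONDS)) // asynchronous checkpoint every 10 seconds
      .start()

    query.awaitTermination(30 * 1000L)
    spark.stop()
  }
}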
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124515 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.3.0 + */ + public static Trigger Continuous(long interval, TimeUnit timeUnit) { +return ContinuousTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import scala.concurrent.duration._ + *df.writeStream.trigger(Trigger.Continuous(10.seconds)) + * }}} + * @since 2.2.0 + */ + public static Trigger Continuous(Duration interval) { +return ContinuousTrigger.apply(interval); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *df.writeStream.trigger(Trigger.Continuous("10 seconds")) + * }}} + * @since 2.2.0 --- End diff -- 2.3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124505 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.3.0 + */ + public static Trigger Continuous(long interval, TimeUnit timeUnit) { +return ContinuousTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import scala.concurrent.duration._ + *df.writeStream.trigger(Trigger.Continuous(10.seconds)) + * }}} + * @since 2.2.0 --- End diff -- 2.3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124394 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1035,6 +1035,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = +buildConf("spark.sql.streaming.continuous.executorQueueSize") +.internal() +.doc("The size (measured in number of rows) of the queue used in continuous execution to" + + " buffer the results of a ContinuousDataReader.") +.intConf --- End diff -- `longConf`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124263 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1035,6 +1035,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = +buildConf("spark.sql.streaming.continuous.executorQueueSize") +.internal() +.doc("The size (measured in number of rows) of the queue used in continuous execution to" + + " buffer the results of a ContinuousDataReader.") +.intConf +.createWithDefault(1024) + + val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS = +buildConf("spark.sql.streaming.continuous.executorPollIntervalMs") + .internal() + .doc("The interval at which continuous execution readers will poll to check whether" + +" the epoch has advanced on the driver.") + .intConf --- End diff -- `timeConf`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
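What the `timeConf` suggestion would buy, sketched standalone: the poll interval could then be written with a time-unit suffix rather than only a bare millisecond count. `parseToMs` below is a hypothetical stand-in for Spark's internal time-string parsing, not the actual ConfigBuilder API.

object PollIntervalSketch {
  // Hypothetical helper: accepts "100", "100ms", or "1s" and normalizes to milliseconds.
  private def parseToMs(value: String): Long = {
    val v = value.trim.toLowerCase
    if (v.endsWith("ms")) v.dropRight(2).trim.toLong
    else if (v.endsWith("s")) v.dropRight(1).trim.toLong * 1000
    else v.toLong // bare numbers fall back to milliseconds
  }

  def main(args: Array[String]): Unit = {
    // With .intConf only the bare "100" form is accepted; a time conf lets users
    // write any of these equivalently.
    Seq("100", "100ms", "1s").foreach(v => println(s"$v -> ${parseToMs(v)} ms"))
  }
}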
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19984 [SPARK-22789] Map-only continuous processing execution ## What changes were proposed in this pull request? Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC. ## How was this patch tested? new unit-ish tests (exercising execution end to end) You can merge this pull request into a Git repository by running: $ git pull https://github.com/joseph-torres/spark continuous-impl Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19984.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19984 commit d6bea84447d910e79d5926972d87a80bc5dc2e2e Author: Jose TorresDate: 2017-12-07T22:08:28Z Refactor StreamExecution into a parent class so continuous processing can extend it commit df6b8861173d1e7853952c8f3ffe504975efe204 Author: Jose Torres Date: 2017-12-12T19:31:28Z address fmt commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c Author: Jose Torres Date: 2017-12-13T00:09:48Z slight changes commit 2b360ab49bcab3c73ea85ce62202e40e950931ef Author: Jose Torres Date: 2017-12-13T00:10:34Z rm spurious space commit 1b19f1cef17e7324997649ad8c5f97887912 Author: Jose Torres Date: 2017-12-13T00:35:30Z fix compile commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98 Author: Jose Torres Date: 2017-12-11T20:48:20Z harness commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef Author: Jose Torres Date: 2017-12-11T21:18:25Z awaitEpoch impl commit 578bbb7eb0725b795ac65d1beda436515f4f4eba Author: Jose Torres Date: 2017-12-11T21:46:09Z move local[10] to only continuous suite commit 9051eff6c88838ac61ab45763ed84d593e2d4837 Author: Jose Torres Date: 2017-12-11T21:49:55Z repeatedly restart commit 60fa4477591cc264b9ea253f64065d762ce3f96f Author: Jose Torres Date: 2017-12-11T22:02:52Z fix some simple TODOs commit ea8e76ec75752d134433730ee1a007cce1fdcfe8 Author: Jose Torres Date: 2017-12-11T22:11:18Z use runId instead of queryId for endpoint name commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a Author: Jose Torres Date: 2017-12-11T22:19:03Z more simple todos commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2 Author: Jose Torres Date: 2017-12-11T22:27:12Z remove old state commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14 Author: Jose Torres Date: 2017-12-11T22:35:51Z remove clean shutdown workaround in StreamTest commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a Author: Jose Torres Date: 2017-12-11T22:50:09Z update ContinuousExecution docs commit f687432a58acf7337885edfc01adc94188d174d8 Author: Jose Torres Date: 2017-12-11T22:59:14Z add comments to EpochCoordinator commit 987b011ee78292c3379559910ebe101daf4f9450 Author: Jose Torres Date: 2017-12-12T00:02:54Z change offset semantic to end of previous epoch commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce Author: Jose Torres Date: 2017-12-12T00:18:40Z document EpochCoordinator commit d6ef404b85fa6977b5f38a853dca11de5189b3f9 Author: Jose Torres Date: 2017-12-12T02:06:44Z simplify epoch handling commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29 Author: Jose Torres Date: 2017-12-12T19:17:58Z stress tests commit 053a9f349a4829433a495aa5989f1ca1c8a3256e Author: Jose Torres Date: 2017-12-12T20:17:22Z add minBatchesToRetain commit 7072d21444388fe167fa7e3475b3e95ec9923d5e Author: Jose Torres Date: 2017-12-12T20:43:33Z add confs commit 4083a8f5c6b6ef298726234d54f23a90e971e77e Author: Jose Torres Date: 2017-12-12T21:10:33Z latency suite 
not meaningful here commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de Author: Jose Torres Date: 2017-12-13T00:04:07Z more stress::q commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90 Author: Jose Torres Date: 2017-12-13T18:55:23Z use temp dir commit e4a1bc19db9ea0233879d270e725ed58d95a34ad Author: Jose Torres Date: 2017-12-14T19:37:36Z fix against rebase commit 8887b3c92afe8bb1659f600785af5d97f085f2bb Author: Jose Torres Date: 2017-12-14T21:32:16Z fix ser/deser commit 60bf0e33f20134af296d85b5c52729c4063ef2e1 Author: Jose Torres Date: