[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2018-01-31 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r165117379
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 ---
@@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
   )
 }
 
+// We have to pack in the V1 data source as a shim, for the case when a 
source implements
+// continuous processing (which is always V2) but only has V1 microbatch 
support. We don't
+// know at read time whether the query is conntinuous or not, so we need 
to be able to
--- End diff --

The trigger isn't specified at the point where the dataframe is created.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2018-01-31 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r165055237
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 ---
@@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
   )
 }
 
+// We have to pack in the V1 data source as a shim, for the case when a 
source implements
+// continuous processing (which is always V2) but only has V1 microbatch 
support. We don't
+// know at read time whether the query is conntinuous or not, so we need 
to be able to
--- End diff --

can't we know it from the specified trigger?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19984


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158547367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, 
WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends 
Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case object IncrementAndGetEpoch extends 
EpochCoordinatorMessage
+
+// Init messages
+/**
+ * Set the reader and writer partition counts. Tasks may not be started 
until the coordinator
+ * has acknowledged these messages.
+ */
+private[sql] case class SetReaderPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+case class SetWriterPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+
+// Partition task messages
+/**
+ * Get the current epoch.
+ */
+private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
+/**
+ * Commit a partition at the specified epoch with the given message.
+ */
+private[sql] case class CommitPartitionEpoch(
+partitionId: Int,
+epoch: Long,
+message: WriterCommitMessage) extends EpochCoordinatorMessage
+/**
+ * Report that a partition is ending the specified epoch at the specified 
offset.
+ */
+private[sql] case class ReportPartitionOffset(
+partitionId: Int,
+epoch: Long,
+offset: PartitionOffset) extends EpochCoordinatorMessage
+
+
+/** Helper object used to create reference to [[EpochCoordinator]]. */
+private[sql] object EpochCoordinatorRef extends Logging {
+  private def endpointName(runId: String) = s"EpochCoordinator-$runId"
+
+  /**
+   * Create a reference to a new [[EpochCoordinator]].
+   */
+  def create(
+  writer: ContinuousWriter,
+  reader: ContinuousReader,
+  startEpoch: Long,
+  queryId: String,
+  runId: String,
+  session: SparkSession,
+  env: SparkEnv): RpcEndpointRef = synchronized {
+val coordinator = new EpochCoordinator(writer, reader, startEpoch, 
queryId, session, env.rpcEnv)
+val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator)
+logInfo("Registered EpochCoordinator endpoint")
+ref
+  }
+
+  def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized {
+val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), 
env.conf, env.rpcEnv)
+logDebug("Retrieved existing EpochCoordinator endpoint")
+rpcEndpointRef
+  }
+}
+
+/**
+ * Handles three major epoch coordination tasks for continuous processing:
+ *
+ * * Maintains a local epoch counter (the "driver epoch"), incremented by 
IncrementAndGetEpoch
+ *   and pollable from executors by GetCurrentEpoch. Note that this epoch 
is *not* immediately
+ *   reflected anywhere in ContinuousExecution.
+ * * Collates ReportPartitionOffset messages, and forwards to 
ContinuousExecution when all
+ *   readers have ended a given epoch.
+ * * Collates CommitPartitionEpoch messages, and forwards to 
ContinuousExecution when all readers
+ *   have 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158399908
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: InterruptedException if state.get().equals(RECONFIGURING) 
=>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158398822
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: InterruptedException if state.get().equals(RECONFIGURING) 
=>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158389368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: InterruptedException if state.get().equals(RECONFIGURING) 
=>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158391581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: InterruptedException if state.get().equals(RECONFIGURING) 
=>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158389177
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: InterruptedException if state.get().equals(RECONFIGURING) 
=>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158388472
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
--- End diff --

nit: unused


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158361942
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging {
   logError(s"Writer for partition ${context.partitionId()} aborted.")
 })
   }
+
+  def runContinuous(
+  writeTask: DataWriterFactory[InternalRow],
+  context: TaskContext,
+  iter: Iterator[InternalRow]): WriterCommitMessage = {
+val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+val currentMsg: WriterCommitMessage = null
+var currentEpoch = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+do {
+  // write the data and commit this writer.
+  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+try {
+  iter.foreach(dataWriter.write)
+  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
+  val msg = dataWriter.commit()
+  logInfo(s"Writer for partition ${context.partitionId()} 
committed.")
+  EpochCoordinatorRef.get(runId, SparkEnv.get).send(
--- End diff --

nit: `EpochCoordinatorRef.get` is not cheap. You can store it outside the 
loop.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158382180
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -19,6 +19,7 @@
 
 import java.util.concurrent.TimeUnit;
 
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
--- End diff --

nit: move this below `import 
org.apache.spark.annotation.InterfaceStability;`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158391247
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, 
WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends 
Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case object IncrementAndGetEpoch extends 
EpochCoordinatorMessage
+
+// Init messages
+/**
+ * Set the reader and writer partition counts. Tasks may not be started 
until the coordinator
+ * has acknowledged these messages.
+ */
+private[sql] case class SetReaderPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+case class SetWriterPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+
+// Partition task messages
+/**
+ * Get the current epoch.
+ */
+private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
+/**
+ * Commit a partition at the specified epoch with the given message.
+ */
+private[sql] case class CommitPartitionEpoch(
+partitionId: Int,
+epoch: Long,
+message: WriterCommitMessage) extends EpochCoordinatorMessage
+/**
+ * Report that a partition is ending the specified epoch at the specified 
offset.
+ */
+private[sql] case class ReportPartitionOffset(
+partitionId: Int,
+epoch: Long,
+offset: PartitionOffset) extends EpochCoordinatorMessage
+
+
+/** Helper object used to create reference to [[EpochCoordinator]]. */
+private[sql] object EpochCoordinatorRef extends Logging {
+  private def endpointName(runId: String) = s"EpochCoordinator-$runId"
+
+  /**
+   * Create a reference to a new [[EpochCoordinator]].
+   */
+  def create(
+  writer: ContinuousWriter,
+  reader: ContinuousReader,
+  startEpoch: Long,
+  queryId: String,
+  runId: String,
+  session: SparkSession,
+  env: SparkEnv): RpcEndpointRef = synchronized {
+val coordinator = new EpochCoordinator(writer, reader, startEpoch, 
queryId, session, env.rpcEnv)
+val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator)
+logInfo("Registered EpochCoordinator endpoint")
+ref
+  }
+
+  def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized {
+val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), 
env.conf, env.rpcEnv)
+logDebug("Retrieved existing EpochCoordinator endpoint")
+rpcEndpointRef
+  }
+}
+
+/**
+ * Handles three major epoch coordination tasks for continuous processing:
+ *
+ * * Maintains a local epoch counter (the "driver epoch"), incremented by 
IncrementAndGetEpoch
+ *   and pollable from executors by GetCurrentEpoch. Note that this epoch 
is *not* immediately
+ *   reflected anywhere in ContinuousExecution.
+ * * Collates ReportPartitionOffset messages, and forwards to 
ContinuousExecution when all
+ *   readers have ended a given epoch.
+ * * Collates CommitPartitionEpoch messages, and forwards to 
ContinuousExecution when all readers
+ *   have 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158397203
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import java.io.{File, InterruptedIOException, IOException, 
UncheckedIOException}
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.{CountDownLatch, ExecutionException, 
TimeoutException, TimeUnit}
+
+import scala.reflect.ClassTag
+import scala.util.control.ControlThrowable
+
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.ExplainCommand
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, 
WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+class ContinuousSuiteBase extends StreamTest {
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  protected def waitForRateSourceTriggers(query: StreamExecution, 
numTriggers: Int): Unit = {
+query match {
+  case s: ContinuousExecution =>
+assert(numTriggers >= 2, "must wait for at least 2 triggers to 
ensure query is initialized")
+val reader = s.lastExecution.executedPlan.collectFirst {
+  case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
+}.get
+
+val deltaMs = numTriggers * 1000 + 300
+while (System.currentTimeMillis < reader.creationTime + deltaMs) {
+  Thread.sleep(reader.creationTime + deltaMs - 
System.currentTimeMillis)
+}
+}
+  }
+
+  // A continuous trigger that will only fire the initial time for the 
duration of a test.
+  // This allows clean testing with manual epoch advancement.
+  protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+}
+
+class ContinuousSuite extends ContinuousSuiteBase {
+  import testImplicits._
+
+  test("basic rate source") {
+val df = spark.readStream
+  .format("rate")
+  .option("numPartitions", "5")
+  .option("rowsPerSecond", "5")
+  .load()
+  .select('value)
+
+testStream(df, useV2Sink = true)(
+  StartStream(longContinuousTrigger),
+  AwaitEpoch(0),
+  Execute(waitForRateSourceTriggers(_, 2)),
+  IncrementEpoch(),
+  CheckAnswer(scala.Range(0, 10): _*),
--- End diff --

let's not do an exact match check here.


---


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158159270
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
--- End diff --

I think the coupling is correct here. ProcessingTime represents the rate of 
progress through the query's fenceposts, which applies here as well as it does 
in the microbatch case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158158855
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158156114
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

The sequencing is:

- The pre-existing method stopSources() marks the ContinuousReader objects 
as stopped and cleans up any resources they may be holding. This doesn't affect 
query execution, and stopSources already swallows any non-fatal exception 
thrown by a stop() implementation.
- The reconfiguring state is set.
- The job group for this run is cancelled, 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158117033
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -303,7 +299,7 @@ abstract class StreamExecution(
   e,
   committedOffsets.toOffsetSeq(sources, 
offsetSeqMetadata).toString,
   availableOffsets.toOffsetSeq(sources, 
offsetSeqMetadata).toString)
-logError(s"Query $prettyIdString terminated with error", e)
+// logError(s"Query $prettyIdString terminated with error", e)
--- End diff --

nit: restore `logError`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158137370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158120930
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

there is a race condition here. Since you stop the source first, this 
thread may throw the exception before `state` is set to `RECONFIGURING`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158120028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
+  // swallow exception and run again
+  state.set(ACTIVE)
+  }
+} while (state.get() == ACTIVE)
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function must be called
+   * before any processing occurs and 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158119082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -266,6 +266,21 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 }
   }
 
+  /**
+   * Removes all log entries later than thresholdBatchId (exclusive).
+   */
+  def purgeAfter(thresholdBatchId: Long): Unit = {
+val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  .map(f => pathToBatchId(f.getPath))
+
+for (batchId <- batchIds if batchId > thresholdBatchId) {
+  print(s"A purging\n")
--- End diff --

nit: remove this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158121081
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

`_: Throwable` => `NonFatal(e)` 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158135301
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158122336
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
--- End diff --

I would use `ThreadUtils.newDaemonSingleThreadScheduledExecutor` rather 
than `ProcessingTimeExecutor`. `ProcessingTimeExecutor` is designed for 
ProcessingTimeTrigger. It's weird to use it here, in addition, we may make some 
changes into ProcessingTimeExecutor in future and they may break Continuous 
execution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158136868
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158139103
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158116511
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, 
WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends 
Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case class IncrementAndGetEpoch() extends 
EpochCoordinatorMessage
--- End diff --

nit: `case class` -> `case object`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158121770
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

In a second thought, it's dangerous to swallow exception here. If the 
source has some bugs in `Source.stop`, it will hide it and continue to run. I 
would expect to see a better mechanism to ask the source to stop it without 
throwing exceptions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158138974
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158134853
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158135254
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if (epochPollFailed.get()) {
+  throw new 

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158116796
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, 
WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends 
Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case class IncrementAndGetEpoch() extends 
EpochCoordinatorMessage
+
+// Init messages
+/**
+ * Set the reader and writer partition counts. Tasks may not be started 
until the coordinator
+ * has acknowledged these messages.
+ */
+private[sql] case class SetReaderPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+case class SetWriterPartitions(numPartitions: Int) extends 
EpochCoordinatorMessage
+
+// Partition task messages
+/**
+ * Get the current epoch.
+ */
+private[sql] case class GetCurrentEpoch() extends EpochCoordinatorMessage
--- End diff --

nit: `case class` -> `case object`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-15 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157278184
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+buildConf("spark.sql.streaming.continuous.executorQueueSize")
+.internal()
+.doc("The size (measured in number of rows) of the queue used in 
continuous execution to" +
+  " buffer the results of a ContinuousDataReader.")
+.intConf
--- End diff --

Should it be? I can't imagine anything close to MAX_INT being a reasonable 
value here. Will it be hard to migrate to a long if we later discover it's 
needed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157135337
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
   public static Trigger Once() {
 return OneTimeTrigger$.MODULE$;
   }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * @since 2.3.0
+   */
+  public static Trigger Continuous(long intervalMs) {
+return ContinuousTrigger.apply(intervalMs);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
--- End diff --

`Trigger.Continuous(10, TimeUnit.SECONDS)` instead of 
`ProcessingTime.create(10, TimeUnit.SECONDS)`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157124515
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
   public static Trigger Once() {
 return OneTimeTrigger$.MODULE$;
   }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * @since 2.3.0
+   */
+  public static Trigger Continuous(long intervalMs) {
+return ContinuousTrigger.apply(intervalMs);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.3.0
+   */
+  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
+return ContinuousTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *import scala.concurrent.duration._
+   *df.writeStream.trigger(Trigger.Continuous(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger Continuous(Duration interval) {
+return ContinuousTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *df.writeStream.trigger(Trigger.Continuous("10 seconds"))
+   * }}}
+   * @since 2.2.0
--- End diff --

2.3.0?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157124505
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
   public static Trigger Once() {
 return OneTimeTrigger$.MODULE$;
   }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * @since 2.3.0
+   */
+  public static Trigger Continuous(long intervalMs) {
+return ContinuousTrigger.apply(intervalMs);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.3.0
+   */
+  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
+return ContinuousTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger that continuously processes streaming data, asynchronously 
checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *import scala.concurrent.duration._
+   *df.writeStream.trigger(Trigger.Continuous(10.seconds))
+   * }}}
+   * @since 2.2.0
--- End diff --

2.3.0?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157124394
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+buildConf("spark.sql.streaming.continuous.executorQueueSize")
+.internal()
+.doc("The size (measured in number of rows) of the queue used in 
continuous execution to" +
+  " buffer the results of a ContinuousDataReader.")
+.intConf
--- End diff --

`longConf`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157124263
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+buildConf("spark.sql.streaming.continuous.executorQueueSize")
+.internal()
+.doc("The size (measured in number of rows) of the queue used in 
continuous execution to" +
+  " buffer the results of a ContinuousDataReader.")
+.intConf
+.createWithDefault(1024)
+
+  val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
+buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
+  .internal()
+  .doc("The interval at which continuous execution readers will poll 
to check whether" +
+" the epoch has advanced on the driver.")
+  .intConf
--- End diff --

`timeConf`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19984

[SPARK-22789] Map-only continuous processing execution

## What changes were proposed in this pull request?

Basic continuous execution, supporting map/flatMap/filter, with commits and 
advancement through RPC.

## How was this patch tested?

new unit-ish tests (exercising execution end to end)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-impl

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19984


commit d6bea84447d910e79d5926972d87a80bc5dc2e2e
Author: Jose Torres 
Date:   2017-12-07T22:08:28Z

Refactor StreamExecution into a parent class so continuous processing can 
extend it

commit df6b8861173d1e7853952c8f3ffe504975efe204
Author: Jose Torres 
Date:   2017-12-12T19:31:28Z

address fmt

commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c
Author: Jose Torres 
Date:   2017-12-13T00:09:48Z

slight changes

commit 2b360ab49bcab3c73ea85ce62202e40e950931ef
Author: Jose Torres 
Date:   2017-12-13T00:10:34Z

rm spurious space

commit 1b19f1cef17e7324997649ad8c5f97887912
Author: Jose Torres 
Date:   2017-12-13T00:35:30Z

fix compile

commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98
Author: Jose Torres 
Date:   2017-12-11T20:48:20Z

harness

commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef
Author: Jose Torres 
Date:   2017-12-11T21:18:25Z

awaitEpoch impl

commit 578bbb7eb0725b795ac65d1beda436515f4f4eba
Author: Jose Torres 
Date:   2017-12-11T21:46:09Z

move local[10] to only continuous suite

commit 9051eff6c88838ac61ab45763ed84d593e2d4837
Author: Jose Torres 
Date:   2017-12-11T21:49:55Z

repeatedly restart

commit 60fa4477591cc264b9ea253f64065d762ce3f96f
Author: Jose Torres 
Date:   2017-12-11T22:02:52Z

fix some simple TODOs

commit ea8e76ec75752d134433730ee1a007cce1fdcfe8
Author: Jose Torres 
Date:   2017-12-11T22:11:18Z

use runId instead of queryId for endpoint name

commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a
Author: Jose Torres 
Date:   2017-12-11T22:19:03Z

more simple todos

commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2
Author: Jose Torres 
Date:   2017-12-11T22:27:12Z

remove old state

commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14
Author: Jose Torres 
Date:   2017-12-11T22:35:51Z

remove clean shutdown workaround in StreamTest

commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a
Author: Jose Torres 
Date:   2017-12-11T22:50:09Z

update ContinuousExecution docs

commit f687432a58acf7337885edfc01adc94188d174d8
Author: Jose Torres 
Date:   2017-12-11T22:59:14Z

add comments to EpochCoordinator

commit 987b011ee78292c3379559910ebe101daf4f9450
Author: Jose Torres 
Date:   2017-12-12T00:02:54Z

change offset semantic to end of previous epoch

commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce
Author: Jose Torres 
Date:   2017-12-12T00:18:40Z

document EpochCoordinator

commit d6ef404b85fa6977b5f38a853dca11de5189b3f9
Author: Jose Torres 
Date:   2017-12-12T02:06:44Z

simplify epoch handling

commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29
Author: Jose Torres 
Date:   2017-12-12T19:17:58Z

stress tests

commit 053a9f349a4829433a495aa5989f1ca1c8a3256e
Author: Jose Torres 
Date:   2017-12-12T20:17:22Z

add minBatchesToRetain

commit 7072d21444388fe167fa7e3475b3e95ec9923d5e
Author: Jose Torres 
Date:   2017-12-12T20:43:33Z

add confs

commit 4083a8f5c6b6ef298726234d54f23a90e971e77e
Author: Jose Torres 
Date:   2017-12-12T21:10:33Z

latency suite not meaningful here

commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de
Author: Jose Torres 
Date:   2017-12-13T00:04:07Z

more stress::q

commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90
Author: Jose Torres 
Date:   2017-12-13T18:55:23Z

use temp dir

commit e4a1bc19db9ea0233879d270e725ed58d95a34ad
Author: Jose Torres 
Date:   2017-12-14T19:37:36Z

fix against rebase

commit 8887b3c92afe8bb1659f600785af5d97f085f2bb
Author: Jose Torres 
Date:   2017-12-14T21:32:16Z

fix ser/deser

commit 60bf0e33f20134af296d85b5c52729c4063ef2e1
Author: Jose Torres 
Date: