[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156872558
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A {@link DataSourceV2Writer} for use with continuous stream processing.
--- End diff --

`A variation of ...`?


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156872002
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ */
+public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
+/**
+ * Set the desired offset range for read tasks created from this reader. Read tasks will
--- End diff --

do you mean this method must be called before `createReadTasks`?
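For context, the contract the question is getting at would look roughly like this (a minimal Scala sketch of the engine-side call order; `planOneMicroBatch` is a hypothetical helper, not actual Spark code):

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.{MicroBatchReader, Offset}

// Hypothetical helper showing the implied call order: the engine pins the
// offset range first, and only then asks the reader for its read tasks.
def planOneMicroBatch(
    reader: MicroBatchReader,
    start: Optional[Offset],
    end: Optional[Offset]) = {
  reader.setOffsetRange(start, end) // must be called before createReadTasks
  reader.createReadTasks()          // tasks then cover exactly the range set above
}
```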


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156871819
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
--- End diff --

ditto


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156871677
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
--- End diff --

It's not a mix-in interface but a variation on `DataSourceV2Reader`, right?


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156871374
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+
+import java.io.IOException;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous processing mode.
+ */
+public interface ContinuousDataReader extends DataReader {
+/**
+ * Get the offset of the current record, or the start offset if no records have been read.
+ *
+ * The execution engine will call this method along with get() to keep track of the current
--- End diff --

better to use a real javadoc link, e.g. `{@link DataReader#get}`


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156870726
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability and save the data from a microbatch to the data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
+   * sources can return None if there is no writing needed to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many writing
+   *                queries running at the same time, and the returned {@link DataSourceV2Writer}
+   *                can use this id to distinguish itself from others.
+   * @param epochId The uniquenumeric ID of the batch within this writing query. This is an
--- End diff --

typo: `uniquenumeric` should be `unique numeric`


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19925


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156779552
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.DataSource
+import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousRateStreamPartitionOffset,
 ContinuousRateStreamReader, RateStreamDataReader, RateStreamReadTask}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.streaming.StreamTest
+
+class RateSourceV2Suite extends StreamTest {
+  test("microbatch in registry") {
+DataSource.lookupDataSource("rate", 
spark.sqlContext.conf).newInstance() match {
+  case ds: MicroBatchReadSupport =>
+val reader = ds.createMicroBatchReader(Optional.empty(), "", 
DataSourceV2Options.empty())
+assert(reader.isInstanceOf[RateStreamV2Reader])
+  case _ =>
+throw new IllegalStateException("Could not find v2 read support 
for rate")
+}
+  }
+
+  test("microbatch - options propagated") {
+val reader = new RateStreamV2Reader(
+  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" 
-> "33").asJava))
+reader.setOffsetRange(Optional.empty(),
+  Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
+val tasks = reader.createReadTasks()
+assert(tasks.size == 11)
+tasks.asScala.foreach {
+  // for 1 second, size of each task is (rowsPerSecond / numPartitions)
+  case RateStreamBatchTask(vals) => vals.size == 3
+  case _ => throw new IllegalStateException("Unexpected task type")
+}
+  }
+
+  test("microbatch - set offset") {
+val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+reader.setOffsetRange(Optional.of(LongOffset(12345)), 
Optional.of(LongOffset(54321)))
+assert(reader.getStartOffset() == LongOffset(12345))
+assert(reader.getEndOffset() == LongOffset(54321))
+  }
+
+  test("microbatch - infer offsets") {
+val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
+reader.setOffsetRange(Optional.empty(), Optional.empty())
+assert(reader.getStartOffset() == LongOffset(reader.creationTimeMs))
+assert(reader.getEndOffset().asInstanceOf[LongOffset].offset >= 
reader.creationTimeMs + 100)
+  }
+
+
+  test("microbatch - data read") {
+val reader = new RateStreamV2Reader(
+  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" 
-> "33").asJava))
+reader.setOffsetRange(Optional.empty(),
+  Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
+val tasks = reader.createReadTasks()
+assert(tasks.size == 11)
+
+val readData = tasks.asScala
+  .map(_.createDataReader())
+  .flatMap { reader =>
+val buf = scala.collection.mutable.ListBuffer[Row]()
+while (reader.next()) buf.append(reader.get())
+buf
+  }
+
+assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
+  }
+
+  test("continuous in registry") {
+DataSource.lookupDataSource("rate", 
spark.sqlContext.conf).newInstance() match {
+  case ds: ContinuousReadSupport =>
+val reader = ds.createContinuousReader(Optional.empty(), "", 
DataSourceV2Options.empty())
+assert(reader.isInstanceOf[ContinuousRateStreamReader])
+  case _ =>
+throw new IllegalStateException("Could not find v2 read support 
for rate")
+}
+  }
+
+  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156779156
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.DataSource
+import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousRateStreamPartitionOffset,
 ContinuousRateStreamReader, RateStreamDataReader, RateStreamReadTask}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.streaming.StreamTest
+
+class RateSourceV2Suite extends StreamTest {
+  test("microbatch in registry") {
+DataSource.lookupDataSource("rate", 
spark.sqlContext.conf).newInstance() match {
+  case ds: MicroBatchReadSupport =>
+val reader = ds.createMicroBatchReader(Optional.empty(), "", 
DataSourceV2Options.empty())
+assert(reader.isInstanceOf[RateStreamV2Reader])
+  case _ =>
+throw new IllegalStateException("Could not find v2 read support 
for rate")
+}
+  }
+
+  test("microbatch - options propagated") {
+val reader = new RateStreamV2Reader(
+  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" 
-> "33").asJava))
+reader.setOffsetRange(Optional.empty(),
+  Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
+val tasks = reader.createReadTasks()
+assert(tasks.size == 11)
+tasks.asScala.foreach {
+  // for 1 second, size of each task is (rowsPerSecond / numPartitions)
+  case RateStreamBatchTask(vals) => vals.size == 3
+  case _ => throw new IllegalStateException("Unexpected task type")
+}
+  }
+
+  test("microbatch - set offset") {
+val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+reader.setOffsetRange(Optional.of(LongOffset(12345)), 
Optional.of(LongOffset(54321)))
+assert(reader.getStartOffset() == LongOffset(12345))
+assert(reader.getEndOffset() == LongOffset(54321))
+  }
+
+  test("microbatch - infer offsets") {
+val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
+reader.setOffsetRange(Optional.empty(), Optional.empty())
+assert(reader.getStartOffset() == LongOffset(reader.creationTimeMs))
+assert(reader.getEndOffset().asInstanceOf[LongOffset].offset >= 
reader.creationTimeMs + 100)
+  }
+
+
+  test("microbatch - data read") {
+val reader = new RateStreamV2Reader(
+  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
+reader.setOffsetRange(Optional.empty(),
+  Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
--- End diff --

this will be flaky when Jenkins is overloaded. Could you add a clock parameter to `RateStreamV2Reader` and use `ManualClock` instead, so that we can control the clock?
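For reference, the pattern being suggested looks roughly like this self-contained sketch (`TimestampedSource` and its method are hypothetical, not the actual `RateStreamV2Reader` change; `Clock`, `ManualClock`, and `SystemClock` are Spark's internal test clocks in `org.apache.spark.util`):

```scala
import org.apache.spark.util.{Clock, ManualClock, SystemClock}

// Inject the clock instead of calling System.currentTimeMillis() directly,
// so a test can advance time deterministically regardless of machine load.
class TimestampedSource(clock: Clock = new SystemClock) {
  def currentEndOffsetMs(): Long = clock.getTimeMillis()
}

// Test side: time moves only when the test says so, so nothing is flaky.
val manualClock = new ManualClock()   // starts at time 0
val source = new TimestampedSource(manualClock)
manualClock.advance(1001)             // pretend ~1 second has passed
assert(source.currentEndOffsetMs() == 1001)
```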


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156741131
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156740666
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import 
org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, 
Update}
+import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, 
DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySinkV2 extends DataSourceV2
+  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
+
+  override def createMicroBatchWriter(
+  queryId: String,
+  batchId: Long,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): 
java.util.Optional[DataSourceV2Writer] = {
+java.util.Optional.of(new MemoryWriter(this, batchId, mode))
+  }
+
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): java.util.Optional[ContinuousWriter] 
= {
+java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+  }
+
+  private case class AddedData(batchId: Long, data: Array[Row])
+
+  /** An order list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
+  private val batches = new ArrayBuffer[AddedData]()
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+batches.flatMap(_.data)
+  }
+
+  def latestBatchId: Option[Long] = synchronized {
+batches.lastOption.map(_.batchId)
+  }
+
+  def latestBatchData: Seq[Row] = synchronized {
+batches.lastOption.toSeq.flatten(_.data)
+  }
+
+  def toDebugString: String = synchronized {
+batches.map { case AddedData(batchId, data) =>
+  val dataStr = try data.mkString(" ") catch {
+case NonFatal(e) => "[Error converting to string]"
+  }
+  s"$batchId: $dataStr"
+}.mkString("\n")
+  }
+
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): 
Unit = {
+val notCommitted = synchronized {
+  latestBatchId.isEmpty || batchId > latestBatchId.get
+}
+if (notCommitted) {
+  logDebug(s"Committing batch $batchId to $this")
+  outputMode match {
+case Append | Update =>
+  val rows = AddedData(batchId, newRows)
+  synchronized { batches += rows }
+
+case Complete =>
+  val rows = AddedData(batchId, newRows)
+  synchronized {
+batches.clear()
+batches += rows
+  }
+
+case _ =>
+  throw new IllegalArgumentException(
+s"Output mode $outputMode is not supported by MemorySink")
+  }
+} else {
+  logDebug(s"Skipping already committed batch: $batchId")
+}
+  }
+
+  def clear(): Unit = synchronized {
+batches.clear()
+  }
+
+  override def toString(): String = "MemorySink"
+}
+
+case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) 
extends WriterCommitMessage {}
+
+class MemoryWriter(sink: 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156542366
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156540689
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156540301
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class RateStreamReadTask(
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156539696
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156524302
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+/**
+ * Used for per-partition offsets in continuous processing. ContinuousReader implementations will
+ * provide a method to merge these into a global Offset.
+ *
+ * These offsets must be serializable.
+ */
+public interface PartitionOffset {
--- End diff --

`extends Serializable`
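The motivation is the sentence above, "These offsets must be serializable": per-partition offsets are reported back from executors to the driver, so every implementation has to survive Java serialization, and declaring `extends Serializable` on the interface enforces that by construction. A tiny illustrative round-trip (hypothetical `MyPartitionOffset`, not part of the PR):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A Scala case class is already java.io.Serializable; making the PartitionOffset
// interface extend Serializable would guarantee the same for Java implementations.
case class MyPartitionOffset(partition: Int, start: Long)

// Round-trip the offset the way the engine would when shipping it driver-side.
val bytes = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bytes)
oos.writeObject(MyPartitionOffset(0, 42L))
oos.close()
val roundTripped = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  .readObject().asInstanceOf[MyPartitionOffset]
assert(roundTripped == MyPartitionOffset(0, 42L))
```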


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156535101
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class RateStreamReadTask(
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156526139
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+public abstract class Offset {
--- End diff --

nit: add javadoc


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156535839
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import 
org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, 
Update}
+import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, 
DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySinkV2 extends DataSourceV2
+  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
+
+  override def createMicroBatchWriter(
+  queryId: String,
+  batchId: Long,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): 
java.util.Optional[DataSourceV2Writer] = {
+java.util.Optional.of(new MemoryWriter(this, batchId, mode))
+  }
+
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): java.util.Optional[ContinuousWriter] 
= {
+java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+  }
+
+  private case class AddedData(batchId: Long, data: Array[Row])
+
+  /** An ordered list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
+  private val batches = new ArrayBuffer[AddedData]()
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+batches.flatMap(_.data)
+  }
+
+  def latestBatchId: Option[Long] = synchronized {
+batches.lastOption.map(_.batchId)
+  }
+
+  def latestBatchData: Seq[Row] = synchronized {
+batches.lastOption.toSeq.flatten(_.data)
+  }
+
+  def toDebugString: String = synchronized {
+batches.map { case AddedData(batchId, data) =>
+  val dataStr = try data.mkString(" ") catch {
+case NonFatal(e) => "[Error converting to string]"
+  }
+  s"$batchId: $dataStr"
+}.mkString("\n")
+  }
+
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): 
Unit = {
+val notCommitted = synchronized {
+  latestBatchId.isEmpty || batchId > latestBatchId.get
+}
+if (notCommitted) {
+  logDebug(s"Committing batch $batchId to $this")
+  outputMode match {
+case Append | Update =>
+  val rows = AddedData(batchId, newRows)
+  synchronized { batches += rows }
+
+case Complete =>
+  val rows = AddedData(batchId, newRows)
+  synchronized {
+batches.clear()
+batches += rows
+  }
+
+case _ =>
+  throw new IllegalArgumentException(
+s"Output mode $outputMode is not supported by MemorySink")
+  }
+} else {
+  logDebug(s"Skipping already committed batch: $batchId")
+}
+  }
+
+  def clear(): Unit = synchronized {
+batches.clear()
+  }
+
+  override def toString(): String = "MemorySink"
+}
+
+case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) 
extends WriterCommitMessage {}
+
+class MemoryWriter(sink: MemorySinkV2, 

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156532274
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
--- End diff --

Could you make the error message clearer? For example: `The previous run had 
XXX partitions but the new one has YYY partitions. The "numPartitions" option 
must not be changed.`
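
In code, roughly (a sketch only; the condition itself is unchanged from the 
diff above):

    if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
      throw new IllegalArgumentException(
        s"The previous run had ${partitionStartMap.get.keySet.size} partitions, " +
        s"but $numPartitions partitions are now configured. " +
        "The \"numPartitions\" option must not be changed.")
    }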


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156530920
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
--- End diff --

A v2 reader should never see `SerializedOffset`. Right? The engine is 
supposed to call `deserializeOffset` to get the real offset object.
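
If so, the `SerializedOffset` case could simply be dropped, e.g. (a sketch, 
assuming the engine always hands the reader an already-deserialized offset):

    val partitionStartMap = Option(offset.orElse(null)).map {
      case o: ContinuousRateStreamOffset => o.partitionToStartValue
      case other => throw new IllegalArgumentException(
        s"Invalid offset type ${other.getClass} for ContinuousRateStreamReader")
    }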


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156530584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
--- End diff --

nit: You can reuse 
`org.apache.spark.sql.execution.streaming.RateSourceProvider.SCHEMA`
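
i.e. something like this (assuming that constant keeps its current location):

    import org.apache.spark.sql.execution.streaming.RateSourceProvider

    override def readSchema(): StructType = RateSourceProvider.SCHEMA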


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156533755
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+
+}
+
+case class RateStreamReadTask(
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156531367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
--- End diff --

Shouldn't this be `_.keySet.size != numPartitions`? Changing `numPartitions` 
between runs would generate either duplicated data or discontinuous data. Right?
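
A strict equality check would catch that, e.g. (sketch):

    // Any partition-count mismatch means "numPartitions" changed between runs,
    // which would produce duplicated or discontinuous data.
    if (partitionStartMap.exists(_.keySet.size != numPartitions)) {
      throw new IllegalArgumentException(
        "Partition count in the start offset does not match the configured " +
        s"numPartitions ($numPartitions); this option must not change between runs.")
    }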


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-08 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r155867163
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
+   * sources can return None if no writing is needed.
+   *
+   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+   *can use this id to distinguish itself from others.
+   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
+   *incrementing counter representing a consistent set of 
data; the same batch may
+   *be started multiple times in failure recovery 
scenarios, but it will always
+   *contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch 
output means to this
+   * source, please refer to {@link OutputMode} for more 
details.
--- End diff --

Good point. Fixed here and in ContinuousWriteSupport.


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-08 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r155834182
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
+   * sources can return None if no writing is needed.
+   *
+   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+   *can use this id to distinguish itself from others.
+   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
+   *incrementing counter representing a consistent set of 
data; the same batch may
+   *be started multiple times in failure recovery 
scenarios, but it will always
+   *contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch 
output means to this
+   * source, please refer to {@link OutputMode} for more 
details.
--- End diff --

to this **sink**? not source


---




[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-07 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19925

[SPARK-22732] Add Structured Streaming APIs to DataSourceV2

## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, 
including new pieces needed to support continuous processing [SPARK-20928]. 
High level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous 
reads and writes. For reads, we accept an optional user specified schema rather 
than using the ReadSupportWithSchema model, because doing so would severely 
complicate the interface.

- DataSourceV2Reader includes new interfaces to read a specific microbatch 
or read continuously from a given offset. These follow the same setter pattern 
as the existing Supports* mixins so that they can work with 
SupportsScanUnsafeRow.

- DataReader (the per-partition reader) has a new subinterface 
ContinuousDataReader only for continuous processing. This reader has a special 
method to check progress, and next() blocks for new input rather than returning 
false.

- Offset, an abstract representation of position in a streaming query, is 
ported to the public API. (Each type of reader will define its own Offset 
implementation.)

- DataSourceV2Writer has a new subinterface ContinuousWriter only for 
continuous processing. Commits to this interface come tagged with an epoch 
number, as the execution engine will continue to produce new epoch commits as 
the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 
batch API, or deprecate the existing streaming source/sink internal APIs in 
spark.sql.execution.streaming.
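
To illustrate how the write-side mixins compose, a minimal sink that opts into 
micro-batch writes but declines to write anything looks roughly like this 
(illustrative sketch; everything except the interfaces added by this PR is 
made up):

    import java.util.Optional

    import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
    import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    // Returning an empty Optional is allowed when nothing needs to be written.
    class NoopSinkV2 extends DataSourceV2 with MicroBatchWriteSupport {
      override def createMicroBatchWriter(
          queryId: String,
          batchId: Long,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
        Optional.empty()
      }
    }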

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.
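
For example, the in-memory sink in this patch lends itself to tests along 
these lines (sketch only):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.streaming.MemorySinkV2
    import org.apache.spark.sql.streaming.OutputMode

    val sink = new MemorySinkV2
    // Append keeps rows across batches, committed in order.
    sink.write(0L, OutputMode.Append(), Array(Row(1), Row(2)))
    sink.write(1L, OutputMode.Append(), Array(Row(3)))
    assert(sink.allData == Seq(Row(1), Row(2), Row(3)))
    assert(sink.latestBatchData == Seq(Row(3)))
    // Writing an already committed batch id again is a no-op.
    sink.write(1L, OutputMode.Append(), Array(Row(4)))
    assert(sink.latestBatchData == Seq(Row(3)))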

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19925


commit daa3a78ad4dd7ecfc73f5b1dd050388c07b42771
Author: Jose Torres 
Date:   2017-12-05T18:48:20Z

add tests

commit edae89508ec2bf02fba00a264cb774b0d60fb068
Author: Jose Torres 
Date:   2017-12-05T19:35:36Z

writer impl

commit 9b28c524b343018d20d2d8d3c9ed4d3c530c413f
Author: Jose Torres 
Date:   2017-12-05T19:37:24Z

rm useless writer

commit 7ceda9d63b9914cfd275fc4240fa9c696afa05d1
Author: Jose Torres 
Date:   2017-12-05T21:02:32Z

rm weird docs

commit ff7be6914560968af7f2179c3704446c771fad52
Author: Jose Torres 
Date:   2017-12-05T21:59:50Z

shuffle around public interfaces

commit 4ae516a61af903c37b748a3941c2472d20776ce4
Author: Jose Torres 
Date:   2017-12-05T22:02:01Z

fix imports

commit a8ff2ee9eeb992f6c0806cb2b4f33b976ef51cf5
Author: Jose Torres 
Date:   2017-12-05T22:40:15Z

put deserialize in reader so we don't have to port SerializedOffset

commit 5096d3d551aa4479bfb112b286683e28ec578f3c
Author: Jose Torres 
Date:   2017-12-05T23:51:08Z

off by one errors grr

commit da00f6b5ddac8bd6025076a67fd4716d9d070bf7
Author: Jose Torres 
Date:   2017-12-05T23:55:58Z

document right semantics

commit 1526f433837de78f59009b6632b6920de38bb1b0
Author: Jose Torres 
Date:   2017-12-06T00:08:54Z

document checkpoint location

commit 33b619ca4f9aa1a82e3830c6e485b8298ca9ff50
Author: Jose Torres 
Date:   2017-12-06T00:43:36Z

add getStart to continuous and clarify semantics

commit 083b04004f58358b3f6e4c82b4690ca5cf2da764
Author: Jose Torres 
Date:   2017-12-06T17:23:34Z

cleanup offset set/get docs

commit 4d6244d2ae431f6043de97f322ce1c33090c
Author: Jose Torres 
Date:   2017-12-06T17:32:45Z

cleanup reader docs

commit 5f9df4f1b54cbd0570d0df5567c42ac2575009a5
Author: Jose Torres 
Date:   2017-12-06T18:06:44Z

explain getOffset

commit a2323e95ff2d407877ded07b7537bac5b63dda8f
Author: Jose Torres 
Date:   2017-12-06T21:17:43Z

fix fmt

commit b80c75cd698cbe4840445efb78a662f02f355a99
Author: Jose Torres 
Date:   2017-12-06T21:24:35Z

fix doc

commit 03bd69da4b0450e5fec88f4196998e3075e98edc
Author: Jose Torres 
Date:   2017-12-06T21:39:20Z

note interfaces are temporary

commit c7bc6a37914312666259bb9724aa7103926e4c0f
Author: Jose Torres 
Date:   2017-12-06T21:43:38Z

fix wording

commit