[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156872558

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A {@link DataSourceV2Writer} for use with continuous stream processing.
--- End diff --

`A variation of ...`?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156872002

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ */
+public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
+/**
+ * Set the desired offset range for read tasks created from this reader. Read tasks will
--- End diff --

Do you mean this method must be called before `createReadTasks`?
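One way to make the ordering question above concrete is a reader that rejects `createReadTasks` until an offset range has been set. The sketch below is only an illustration under stated assumptions: `SketchMicroBatchReader` is a hypothetical stand-in and not the `MicroBatchReader` interface from the diff, offsets are plain `long` values, and the default start and window used for an empty `Optional` are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a micro-batch reader. It is NOT the Spark
// MicroBatchReader interface; offsets are plain longs for brevity.
final class SketchMicroBatchReader {
    private Long start; // stays null until setOffsetRange is called
    private Long end;

    // Fix the offset range that subsequently created read tasks will cover.
    void setOffsetRange(Optional<Long> start, Optional<Long> end) {
        this.start = start.orElse(0L);            // assumed default start
        this.end = end.orElse(this.start + 100L); // assumed default window
    }

    // Split [start, end) into numPartitions contiguous {from, until} ranges.
    List<long[]> createReadTasks(int numPartitions) {
        if (start == null) {
            throw new IllegalStateException(
                "setOffsetRange must be called before createReadTasks");
        }
        long total = end - start;
        List<long[]> tasks = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            long from = start + total * i / numPartitions;
            long until = start + total * (i + 1) / numPartitions;
            tasks.add(new long[] {from, until});
        }
        return tasks;
    }
}
```

Failing fast with an exception is one possible design; documenting the required call order in the Javadoc, as the comment asks, is the lighter alternative.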
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156871819

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.Offset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
--- End diff --

Ditto.
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156871677

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
--- End diff --

It's not a mix-in interface but a variation on `DataSourceV2Reader`, right?
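The distinction drawn in the comment above can be illustrated with a small sketch. All names here are hypothetical and none of them is a Spark interface: a mix-in is shown as an interface that does not extend the base type and is implemented alongside it, while a variation extends the base type so that every implementation is itself a base reader.

```java
// All names below are hypothetical; none is a Spark interface.
interface SketchReader {
    String readSchema();
}

// Mix-in style: does not extend the base type. A class opts in by
// implementing it alongside SketchReader.
interface SketchSupportsOffsets {
    void setOffset(long offset);
}

// Variation style: extends the base type, so every implementation is
// itself a SketchReader and callers can depend on this single type.
interface SketchContinuousReader extends SketchReader {
    void setOffset(long offset);
}

final class MixInStyleReader implements SketchReader, SketchSupportsOffsets {
    long offset;
    public String readSchema() { return "value: long"; }
    public void setOffset(long offset) { this.offset = offset; }
}

final class VariationStyleReader implements SketchContinuousReader {
    long offset;
    public String readSchema() { return "value: long"; }
    public void setOffset(long offset) { this.offset = offset; }
}
```

Under this reading, an interface declared with `extends DataSourceV2Reader` would fall into the second group, which is presumably why the reviewer questions the "mix-in" wording.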
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156871374

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
+
+import java.io.IOException;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous processing mode.
+ */
+public interface ContinuousDataReader extends DataReader {
+/**
+ * Get the offset of the current record, or the start offset if no records have been read.
+ *
+ * The execution engine will call this method along with get() to keep track of the current
--- End diff --

Better to use a real Javadoc link, e.g. `{@link DataReader#get}`.
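As a rough illustration of the suggestion above, the sketch below uses a `{@link Type#member}` reference in a doc comment instead of writing `get()` as plain text. The interfaces are simplified, hypothetical stand-ins (the `Sketch` prefix, the generic parameter and the `long` offset are assumptions for the example), and the Javadoc wording is illustrative rather than the text from the pull request.

```java
// Simplified, hypothetical stand-ins; the interfaces in the diff live in
// org.apache.spark.sql.sources.v2.reader and have different signatures.
interface SketchDataReader<T> {
    boolean next();
    T get();
}

interface SketchContinuousDataReader<T> extends SketchDataReader<T> {
    /**
     * Returns the offset of the record most recently returned by
     * {@link SketchDataReader#get}, or the start offset if no records
     * have been read yet.
     */
    long getOffset();
}

// Minimal implementation so the contract can be exercised.
final class SketchCountingReader implements SketchContinuousDataReader<Long> {
    private long current = -1L; // -1 means nothing has been read yet

    public boolean next() { current++; return true; }
    public Long get() { return current; }
    public long getOffset() { return current < 0 ? 0L : current; }
}
```

With a real link, the Javadoc tool resolves the reference and renders it as a hyperlink, and it warns when the target is renamed or removed.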
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156870726

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability and save the data from a microbatch to the data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+ /**
+ * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
+ * sources can return None if there is no writing needed to be done.
+ *
+ * @param queryId A unique string for the writing query. It's possible that there are many writing
+ * queries running at the same time, and the returned {@link DataSourceV2Writer}
+ * can use this id to distinguish itself from others.
+ * @param epochId The uniquenumeric ID of the batch within this writing query. This is an
--- End diff --

Typo: `unique numeric`
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19925 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156779552 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.Optional + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.continuous.{ContinuousRateStreamPartitionOffset, ContinuousRateStreamReader, RateStreamDataReader, RateStreamReadTask} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport} +import org.apache.spark.sql.streaming.StreamTest + +class RateSourceV2Suite extends StreamTest { + test("microbatch in registry") { +DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty()) +assert(reader.isInstanceOf[RateStreamV2Reader]) + case _ => +throw new IllegalStateException("Could not find v2 read support for rate") +} + } + + test("microbatch - options propagated") { +val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava)) +reader.setOffsetRange(Optional.empty(), + Optional.of(LongOffset(System.currentTimeMillis() + 1001))) +val tasks = reader.createReadTasks() +assert(tasks.size == 11) +tasks.asScala.foreach { + // for 1 second, size of each task is (rowsPerSecond / numPartitions) + case RateStreamBatchTask(vals) => vals.size == 3 + case _ => throw new IllegalStateException("Unexpected task type") +} + } + + test("microbatch - set offset") { +val reader = new RateStreamV2Reader(DataSourceV2Options.empty()) +reader.setOffsetRange(Optional.of(LongOffset(12345)), Optional.of(LongOffset(54321))) +assert(reader.getStartOffset() == LongOffset(12345)) +assert(reader.getEndOffset() == LongOffset(54321)) + } + + test("microbatch - infer offsets") { +val reader = new RateStreamV2Reader(DataSourceV2Options.empty()) 
+reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100) +reader.setOffsetRange(Optional.empty(), Optional.empty()) +assert(reader.getStartOffset() == LongOffset(reader.creationTimeMs)) +assert(reader.getEndOffset().asInstanceOf[LongOffset].offset >= reader.creationTimeMs + 100) + } + + + test("microbatch - data read") { +val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava)) +reader.setOffsetRange(Optional.empty(), + Optional.of(LongOffset(System.currentTimeMillis() + 1001))) +val tasks = reader.createReadTasks() +assert(tasks.size == 11) + +val readData = tasks.asScala + .map(_.createDataReader()) + .flatMap { reader => +val buf = scala.collection.mutable.ListBuffer[Row]() +while (reader.next()) buf.append(reader.get()) +buf + } + +assert(readData.map(_.getLong(1)).sorted == Range(0, 33)) + } + + test("continuous in registry") { +DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { + case ds: ContinuousReadSupport => +val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty()) +assert(reader.isInstanceOf[ContinuousRateStreamReader]) + case _ => +throw new IllegalStateException("Could not find v2 read support for rate") +} + } + +
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156779156 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.Optional + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.continuous.{ContinuousRateStreamPartitionOffset, ContinuousRateStreamReader, RateStreamDataReader, RateStreamReadTask} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport} +import org.apache.spark.sql.streaming.StreamTest + +class RateSourceV2Suite extends StreamTest { + test("microbatch in registry") { +DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty()) +assert(reader.isInstanceOf[RateStreamV2Reader]) + case _ => +throw new IllegalStateException("Could not find v2 read support for rate") +} + } + + test("microbatch - options propagated") { +val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava)) +reader.setOffsetRange(Optional.empty(), + Optional.of(LongOffset(System.currentTimeMillis() + 1001))) +val tasks = reader.createReadTasks() +assert(tasks.size == 11) +tasks.asScala.foreach { + // for 1 second, size of each task is (rowsPerSecond / numPartitions) + case RateStreamBatchTask(vals) => vals.size == 3 + case _ => throw new IllegalStateException("Unexpected task type") +} + } + + test("microbatch - set offset") { +val reader = new RateStreamV2Reader(DataSourceV2Options.empty()) +reader.setOffsetRange(Optional.of(LongOffset(12345)), Optional.of(LongOffset(54321))) +assert(reader.getStartOffset() == LongOffset(12345)) +assert(reader.getEndOffset() == LongOffset(54321)) + } + + test("microbatch - infer offsets") { +val reader = new RateStreamV2Reader(DataSourceV2Options.empty()) 
+reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
+reader.setOffsetRange(Optional.empty(), Optional.empty())
+assert(reader.getStartOffset() == LongOffset(reader.creationTimeMs))
+assert(reader.getEndOffset().asInstanceOf[LongOffset].offset >= reader.creationTimeMs + 100)
+ }
+
+
+ test("microbatch - data read") {
+val reader = new RateStreamV2Reader(
+ new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
+reader.setOffsetRange(Optional.empty(),
+ Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
--- End diff --

This will be flaky when Jenkins is overloaded. Could you add a clock parameter to `RateStreamV2Reader` and use `ManualClock` instead, so that the test can control the clock?
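The idea behind the request above is that a reader which takes its clock as a constructor parameter can be driven deterministically in tests, instead of depending on `System.currentTimeMillis()` and on how busy the build machine is. The following is a minimal sketch of that pattern only: `SketchClock`, `SketchManualClock` and `SketchRateReader` are hypothetical stand-ins, not the `ManualClock` or `RateStreamV2Reader` classes named in the comment, and the row-count formula is an assumption made for the example.

```java
// Hypothetical stand-ins for illustration; this sketch does not use the
// ManualClock or RateStreamV2Reader classes mentioned in the review.
interface SketchClock {
    long getTimeMillis();
}

// A clock that only moves when the test tells it to.
final class SketchManualClock implements SketchClock {
    private long now;

    SketchManualClock(long startMillis) { this.now = startMillis; }

    @Override
    public synchronized long getTimeMillis() { return now; }

    synchronized void advance(long millis) { now += millis; }
}

final class SketchRateReader {
    private final SketchClock clock;
    private final long creationTimeMs;
    private final long rowsPerSecond;

    // The clock is injected, so tests can pass a manual clock.
    SketchRateReader(SketchClock clock, long rowsPerSecond) {
        this.clock = clock;
        this.creationTimeMs = clock.getTimeMillis();
        this.rowsPerSecond = rowsPerSecond;
    }

    // Rows that the whole seconds elapsed since creation should have produced.
    long rowsAvailable() {
        long elapsedSeconds = (clock.getTimeMillis() - creationTimeMs) / 1000L;
        return elapsedSeconds * rowsPerSecond;
    }
}
```

A test would then advance the manual clock by exactly one second and assert on the resulting row count, with no sleeps and no dependence on wall-clock time.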
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156741131 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156740666 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit + * tests and does not provide durability. 
+ */ +class MemorySinkV2 extends DataSourceV2 + with MicroBatchWriteSupport with ContinuousWriteSupport with Logging { + + override def createMicroBatchWriter( + queryId: String, + batchId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = { +java.util.Optional.of(new MemoryWriter(this, batchId, mode)) + } + + override def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { +java.util.Optional.of(new ContinuousMemoryWriter(this, mode)) + } + + private case class AddedData(batchId: Long, data: Array[Row]) + + /** An order list of batches that have been written to this [[Sink]]. */ + @GuardedBy("this") + private val batches = new ArrayBuffer[AddedData]() + + /** Returns all rows that are stored in this [[Sink]]. */ + def allData: Seq[Row] = synchronized { +batches.flatMap(_.data) + } + + def latestBatchId: Option[Long] = synchronized { +batches.lastOption.map(_.batchId) + } + + def latestBatchData: Seq[Row] = synchronized { +batches.lastOption.toSeq.flatten(_.data) + } + + def toDebugString: String = synchronized { +batches.map { case AddedData(batchId, data) => + val dataStr = try data.mkString(" ") catch { +case NonFatal(e) => "[Error converting to string]" + } + s"$batchId: $dataStr" +}.mkString("\n") + } + + def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { +val notCommitted = synchronized { + latestBatchId.isEmpty || batchId > latestBatchId.get +} +if (notCommitted) { + logDebug(s"Committing batch $batchId to $this") + outputMode match { +case Append | Update => + val rows = AddedData(batchId, newRows) + synchronized { batches += rows } + +case Complete => + val rows = AddedData(batchId, newRows) + synchronized { +batches.clear() +batches += rows + } + +case _ => + throw new IllegalArgumentException( +s"Output mode $outputMode is not supported by 
MemorySink") + } +} else { + logDebug(s"Skipping already committed batch: $batchId") +} + } + + def clear(): Unit = synchronized { +batches.clear() + } + + override def toString(): String = "MemorySink" +} + +case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} + +class MemoryWriter(sink:
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156542366 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156540689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156540301 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class RateStreamReadTask(
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156539696 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156524302 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * Used for per-partition offsets in continuous processing. ContinuousReader implementations will + * provide a method to merge these into a global Offset. + * + * These offsets must be serializable. + */ +public interface PartitionOffset { --- End diff -- `extends Serializable`
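A minimal sketch of what the `extends Serializable` suggestion buys, written in Scala with stand-in types. `SerializableOffsetSketch`, `RatePartitionOffset`, and `roundTrip` are hypothetical names for illustration and are not part of the PR; the real `PartitionOffset` is a Java interface. Once the base type extends `java.io.Serializable`, the javadoc requirement is enforced by the type system and any implementation can go through Java serialization:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializableOffsetSketch {
  // Stand-in for the PR's PartitionOffset interface (assumption: modelled as a Scala trait).
  trait PartitionOffset extends java.io.Serializable

  // Hypothetical implementation; a plain class, so it is serializable only via the trait.
  class RatePartitionOffset(val partition: Int, val start: Long) extends PartitionOffset

  // Writes the value with Java serialization and reads it back.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Without the `extends` clause on the trait, `roundTrip(new RatePartitionOffset(3, 42L))` would not compile, which is the point of the suggestion.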
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156535101 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class RateStreamReadTask(
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156526139 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +public abstract class Offset { --- End diff -- nit: add javadoc
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156535839 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit + * tests and does not provide durability. 
+ */ +class MemorySinkV2 extends DataSourceV2 + with MicroBatchWriteSupport with ContinuousWriteSupport with Logging { + + override def createMicroBatchWriter( + queryId: String, + batchId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = { +java.util.Optional.of(new MemoryWriter(this, batchId, mode)) + } + + override def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { +java.util.Optional.of(new ContinuousMemoryWriter(this, mode)) + } + + private case class AddedData(batchId: Long, data: Array[Row]) + + /** An order list of batches that have been written to this [[Sink]]. */ + @GuardedBy("this") + private val batches = new ArrayBuffer[AddedData]() + + /** Returns all rows that are stored in this [[Sink]]. */ + def allData: Seq[Row] = synchronized { +batches.flatMap(_.data) + } + + def latestBatchId: Option[Long] = synchronized { +batches.lastOption.map(_.batchId) + } + + def latestBatchData: Seq[Row] = synchronized { +batches.lastOption.toSeq.flatten(_.data) + } + + def toDebugString: String = synchronized { +batches.map { case AddedData(batchId, data) => + val dataStr = try data.mkString(" ") catch { +case NonFatal(e) => "[Error converting to string]" + } + s"$batchId: $dataStr" +}.mkString("\n") + } + + def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { +val notCommitted = synchronized { + latestBatchId.isEmpty || batchId > latestBatchId.get +} +if (notCommitted) { + logDebug(s"Committing batch $batchId to $this") + outputMode match { +case Append | Update => + val rows = AddedData(batchId, newRows) + synchronized { batches += rows } + +case Complete => + val rows = AddedData(batchId, newRows) + synchronized { +batches.clear() +batches += rows + } + +case _ => + throw new IllegalArgumentException( +s"Output mode $outputMode is not supported by 
MemorySink") + } +} else { + logDebug(s"Skipping already committed batch: $batchId") +} + } + + def clear(): Unit = synchronized { +batches.clear() + } + + override def toString(): String = "MemorySink" +} + +case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} + +class MemoryWriter(sink: MemorySinkV2,
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156532274 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") --- End diff -- Could you make the error message clearer? For example: `The previous run has XXX partitions but the new one has XXX partitions. The "numPartitions" option must not be changed.`
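One way the suggested message could be produced, as a sketch only. `PartitionCheckSketch` and `validatePartitionCount` are hypothetical names, not code from the PR; the check itself mirrors the `keySet.size > numPartitions` condition in the diff above, and the wording follows the reviewer's proposal with the two partition counts filled in:

```scala
object PartitionCheckSketch {
  // Hypothetical helper: compares the partition count recorded in a restored
  // offset map against the currently configured numPartitions.
  def validatePartitionCount(restored: Map[Int, Long], numPartitions: Int): Unit = {
    if (restored.keySet.size > numPartitions) {
      throw new IllegalArgumentException(
        s"The previous run has ${restored.keySet.size} partitions but the new one has " +
          s"$numPartitions partitions. " +
          // Plain (non-interpolated) literal, so the escaped quotes are safe on older Scala versions.
          "The \"numPartitions\" option must not be changed.")
    }
  }
}
```

Reporting both numbers lets the user see at a glance which option value changed between runs.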
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156530920 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) --- End diff -- A v2 reader should never see `SerializedOffset`. Right? The engine is supposed to call `deserializeOffset` to get the real offset object.
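The handoff the reviewer describes can be sketched in plain Java. This is only an illustration under stated assumptions: `OffsetHandoffSketch`, `RateOffset`, `Reader`, `restore` and `startFor` are hypothetical names, not Spark interfaces, and the `0=4;1=5` text form is a toy stand-in for the JSON that the quoted Scala code reads and writes. The point is that the engine converts the checkpointed text through the reader's own `deserializeOffset` before calling `setOffset`, so the reader only ever handles its own offset type.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class OffsetHandoffSketch {
    // Hypothetical typed offset owned by the reader: partition index -> next start value.
    static final class RateOffset {
        final Map<Integer, Long> partitionToStart;

        RateOffset(Map<Integer, Long> partitionToStart) {
            this.partitionToStart = new TreeMap<>(partitionToStart);
        }

        // Toy text form (assumption), standing in for the JSON used in the quoted diff.
        String serialize() {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<Integer, Long> e : partitionToStart.entrySet()) {
                if (sb.length() > 0) {
                    sb.append(';');
                }
                sb.append(e.getKey()).append('=').append(e.getValue());
            }
            return sb.toString();
        }
    }

    // Hypothetical reader: it never has to match on a generic serialized wrapper.
    static final class Reader {
        private Optional<RateOffset> start = Optional.empty();

        RateOffset deserializeOffset(String text) {
            Map<Integer, Long> parsed = new TreeMap<>();
            if (!text.isEmpty()) {
                for (String pair : text.split(";")) {
                    String[] kv = pair.split("=");
                    parsed.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
                }
            }
            return new RateOffset(parsed);
        }

        void setOffset(Optional<RateOffset> offset) {
            this.start = offset;
        }

        // Falls back to the partition index when no start value was checkpointed.
        long startFor(int partition) {
            return start
                .map(o -> o.partitionToStart.getOrDefault(partition, (long) partition))
                .orElse((long) partition);
        }
    }

    // Engine side: deserialize first, then hand the typed offset to the reader.
    static Reader restore(String checkpointed) {
        Reader reader = new Reader();
        reader.setOffset(Optional.of(reader.deserializeOffset(checkpointed)));
        return reader;
    }

    public static void main(String[] args) {
        String checkpointed = new RateOffset(Map.of(0, 4L, 1, 5L)).serialize();
        Reader reader = restore(checkpointed);
        System.out.println(checkpointed + " -> partition 1 starts at " + reader.startFor(1));
    }
}
```

With this split, the `case s: SerializedOffset` branch in the quoted `createReadTasks` would have nothing left to handle.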
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156530584 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( --- End diff -- nit: You can reuse `org.apache.spark.sql.execution.streaming.RateSourceProvider.SCHEMA`
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156533755 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { + throw new IllegalArgumentException("Start offset contained too many partitions.") +} +val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + +Range(0, numPartitions).map { n => + // If the offset doesn't have a value for this partition, start from the beginning. + val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. + RateStreamReadTask(start, n, numPartitions, perPartitionRate) +.asInstanceOf[ReadTask[Row]] +}.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class RateStreamReadTask(
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r156531367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +object ContinuousRateStreamSource { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" +} + +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToStartValue) +} + +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(p, s) => p -> s +} +ContinuousRateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { +ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json)) + } + + override def readSchema(): StructType = { +StructType( +StructField("timestamp", TimestampType, false) :: +StructField("value", LongType, false) :: Nil) + } + + private var 
offset: java.util.Optional[Offset] = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { +this.offset = offset + } + + override def getStartOffset(): Offset = offset.get() + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { +val partitionStartMap = Option(offset.orElse(null)).map { + case o: ContinuousRateStreamOffset => o.partitionToStartValue + case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json) + case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource") +} +if (partitionStartMap.exists(_.keySet.size > numPartitions)) { --- End diff -- `_.size` != `numPartitions `. Changing the `numPartitions` will generate either duplicated data or discontinuous data. Right?
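A small simulation makes the reviewer's concern concrete. It is a sketch, not the Spark implementation: `RateRepartitionDemo` and `run` are hypothetical names, and the only behaviour modelled is what the quoted diff shows, namely that partition `n` starts at its checkpointed value (or at `n` when none exists) and advances by `numPartitions` per row. The first run uses 2 partitions; the restart uses 3, which the quoted `>` check lets through because a 2-entry map is not larger than 3.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RateRepartitionDemo {
    // Returns the sorted values emitted by all partitions for one run.
    // Partition n starts at its checkpointed value, or at n if it has none,
    // and advances by numPartitions on every row (as in the quoted diff).
    static List<Long> run(Map<Integer, Long> checkpoint, int numPartitions, int rowsPerPartition) {
        List<Long> out = new ArrayList<>();
        for (int n = 0; n < numPartitions; n++) {
            long value = checkpoint.getOrDefault(n, (long) n);
            for (int i = 0; i < rowsPerPartition; i++) {
                out.add(value);
                value += numPartitions;
            }
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        // First run: 2 partitions, 2 rows each, no checkpoint yet.
        List<Long> first = run(new TreeMap<>(), 2, 2);

        // Offsets after the first run: partition 0 would continue at 4, partition 1 at 5.
        Map<Integer, Long> checkpoint = new TreeMap<>();
        checkpoint.put(0, 4L);
        checkpoint.put(1, 5L);

        // Restart with 3 partitions; partition 2 has no checkpointed value and starts at 2.
        List<Long> second = run(checkpoint, 3, 3);

        System.out.println("first run:  " + first);
        System.out.println("second run: " + second);
    }
}
```

The first run emits 0 through 3. After the restart, 2 is emitted again, 5 and 8 each appear twice, and 6 and 9 never appear because no partition covers that residue modulo 3. That is both the duplication and the discontinuity the comment mentions, which is why an equality check on the partition count is the safer guard.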
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r155867163 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data writing ability and save the data from a microbatch to the data source. + */ +@InterfaceStability.Evolving +public interface MicroBatchWriteSupport extends BaseStreamingSink { + + /** + * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data + * sources can return None if there is no writing needed to be done. + * + * @param queryId A unique string for the writing query. 
It's possible that there are many writing + *queries running at the same time, and the returned {@link DataSourceV2Writer} + *can use this id to distinguish itself from others. + * @param epochId The uniquenumeric ID of the batch within this writing query. This is an + *incrementing counter representing a consistent set of data; the same batch may + *be started multiple times in failure recovery scenarios, but it will always + *contain the same records. + * @param schema the schema of the data to be written. + * @param mode the output mode which determines what successive batch output means to this + * source, please refer to {@link OutputMode} for more details. --- End diff -- Good point. Fixed here and in ContinuousWriteSupport.
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r155834182 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data writing ability and save the data from a microbatch to the data source. + */ +@InterfaceStability.Evolving +public interface MicroBatchWriteSupport extends BaseStreamingSink { + + /** + * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data + * sources can return None if there is no writing needed to be done. + * + * @param queryId A unique string for the writing query. 
It's possible that there are many writing + *queries running at the same time, and the returned {@link DataSourceV2Writer} + *can use this id to distinguish itself from others. + * @param epochId The uniquenumeric ID of the batch within this writing query. This is an + *incrementing counter representing a consistent set of data; the same batch may + *be started multiple times in failure recovery scenarios, but it will always + *contain the same records. + * @param schema the schema of the data to be written. + * @param mode the output mode which determines what successive batch output means to this + * source, please refer to {@link OutputMode} for more details. --- End diff -- to this **sink**? not source
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19925

[SPARK-22732] Add Structured Streaming APIs to DataSourceV2

## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
- DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
- DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
- DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.
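The description says that `next()` on the per-partition continuous reader blocks for new input instead of returning false, and that the reader has a method to check progress. A minimal sketch of that contract, assuming a queue-backed source, might look like the following. `BlockingReaderSketch`, `offer`, `get` and `getOffset` are hypothetical names chosen for illustration and do not reproduce the actual `ContinuousDataReader` signatures; the row count used as the "offset" is likewise an assumption.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingReaderSketch {
    private final BlockingQueue<Long> input = new LinkedBlockingQueue<>();
    private long current = -1L;
    private long rowsRead = 0L;

    // Stand-in for data arriving at the source.
    void offer(long value) {
        input.add(value);
    }

    // Waits until a record is available; there is no "end of data" false return.
    boolean next() {
        try {
            current = input.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for input", e);
        }
        rowsRead++;
        return true;
    }

    // The record made current by the last call to next().
    long get() {
        return current;
    }

    // Progress check: here simply the number of rows consumed so far.
    long getOffset() {
        return rowsRead;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingReaderSketch reader = new BlockingReaderSketch();
        Thread producer = new Thread(() -> {
            for (long v = 0; v < 3; v++) {
                reader.offer(v * 10);
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            reader.next(); // blocks until the producer has supplied a value
            System.out.println(reader.get() + " @ offset " + reader.getOffset());
        }
        producer.join();
    }
}
```

Because `next()` never signals completion, whoever drives such a reader needs a separate way to stop it and to record how far it has read, which is the role the progress method and the epoch-tagged commits play in the description.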
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19925.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19925

commit daa3a78ad4dd7ecfc73f5b1dd050388c07b42771 Author: Jose Torres Date: 2017-12-05T18:48:20Z add tests
commit edae89508ec2bf02fba00a264cb774b0d60fb068 Author: Jose Torres Date: 2017-12-05T19:35:36Z writer impl
commit 9b28c524b343018d20d2d8d3c9ed4d3c530c413f Author: Jose Torres Date: 2017-12-05T19:37:24Z rm useless writer
commit 7ceda9d63b9914cfd275fc4240fa9c696afa05d1 Author: Jose Torres Date: 2017-12-05T21:02:32Z rm weird docs
commit ff7be6914560968af7f2179c3704446c771fad52 Author: Jose Torres Date: 2017-12-05T21:59:50Z shuffle around public interfaces
commit 4ae516a61af903c37b748a3941c2472d20776ce4 Author: Jose Torres Date: 2017-12-05T22:02:01Z fix imports
commit a8ff2ee9eeb992f6c0806cb2b4f33b976ef51cf5 Author: Jose Torres Date: 2017-12-05T22:40:15Z put deserialize in reader so we don't have to port SerializedOffset
commit 5096d3d551aa4479bfb112b286683e28ec578f3c Author: Jose Torres Date: 2017-12-05T23:51:08Z off by one errors grr
commit da00f6b5ddac8bd6025076a67fd4716d9d070bf7 Author: Jose Torres Date: 2017-12-05T23:55:58Z document right semantics
commit 1526f433837de78f59009b6632b6920de38bb1b0 Author: Jose Torres Date: 2017-12-06T00:08:54Z document checkpoint location
commit 33b619ca4f9aa1a82e3830c6e485b8298ca9ff50 Author: Jose Torres Date: 2017-12-06T00:43:36Z add getStart to continuous and clarify semantics
commit 083b04004f58358b3f6e4c82b4690ca5cf2da764 Author: Jose Torres Date: 2017-12-06T17:23:34Z cleanup offset set/get docs
commit 4d6244d2ae431f6043de97f322ce1c33090c Author: Jose Torres Date: 2017-12-06T17:32:45Z cleanup reader docs
commit 5f9df4f1b54cbd0570d0df5567c42ac2575009a5 Author: Jose Torres Date: 2017-12-06T18:06:44Z explain getOffset
commit a2323e95ff2d407877ded07b7537bac5b63dda8f Author: Jose Torres Date: 2017-12-06T21:17:43Z fix fmt
commit b80c75cd698cbe4840445efb78a662f02f355a99 Author: Jose Torres Date: 2017-12-06T21:24:35Z fix doc
commit 03bd69da4b0450e5fec88f4196998e3075e98edc Author: Jose Torres Date: 2017-12-06T21:39:20Z note interfaces are temporary
commit c7bc6a37914312666259bb9724aa7103926e4c0f Author: Jose Torres Date: 2017-12-06T21:43:38Z fix wording
commit