Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156530920

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    assert(offsets.length == numPartitions)
+    val tuples = offsets.map {
+      case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+    }
+    ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+    StructType(
+      StructField("timestamp", TimestampType, false) ::
+        StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+    this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+    val partitionStartMap = Option(offset.orElse(null)).map {
+      case o: ContinuousRateStreamOffset => o.partitionToStartValue
+      case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
--- End diff --

A v2 reader should never see `SerializedOffset`. Right? The engine is supposed to call `deserializeOffset` to get the real offset object.
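To make that contract concrete, here is a minimal sketch of the engine-side restart path this comment assumes. The object and method names (`EngineRestoreSketch`, `restoreAndSet`) are hypothetical, and the import paths simply follow the ones used in the diff above; only `deserializeOffset`, `setOffset`, and `SerializedOffset.json` come from the code under review.

```scala
import java.util.Optional

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.reader.ContinuousReader

object EngineRestoreSketch {
  // Hypothetical stand-in for the engine's restart path (not the actual
  // ContinuousExecution code): any raw SerializedOffset read back from the
  // offset log is rehydrated through the reader's own deserializeOffset
  // before setOffset is called, so the reader itself only ever sees its
  // concrete offset type.
  def restoreAndSet(reader: ContinuousReader, restored: Option[Offset]): Unit = {
    val concrete = restored.map {
      case s: SerializedOffset => reader.deserializeOffset(s.json) // engine-side unwrap
      case o => o                                                  // already concrete
    }
    reader.setOffset(Optional.ofNullable(concrete.orNull))
  }
}
```

Under that contract, the `case s: SerializedOffset` branch in `createReadTasks` would be dead code and the match could be reduced to the `ContinuousRateStreamOffset` case alone.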