[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r224755348 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala --- @@ -39,7 +39,22 @@ case class SparkListenerSQLExecutionStart( @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) - extends SparkListenerEvent + extends SparkListenerEvent { + + // The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect". + @JsonIgnore private[sql] var executionName: Option[String] = None + + // The following 3 fields are only accessed when `executionName` is defined. + + // The duration of the SQL execution, in nanoseconds. + @JsonIgnore private[sql] var duration: Long = 0L --- End diff -- Did you verify that the JsonIgnore annotation actually works? For some reason, I actually needed to annotate the class as ```scala @JsonIgnoreProperties(Array("a", "b", "c")) class SomeClass { @JsonProperty("a") val a: ... @JsonProperty("b") val b: ... } ``` The reason is that Json4s understands that API better, and I believe we use Json4s for all of these events. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/22674 I would just increase the timeout in that suite. Now that we're pushing a lot more events to the LiveListenerBus, it may not be draining quickly enough. On slow Jenkins machines this could well cause flakiness. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r224013755 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3356,21 +3356,11 @@ class Dataset[T] private[sql]( * user-registered callback functions. */ private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { -try { - qe.executedPlan.foreach { plan => -plan.resetMetrics() - } - val start = System.nanoTime() - val result = SQLExecution.withNewExecutionId(sparkSession, qe) { -action(qe.executedPlan) - } - val end = System.nanoTime() - sparkSession.listenerManager.onSuccess(name, qe, end - start) - result -} catch { - case e: Exception => -sparkSession.listenerManager.onFailure(name, qe, e) -throw e +qe.executedPlan.foreach { plan => --- End diff -- can't executedPlan throw an exception? I thought it can if the original spark plan failed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r224006828 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala --- @@ -75,95 +76,74 @@ trait QueryExecutionListener { */ @Experimental @InterfaceStability.Evolving -class ExecutionListenerManager private extends Logging { - - private[sql] def this(conf: SparkConf) = { -this() +// The `session` is used to indicate which session carries this listener manager, and we only +// catch SQL executions which are launched by the same session. +// The `loadExtensions` flag is used to indicate whether we should load the pre-defined, +// user-specified listeners during construction. We should not do it when cloning this listener +// manager, as we will copy all listeners to the cloned listener manager. +class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) + extends SparkListener with Logging { + + private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener] + + if (loadExtensions) { +val conf = session.sparkContext.conf conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames => Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register) } } + session.sparkContext.listenerBus.addToSharedQueue(this) + /** * Registers the specified [[QueryExecutionListener]]. */ @DeveloperApi - def register(listener: QueryExecutionListener): Unit = writeLock { -listeners += listener + def register(listener: QueryExecutionListener): Unit = { +listeners.add(listener) } /** * Unregisters the specified [[QueryExecutionListener]]. */ @DeveloperApi - def unregister(listener: QueryExecutionListener): Unit = writeLock { -listeners -= listener + def unregister(listener: QueryExecutionListener): Unit = { +listeners.remove(listener) } /** * Removes all the registered [[QueryExecutionListener]]. 
*/ @DeveloperApi - def clear(): Unit = writeLock { + def clear(): Unit = { listeners.clear() } /** * Get an identical copy of this listener manager. */ - @DeveloperApi - override def clone(): ExecutionListenerManager = writeLock { -val newListenerManager = new ExecutionListenerManager -listeners.foreach(newListenerManager.register) + private[sql] def clone(session: SparkSession): ExecutionListenerManager = { +val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false) +listeners.iterator().asScala.foreach(newListenerManager.register) newListenerManager } - private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { -readLock { - withErrorHandling { listener => -listener.onSuccess(funcName, qe, duration) + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { +case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) => + val funcName = e.executionName.get + e.executionFailure match { +case Some(ex) => + listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex)) +case _ => + listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, e.duration)) } -} - } - private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { -readLock { - withErrorHandling { listener => -listener.onFailure(funcName, qe, exception) - } -} +case _ => // Ignore } - private[this] val listeners = ListBuffer.empty[QueryExecutionListener] - - /** A lock to prevent updating the list of listeners while we are traversing through them. */ - private[this] val lock = new ReentrantReadWriteLock() - - private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { -for (listener <- listeners) { - try { -f(listener) - } catch { -case NonFatal(e) => logWarning("Error executing query execution listener", e) - } -} - } - - /** Acquires a read lock on the cache for the duration of `f`. 
*/ - private def readLock[A](f: => A): A = { -val rl = lock.readLock() -rl.lock() -try f finally { - rl.unlock() -} - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { -val wl = lock
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r224000145 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3356,21 +3356,11 @@ class Dataset[T] private[sql]( * user-registered callback functions. */ private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { -try { - qe.executedPlan.foreach { plan => -plan.resetMetrics() - } - val start = System.nanoTime() - val result = SQLExecution.withNewExecutionId(sparkSession, qe) { -action(qe.executedPlan) - } - val end = System.nanoTime() - sparkSession.listenerManager.onSuccess(name, qe, end - start) - result -} catch { - case e: Exception => -sparkSession.listenerManager.onFailure(name, qe, e) -throw e +qe.executedPlan.foreach { plan => --- End diff -- can this throw an exception? Imagine if `df.count()` threw an exception, and then you run it again. Won't this be a behavior change in that case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r224000809 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala --- @@ -71,14 +72,35 @@ object SQLExecution { val callSite = sc.getCallSite() withSQLConfPropagated(sparkSession) { -sc.listenerBus.post(SparkListenerSQLExecutionStart( - executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) +var ex: Option[Exception] = None +val startTime = System.currentTimeMillis() try { + sc.listenerBus.post(SparkListenerSQLExecutionStart( +executionId = executionId, +description = callSite.shortForm, +details = callSite.longForm, +physicalPlanDescription = queryExecution.toString, +// `queryExecution.executedPlan` triggers query planning. If it fails, the exception +// will be caught and reported in the `SparkListenerSQLExecutionEnd` +sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), +time = startTime)) body +} catch { + case e: Exception => +ex = Some(e) +throw e } finally { - sc.listenerBus.post(SparkListenerSQLExecutionEnd( -executionId, System.currentTimeMillis())) + val endTime = System.currentTimeMillis() + val event = SparkListenerSQLExecutionEnd(executionId, endTime) + // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name` + // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We + // can specify the execution name in more places in the future, so that + // `QueryExecutionListener` can track more cases. + event.executionName = name + event.duration = endTime - startTime --- End diff -- duration used to be reported in nanos. Now it's millis. I would still report it as nanos if possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22478: [SPARK-25472][SS] Don't have legitimate stops of streams...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/22478 thanks! merging to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22478: [SPARK-25472] Don't have legitimate stops of stre...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/22478 [SPARK-25472] Don't have legitimate stops of streams cause stream exceptions ## What changes were proposed in this pull request? Legitimate stops of streams may actually cause an exception to be captured by stream execution, because the job throws a SparkException regarding job cancellation during a stop. This PR makes the stop more graceful by swallowing this cancellation error. ## How was this patch tested? This is pretty hard to test. The existing tests should make sure that we're not swallowing other specific SparkExceptions. I've also run the `KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, whereas it used to be flaky. You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark SPARK-25472 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22478 commit dcfc20b291affa6dcc396cdca678ce1f52184dce Author: Burak Yavuz Date: 2018-09-19T23:43:57Z Don't have legitimate stops of streams cause stream exceptions commit 3b8addb9cf02489978594505470fdd527a35c2a7 Author: Burak Yavuz Date: 2018-09-19T23:46:47Z fix scalastyle --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22478: [SPARK-25472] Don't have legitimate stops of streams cau...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/22478 cc @zsxwing @jose-torres --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20673 After benchmarking, observed that this didn't provide much benefit :( Closing the PR --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user brkyvz closed the pull request at: https://github.com/apache/spark/pull/20673 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/22210 LGTM! Good catches --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r211031056 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala --- @@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends Logging { /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { +// Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is +// created. +val exception = new SparkException("It was created at: ") +leakDetector = ShutdownHookManager.addShutdownHook { () => + logError("Found a leak KafkaTestUtils.", exception) +} + setupEmbeddedZookeeper() setupEmbeddedKafkaServer() } /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { -// There is a race condition that may kill JVM when terminating the Kafka cluster. We set -// a custom Procedure here during the termination in order to keep JVM running and not fail the -// tests. -val logExitEvent = new Exit.Procedure { - override def execute(statusCode: Int, message: String): Unit = { -logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") - } +if (leakDetector != null) { + ShutdownHookManager.removeShutdownHook(leakDetector) } -Exit.setExitProcedure(logExitEvent) -Exit.setHaltProcedure(logExitEvent) -try { - brokerReady = false - zkReady = false - - if (producer != null) { -producer.close() -producer = null - } +brokerReady = false --- End diff -- Do these need to be thread-safe, or are setup and teardown always called serially? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r211030720 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala --- @@ -130,6 +130,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { +// Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is +// created. +val exception = new SparkException("It was created at: ") --- End diff -- nice trick --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/21559 Thanks! Merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195798990 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow]) } /** A common trait for MemorySinks with methods used for testing */ -trait MemorySinkBase extends BaseStreamingSink { +trait MemorySinkBase extends BaseStreamingSink with Logging { def allData: Seq[Row] def latestBatchData: Seq[Row] def dataSinceBatch(sinceBatchId: Long): Seq[Row] def latestBatchId: Option[Long] + + /** + * Truncates the given rows to return at most maxRows rows. + * @param rows The data that may need to be truncated. + * @param batchLimit Number of rows to keep in this batch; the rest will be truncated + * @param sinkLimit Total number of rows kept in this sink, for logging purposes. + * @param batchId The ID of the batch that sent these rows, for logging purposes. + * @return Truncated rows. + */ + protected def truncateRowsIfNeeded( + rows: Array[Row], + batchLimit: Int, + sinkLimit: Int, + batchId: Long): Array[Row] = { +if (rows.length > batchLimit && batchLimit >= 0) { + logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit") --- End diff -- nit: not sure if these sinks get used by Continuous processing too. If so I would rename `batch` to `trigger version`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195797571 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow]) } /** A common trait for MemorySinks with methods used for testing */ -trait MemorySinkBase extends BaseStreamingSink { +trait MemorySinkBase extends BaseStreamingSink with Logging { def allData: Seq[Row] def latestBatchData: Seq[Row] def dataSinceBatch(sinceBatchId: Long): Seq[Row] def latestBatchId: Option[Long] + + /** + * Truncates the given rows to return at most maxRows rows. + * @param rows The data that may need to be truncated. + * @param batchLimit Number of rows to keep in this batch; the rest will be truncated + * @param sinkLimit Total number of rows kept in this sink, for logging purposes. + * @param batchId The ID of the batch that sent these rows, for logging purposes. + * @return Truncated rows. + */ + protected def truncateRowsIfNeeded( + rows: Array[Row], + batchLimit: Int, + sinkLimit: Int, + batchId: Long): Array[Row] = { +if (rows.length > batchLimit && batchLimit >= 0) { + logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit") + rows.take(batchLimit) +} else { + rows +} + } +} + +/** + * Companion object to MemorySinkBase. + */ +object MemorySinkBase { + val MAX_MEMORY_SINK_ROWS = "maxRows" + val MAX_MEMORY_SINK_ROWS_DEFAULT = -1 + + /** + * Gets the max number of rows a MemorySink should store. This number is based on the memory + * sink row limit if it is set. If not, there is no limit. + * @param options Options for writing from which we get the max rows option + * @return The maximum number of rows a memorySink should store, or None for no limit. 
--- End diff -- need to update docs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195268299 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink { def latestBatchId: Option[Long] } +/** + * Companion object to MemorySinkBase. + */ +object MemorySinkBase { + val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows" + val MAX_MEMORY_SINK_ROWS_DEFAULT = -1 + + /** + * Gets the max number of rows a MemorySink should store. This number is based on the memory + * sink row limit if it is set. If not, there is no limit. + * @param options Options for writing from which we get the max rows option + * @return The maximum number of rows a memorySink should store, or None for no limit. + */ + def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = { +val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT) +if (maxRows >= 0) Some(maxRows) else None + } +} + + --- End diff -- nit: remove extra line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195268861 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink { def latestBatchId: Option[Long] } +/** + * Companion object to MemorySinkBase. + */ +object MemorySinkBase { + val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows" + val MAX_MEMORY_SINK_ROWS_DEFAULT = -1 + + /** + * Gets the max number of rows a MemorySink should store. This number is based on the memory + * sink row limit if it is set. If not, there is no limit. + * @param options Options for writing from which we get the max rows option + * @return The maximum number of rows a memorySink should store, or None for no limit. + */ + def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = { +val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT) +if (maxRows >= 0) Some(maxRows) else None --- End diff -- Do you want to do `if (maxRows >= 0) maxRows else Int.MaxValue - 10` We can't exceed runtime array max size anyway --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195269434 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala --- @@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB def clear(): Unit = synchronized { batches.clear() +numRows = 0 + } + + private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { +if (rows.length > maxRows) { + logWarning(s"Truncating batch $batchId to $maxRows rows") --- End diff -- How does take behave with negative rows? Printing a warning message with negative values may be weird. I would also include the sink limit in the warning. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195268999 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala --- @@ -81,22 +84,35 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB }.mkString("\n") } - def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { + def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row], sinkCapacity: Option[Int]) --- End diff -- nit: our style is more like ```scala def write( batchId: Long, outputMode: OutputMode, newRows: Array[Row], sinkCapacity: Option[Int]): Unit = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195268218 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink { def latestBatchId: Option[Long] } +/** + * Companion object to MemorySinkBase. + */ +object MemorySinkBase { + val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows" --- End diff -- `maxRows` is sufficient --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/21559 Jenkins add to whitelist --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/21559 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21275: [SPARK-24214][SS]Fix toJSON for StreamingRelationV2/Stre...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/21275 LGTM! Pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185886437 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -128,40 +130,49 @@ class MicroBatchExecution( * Repeatedly attempts to run batches as data arrives. */ protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { -triggerExecutor.execute(() => { - startTrigger() +triggerExecutor.execute(() => { if (isActive) { +var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run +var currentBatchHadNewData = false // Whether the current batch had new data + reportTimeTaken("triggerExecution") { + startTrigger() + + // We'll do this initialization only once every start / restart if (currentBatchId < 0) { -// We'll do this initialization only once populateStartOffsets(sparkSessionForStream) - sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) -logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { -constructNextBatch() +logInfo(s"Stream started from $committedOffsets") } - if (dataAvailable) { -currentStatus = currentStatus.copy(isDataAvailable = true) + + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) + + // Try to construct the next batch. This will return true only if the next batch is + // ready and runnable. Note that the current batch may be runnable even without + // new data to process as `constructNextBatch` may decide to run a batch for + // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data + // is available or not. 
+ currentBatchIsRunnable = constructNextBatch() + + currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable) --- End diff -- then you can do something like: ```scala if (currentBatchIsRunnable && currentBatchHasNewData) { updateStatusMessage("Processing new data") runBatch(sparkSessionForStream) } else if (currentBatchIsRunnable) { updateStatusMessage("Processing empty trigger to timeout state") // or whatever runBatch(sparkSessionForStream) } else { updateStatusMessage("Waiting for data to arrive") } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185887294 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -266,93 +276,62 @@ class MicroBatchExecution( } /** - * Queries all of the sources to see if any new data is available. When there is new data the - * batchId counter is incremented and a new log entry is written with the newest offsets. + * Attempts to construct the next batch based on whether new data is available and/or updated + * metadata is such that another batch needs to be run for state clean up / additional output + * generation even without new data. Returns true only if the next batch should be executed. + * + * Here is the high-level logic on how this constructs the next batch. + * - Check each source whether new data is available + * - Updated the query's metadata and check using the last execution whether there is any need + * to run another batch (for state clean up, etc.) + * - If either of the above is true, then construct the next batch by committing to the offset + * log that range of offsets that the next batch will process. */ - private def constructNextBatch(): Unit = { -// Check to see what new data is available. -val hasNewData = { - awaitProgressLock.lock() - try { -// Generate a map from each unique source to the next available offset. -val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { - case s: Source => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("getOffset") { - (s, s.getOffset) -} - case s: MicroBatchReader => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("setOffsetRange") { - // Once v1 streaming source execution is gone, we can refactor this away. - // For now, we set the range here to get the source to infer the available end offset, - // get that offset, and then set the range again when we later execute. 
- s.setOffsetRange( -toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), -Optional.empty()) -} - -val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() } -(s, Option(currentOffset)) -}.toMap -availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) - -if (dataAvailable) { - true -} else { - noNewData = true - false + private def constructNextBatch(): Boolean = withProgressLocked { +// If new data is already available that means this method has already been called before +// and it must have already committed the offset range of next batch to the offset log. +// Hence do nothing, just return true. +if (isNewDataAvailable) return true --- End diff -- I don't see anyone else calling this method --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185893229 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.SparkPlan + +class WatermarkTracker extends Logging { + private val operatorToWatermarkMap = mutable.HashMap[Int, Long]() + private var watermarkMs: Long = 0 + private var updated = false + + def setWatermark(newWatermarkMs: Long): Unit = synchronized { +watermarkMs = newWatermarkMs + } + + def updateWatermark(executedPlan: SparkPlan): Unit = synchronized { +val watermarkOperators = executedPlan.collect { --- End diff -- I would document the contracts here. We expect a consistent ordering of stateful operators across triggers; that is why we turn off CBO, etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185892675 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -384,22 +363,21 @@ class MicroBatchExecution( commitLog.purge(currentBatchId - minLogEntriesToMaintain) } } + noNewData = false } else { - awaitProgressLock.lock() - try { -// Wake up any threads that are waiting for the stream to progress. -awaitProgressLockCondition.signalAll() - } finally { -awaitProgressLock.unlock() - } + noNewData = true + awaitProgressLockCondition.signalAll() } +shouldConstructNextBatch } /** * Processes any data available between `availableOffsets` and `committedOffsets`. * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with. */ private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = { +logDebug(s"Running batch $currentBatchId") + --- End diff -- I guess we're going to see if all sources follow the contract of returning an empty dataframe if the start and end offsets are the same --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185896682 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala --- @@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest { check() // nothing emitted yet addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this - check() // nothing emitted yet + check((100L, 105L) -> 2L) // no-data-batch emits results on 100-105, --- End diff -- I would explicitly test this with the flag turned on. That way, if we ever decide to turn it off, this test won't fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185678739 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -266,93 +276,62 @@ class MicroBatchExecution( } /** - * Queries all of the sources to see if any new data is available. When there is new data the - * batchId counter is incremented and a new log entry is written with the newest offsets. + * Attempts to construct the next batch based on whether new data is available and/or updated --- End diff -- this paragraph is highly confusing. Could you please reword? Maybe something like: ``` Attempts to construct a batch according to: - Availability of new data - Existence of timeouts in stateful operators ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185890809 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -266,93 +276,62 @@ class MicroBatchExecution( } /** - * Queries all of the sources to see if any new data is available. When there is new data the - * batchId counter is incremented and a new log entry is written with the newest offsets. + * Attempts to construct the next batch based on whether new data is available and/or updated + * metadata is such that another batch needs to be run for state clean up / additional output + * generation even without new data. Returns true only if the next batch should be executed. + * + * Here is the high-level logic on how this constructs the next batch. + * - Check each source whether new data is available + * - Updated the query's metadata and check using the last execution whether there is any need + * to run another batch (for state clean up, etc.) + * - If either of the above is true, then construct the next batch by committing to the offset + * log that range of offsets that the next batch will process. */ - private def constructNextBatch(): Unit = { -// Check to see what new data is available. -val hasNewData = { - awaitProgressLock.lock() - try { -// Generate a map from each unique source to the next available offset. -val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { - case s: Source => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("getOffset") { - (s, s.getOffset) -} - case s: MicroBatchReader => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("setOffsetRange") { - // Once v1 streaming source execution is gone, we can refactor this away. - // For now, we set the range here to get the source to infer the available end offset, - // get that offset, and then set the range again when we later execute. 
- s.setOffsetRange( -toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), -Optional.empty()) -} - -val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() } -(s, Option(currentOffset)) -}.toMap -availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) - -if (dataAvailable) { - true -} else { - noNewData = true - false + private def constructNextBatch(): Boolean = withProgressLocked { +// If new data is already available that means this method has already been called before +// and it must have already committed the offset range of next batch to the offset log. +// Hence do nothing, just return true. +if (isNewDataAvailable) return true + +// Generate a map from each unique source to the next available offset. +val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { + case s: Source => +updateStatusMessage(s"Getting offsets from $s") +reportTimeTaken("getOffset") { + (s, s.getOffset) } - } finally { -awaitProgressLock.unlock() - } -} -if (hasNewData) { - var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs - // Update the eventTime watermarks if we find any in the plan. - if (lastExecution != null) { -lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec => e -}.zipWithIndex.foreach { - case (e, index) if e.eventTimeStats.value.count > 0 => -logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") -val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs -val prevWatermarkMs = watermarkMsMap.get(index) -if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { - watermarkMsMap.put(index, newWatermarkMs) -} - - // Populate 0 if we haven't seen any data yet for this watermark node. - case (_, index) => -if (!watermarkMsMap.isDefinedAt(index)) { - watermarkMsMap.put(index, 0) -} + case s: MicroBatchReader => +updateSta
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185893837 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.SparkPlan + +class WatermarkTracker extends Logging { + private val operatorToWatermarkMap = mutable.HashMap[Int, Long]() + private var watermarkMs: Long = 0 + private var updated = false + + def setWatermark(newWatermarkMs: Long): Unit = synchronized { +watermarkMs = newWatermarkMs + } + + def updateWatermark(executedPlan: SparkPlan): Unit = synchronized { +val watermarkOperators = executedPlan.collect { + case e: EventTimeWatermarkExec => e +} +if (watermarkOperators.isEmpty) return + + +watermarkOperators.zipWithIndex.foreach { + case (e, index) if e.eventTimeStats.value.count > 0 => +logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") +val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs +val prevWatermarkMs = operatorToWatermarkMap.get(index) +if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { + operatorToWatermarkMap.put(index, newWatermarkMs) +} + + // Populate 0 if we haven't seen any data yet for this watermark node. + case (_, index) => +if (!operatorToWatermarkMap.isDefinedAt(index)) { + operatorToWatermarkMap.put(index, 0) +} +} + +// Update the global watermark to the minimum of all watermark nodes. +// This is the safest option, because only the global watermark is fault-tolerant. Making +// it the minimum of all individual watermarks guarantees it will never advance past where +// any individual watermark operator would be if it were in a plan by itself. 
+val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2 +if (newWatermarkMs > watermarkMs) { + logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") + watermarkMs = newWatermarkMs + updated = true +} else { + logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs") + updated = false +} + } + + def watermarkUpdated: Boolean = synchronized { updated } --- End diff -- is this used anywhere? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185887201 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -266,93 +276,62 @@ class MicroBatchExecution( } /** - * Queries all of the sources to see if any new data is available. When there is new data the - * batchId counter is incremented and a new log entry is written with the newest offsets. + * Attempts to construct the next batch based on whether new data is available and/or updated + * metadata is such that another batch needs to be run for state clean up / additional output + * generation even without new data. Returns true only if the next batch should be executed. + * + * Here is the high-level logic on how this constructs the next batch. + * - Check each source whether new data is available + * - Updated the query's metadata and check using the last execution whether there is any need + * to run another batch (for state clean up, etc.) + * - If either of the above is true, then construct the next batch by committing to the offset + * log that range of offsets that the next batch will process. */ - private def constructNextBatch(): Unit = { -// Check to see what new data is available. -val hasNewData = { - awaitProgressLock.lock() - try { -// Generate a map from each unique source to the next available offset. -val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { - case s: Source => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("getOffset") { - (s, s.getOffset) -} - case s: MicroBatchReader => -updateStatusMessage(s"Getting offsets from $s") -reportTimeTaken("setOffsetRange") { - // Once v1 streaming source execution is gone, we can refactor this away. - // For now, we set the range here to get the source to infer the available end offset, - // get that offset, and then set the range again when we later execute. 
- s.setOffsetRange( -toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), -Optional.empty()) -} - -val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() } -(s, Option(currentOffset)) -}.toMap -availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) - -if (dataAvailable) { - true -} else { - noNewData = true - false + private def constructNextBatch(): Boolean = withProgressLocked { +// If new data is already available that means this method has already been called before +// and it must have already committed the offset range of next batch to the offset log. +// Hence do nothing, just return true. +if (isNewDataAvailable) return true --- End diff -- how is this possible? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185891436 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -373,7 +352,7 @@ class MicroBatchExecution( reader.commit(reader.deserializeOffset(off.json)) } } else { -throw new IllegalStateException(s"batch $currentBatchId doesn't exist") +throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist") --- End diff -- good catch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185885198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -128,40 +130,49 @@ class MicroBatchExecution( * Repeatedly attempts to run batches as data arrives. */ protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { -triggerExecutor.execute(() => { - startTrigger() +triggerExecutor.execute(() => { if (isActive) { +var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run +var currentBatchHadNewData = false // Whether the current batch had new data + reportTimeTaken("triggerExecution") { + startTrigger() + + // We'll do this initialization only once every start / restart if (currentBatchId < 0) { -// We'll do this initialization only once populateStartOffsets(sparkSessionForStream) - sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) -logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { -constructNextBatch() +logInfo(s"Stream started from $committedOffsets") } - if (dataAvailable) { -currentStatus = currentStatus.copy(isDataAvailable = true) + + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) + + // Try to construct the next batch. This will return true only if the next batch is + // ready and runnable. Note that the current batch may be runnable even without + // new data to process as `constructNextBatch` may decide to run a batch for + // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data + // is available or not. + currentBatchIsRunnable = constructNextBatch() + + currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable) --- End diff -- why not set `currentBatchHadNewData` here? It's not immediately clear to me if `isNewDataAvailable` can update between here and 3 lines below. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185677898 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -128,40 +130,49 @@ class MicroBatchExecution( * Repeatedly attempts to run batches as data arrives. */ protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { -triggerExecutor.execute(() => { - startTrigger() +triggerExecutor.execute(() => { if (isActive) { +var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run +var currentBatchHadNewData = false // Whether the current batch had new data + reportTimeTaken("triggerExecution") { + startTrigger() --- End diff -- This used to be outside the timing block. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21220#discussion_r185883611 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -128,40 +130,49 @@ class MicroBatchExecution( * Repeatedly attempts to run batches as data arrives. */ protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { -triggerExecutor.execute(() => { - startTrigger() +triggerExecutor.execute(() => { if (isActive) { +var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run +var currentBatchHadNewData = false // Whether the current batch had new data + reportTimeTaken("triggerExecution") { + startTrigger() + + // We'll do this initialization only once every start / restart if (currentBatchId < 0) { -// We'll do this initialization only once populateStartOffsets(sparkSessionForStream) - sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) -logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { -constructNextBatch() +logInfo(s"Stream started from $committedOffsets") } - if (dataAvailable) { -currentStatus = currentStatus.copy(isDataAvailable = true) + + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) + + // Try to construct the next batch. This will return true only if the next batch is + // ready and runnable. Note that the current batch may be runnable even without + // new data to process as `constructNextBatch` may decide to run a batch for + // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data + // is available or not. + currentBatchIsRunnable = constructNextBatch() + + currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable) + if (currentBatchIsRunnable) { updateStatusMessage("Processing new data") +// Remember whether the current batch has data or not. 
This will be required later --- End diff -- The status message above isn't accurate if we are running a zero-data batch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183912999 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,126 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +import java.util.IdentityHashMap +import scala.collection.JavaConverters._ + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } +} + +if (onlyDataSourceV2Sources) { + // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data + // from a V2 source and has a direct reference to the V2 source that generated it. Each + // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. 
However, + // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as + // a DataSourceV2ScanExec instance may be referred to in the execution plan from two (or + // even multiple times) points and considering it twice will leads to double counting. We + // can't dedup them using their hashcode either because two different instances of + // DataSourceV2ScanExec can have the same hashcode but account for separate sets of + // records read, and deduping them to consider only one of them would be undercounting the + // records read. Therefore the right way to do this is to consider the unique instances of + // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them. + // Hence we calculate in the following way. + // + // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap. + // + // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instanes. + // + // 3. Multiple Da
[GitHub] spark pull request #21134: [SPARK-24056] [SS] Make consumer creation lazy in...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21134#discussion_r183860834 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala --- @@ -53,7 +53,7 @@ private[kafka010] class KafkaOffsetReader( */ val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory { override def newThread(r: Runnable): Thread = { - val t = new UninterruptibleThread("Kafka Offset Reader") { + val t = new UninterruptibleThread(s"Kafka Offset Reader") { --- End diff -- Uber nit: the `s` interpolator isn't needed here, since the string has no interpolated values. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183542367 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- e.g. self-join? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183533297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } --- End diff -- we don't have a way to track these for ContinuousProcessing at the moment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183542307 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- what if you do a stream-stream join? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183542025 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -733,6 +804,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = { +q.recentProgress.filter(_.numInputRows > 0).lastOption + } + + --- End diff -- nit: extra line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183541978 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. +require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() --- End diff -- nit: unpersist later? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/21124 LGTM! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180936744 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. 
This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** + * Create a file and make its contents available atomically after the output stream is closed. + * + * @param pathPath to create + * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to + *overwrite the file if it already exists. It should not throw + *any exception if the file exists. However, if false, then the + *implementation must not overwrite if the file alraedy exists and + *must throw `FileAlreadyExistsException` in that case. + */ + def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { +list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. 
*/ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * Additional methods in CheckpointFileManager implementations that allows + * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180938118 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. 
This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** + * Create a file and make its contents available atomically after the output stream is closed. + * + * @param pathPath to create + * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to + *overwrite the file if it already exists. It should not throw + *any exception if the file exists. However, if false, then the + *implementation must not overwrite if the file alraedy exists and + *must throw `FileAlreadyExistsException` in that case. + */ + def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { +list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. 
*/ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * Additional methods in CheckpointFileManager implementations that allows + * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180935752 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} --- End diff -- whoa what does `FileSystem => _` do? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180938649 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import java.io._ +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + +abstract class CheckpointFileManagerTests extends SparkFunSuite { + + def createManager(path: Path): CheckpointFileManager + + test("mkdirs, list, createAtomic, open, delete") { +withTempPath { p => + val basePath = new Path(p.getAbsolutePath) + val fm = createManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + assert(!fm.exists(dir)) + fm.mkdirs(dir) + assert(fm.exists(dir)) + fm.mkdirs(dir) + + // List + val acceptAllFilter = new PathFilter { +override def accept(path: Path): Boolean = true + } + val rejectAllFilter = new PathFilter { +override def accept(path: Path): Boolean = false + } + assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) + assert(fm.list(basePath, rejectAllFilter).length === 0) + + // Create atomic without overwrite + var path = new Path(s"$dir/file") + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = false).cancel() + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = false).close() + assert(fm.exists(path)) + intercept[IOException] { +// should throw exception since file exists and overwrite is false +fm.createAtomic(path, overwriteIfPossible = false).close() + } + + // Create atomic with overwrite if possible + path = new Path(s"$dir/file2") + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = true).cancel() + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = true).close() + assert(fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception + + // Open and delete + 
fm.open(path).close() + fm.delete(path) + assert(!fm.exists(path)) + intercept[IOException] { +fm.open(path) + } + fm.delete(path) // should not throw exception +} + } + + protected def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } +} + +class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { + + test("CheckpointFileManager.create() should pick up user-specified class from conf") { +withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> +classOf[TestCheckpointFileManager].getName) { + val fileManager = +CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf) + assert(fileManager.isInstanceOf[TestCheckpointFileManager]) +} + } + + test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") { +import FakeFileSystem.scheme +spark.conf.set( + s"fs.$scheme.impl", + classOf[FakeFileSystem].getName) +withTempDir { temp =&
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180938989 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -471,6 +470,41 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("error writing [version].delta cancels the output stream") { + +val hadoopConf = new Configuration() +hadoopConf.set( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + classOf[TestCheckpointFileManager].getName) +val remoteDir = Utils.createTempDir().getAbsolutePath + +val provider = newStoreProvider( + opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf) + +// Disable failure of output stream and generate versions +TestCheckpointFileManager.shouldFailInCreateAtomic = false +for (version <- 1 to 10) { + val store = provider.getStore(version - 1) + put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ... + store.commit() +} +val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet + +val store = provider.getStore(10) +// Fail commit for next version and verify that reloading resets the files +TestCheckpointFileManager.shouldFailInCreateAtomic = true +put(store, "11", 11) +val e = intercept[IllegalStateException] { quietly { store.commit() } } +assert(e.getCause.isInstanceOf[IOException], "Was waiting the IOException to be thrown") +TestCheckpointFileManager.shouldFailInCreateAtomic = false + +// Abort commit for next version and verify that reloading resets the files +val store2 = provider.getStore(10) +put(store2, "11", 11) +store2.abort() +assert(TestCheckpointFileManager.cancelCalledInCreateAtomic) --- End diff -- can you verify that it was false before the `abort`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180936292 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. 
This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** + * Create a file and make its contents available atomically after the output stream is closed. + * + * @param pathPath to create + * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to + *overwrite the file if it already exists. It should not throw + *any exception if the file exists. However, if false, then the + *implementation must not overwrite if the file alraedy exists and + *must throw `FileAlreadyExistsException` in that case. + */ + def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { +list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. 
*/ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * Additional methods in CheckpointFileManager implementations that allows + * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180937241 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. 
This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** + * Create a file and make its contents available atomically after the output stream is closed. + * + * @param pathPath to create + * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to + *overwrite the file if it already exists. It should not throw + *any exception if the file exists. However, if false, then the + *implementation must not overwrite if the file alraedy exists and + *must throw `FileAlreadyExistsException` in that case. + */ + def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { +list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. 
*/ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * Additional methods in CheckpointFileManager implementations that allows + * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename
[GitHub] spark issue #20937: [SPARK-23723][SPARK-23724][SQL] Support custom encoding ...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20937 yes, please do --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20941: [SPARK-23827] [SS] StreamingJoinExec should ensur...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20941#discussion_r178218438 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala --- @@ -450,8 +450,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be // This can often catch hard to debug errors when developing stateful operators val executedPlan = currentStream.lastExecution.executedPlan executedPlan.collect { case s: StatefulOperator => s }.foreach { s => - assert(s.stateInfo.isDefined) - assert(s.stateInfo.get.numPartitions >= 1) + assert( + s.stateInfo.map(_.numPartitions).contains(currentStream.lastExecution.numStateStores)) --- End diff -- why this change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20941: [SPARK-23827] [SS] StreamingJoinExec should ensur...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20941#discussion_r178208005 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala --- @@ -444,6 +445,26 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } } + if (currentStream.isInstanceOf[MicroBatchExecution]) { +// Verify if stateful operators have correct metadata and distribution +// This can often catch hard to debug errors when developing stateful operators +val executedPlan = currentStream.lastExecution.executedPlan +executedPlan.collect { case s: StatefulOperator => s }.foreach { s => + assert(s.stateInfo.isDefined) + assert(s.stateInfo.get.numPartitions >= 1) + + s.requiredChildDistribution.foreach { d => +withClue(s"$s specifies incorrect # partitions in requiredChildDistribution $d") { + assert(d.requiredNumPartitions.isDefined) + assert(d.requiredNumPartitions.get >= 1) + if (d != AllTuples) { +assert(d.requiredNumPartitions.get == s.stateInfo.get.numPartitions) --- End diff -- can you also verify that this is equal to the number of partitions in the metadata? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173543942 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +415,99 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - + private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = { synchronized { - val consumer = cache.get(key) - if (consumer != null) { -consumer.inuse = false - } else { -logWarning(s"Attempting to release consumer that does not exist") - } -} - } - /** - * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. - */ - def removeKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -synchronized { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { -removedConsumer.close() + // If it has been marked for close, then do it any way + if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close() + intConsumer.inuse = false + + // Clear the consumer from the cache if this is indeed the consumer present in the cache + val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams) + val cachedIntConsumer = cache.get(key) + if (cachedIntConsumer != null) { +if (cachedIntConsumer.eq(intConsumer)) { + // The released consumer is indeed the cached one. + cache.remove(key) +} else { + // The released consumer is not the cached one. 
Don't do anything. + // This should not happen as long as we maintain the invariant mentioned above. + logWarning( +s"Cached consumer not the same one as the one being release" + + s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" + + s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]") +} + } else { +// The released consumer is not in the cache. Don't do anything. +// This should not happen as long as we maintain the invariant mentioned above. +logWarning(s"Attempting to release consumer that is not in the cache") } } } /** * Get a cached consumer for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. + * This will make a best effort attempt to --- End diff -- I would love to see the rest of this sentence. Such a cliffhanger! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20698 LGTM pending tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988510 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.sources.v2.DataSourceOptions + +class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { + + def testWithMinPartitions(name: String, minPartition: Int) + (f: KafkaOffsetRangeCalculator => Unit): Unit = { +val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava) +test(s"with minPartition = $minPartition: $name") { + f(KafkaOffsetRangeCalculator(options)) +} + } + + + test("with no minPartition: N TopicPartitions to N offset ranges") { +val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1), +untilOffsets = Map(tp1 -> 2)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1), +untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1, tp2 -> 1), +untilOffsets = Map(tp1 -> 2)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1, tp2 -> 1), +untilOffsets = Map(tp1 -> 2), +executorLocations = Seq("location")) == + Seq(KafkaOffsetRange(tp1, 1, 2, Some("location" + } + + test("with no minPartition: empty ranges ignored") { +val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1, tp2 -> 1), +untilOffsets = Map(tp1 -> 2, tp2 -> 1)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + } + + testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc => +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1), +untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) == + Seq( +KafkaOffsetRange(tp1, 1, 2, None), +KafkaOffsetRange(tp2, 1, 2, None), +KafkaOffsetRange(tp3, 1, 2, None))) + } + + 
testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc => +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1), +untilOffsets = Map(tp1 -> 5)) == + Seq( +KafkaOffsetRange(tp1, 1, 2, None), +KafkaOffsetRange(tp1, 2, 3, None), +KafkaOffsetRange(tp1, 3, 4, None), +KafkaOffsetRange(tp1, 4, 5, None))) + +assert( + calc.getRanges( +fromOffsets = Map(tp1 -> 1), +untilOffsets = Map(tp1 -> 5), +executorLocations = Seq("location")) == +Seq( + KafkaOffsetRange(tp1, 1, 2, None), + KafkaOffsetRange(tp1, 2, 3, None), + KafkaOffsetRange(tp1, 3, 4, None), + KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set + } + + testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc => --- End diff -- can you also add a test: ``` fromOffsets = Map(tp1 -> 1), untilOffsets = M
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r17198 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +}.filter(_.size > 0) + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum --- End diff -- nit: `map(_.size).sum` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171987888 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +}.filter(_.size > 0) + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => +// Split the current range into subranges as close to the ideal range size +val rangeSize = range.untilOffset - range.fromOffset +val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt --- End diff -- nit: `range.size`, you may remove `rangeSize` above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988062 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +}.filter(_.size > 0) + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. 
+ val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => +// Split the current range into subranges as close to the ideal range size +val rangeSize = range.untilOffset - range.fromOffset +val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + +(0 until numSplitsInRange).map { i => + val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + KafkaOffsetRange( +range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) +} + } +} + } + + private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = { +def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b + +val numExecutors = executorLocations.length +if (numExecutors > 0) { + // This allows cached KafkaConsumers in the executors to be re-used to read the same + // partition in every batch. + Some(executorLocations(floorMod(tp.hashCode, numExecutors))) +} else None + } +} + +private[kafka010] object KafkaOffsetR
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988112 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +}.filter(_.size > 0) + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. 
+ val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => +// Split the current range into subranges as close to the ideal range size +val rangeSize = range.untilOffset - range.fromOffset +val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + +(0 until numSplitsInRange).map { i => + val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + KafkaOffsetRange( +range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) +} + } +} + } + + private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = { +def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b + +val numExecutors = executorLocations.length +if (numExecutors > 0) { + // This allows cached KafkaConsumers in the executors to be re-used to read the same + // partition in every batch. + Some(executorLocations(floorMod(tp.hashCode, numExecutors))) +} else None + } +} + +private[kafka010] object KafkaOffsetR
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171983185 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +} + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => +// Split the current range into subranges as close to the ideal range size +val rangeSize = range.untilOffset - range.fromOffset +val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + +(0 until numSplitsInRange).map { i => + val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + KafkaOffsetRange( +range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) +} + --- End diff -- nit: extra line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732779 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -370,8 +361,14 @@ private[kafka010] class KafkaMicroBatchDataReader( override def close(): Unit = { // Indicate that we're no longer using this consumer --- End diff -- maybe remove this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733516 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +} + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) { + // Assign preferred executor locations to each range such that the same topic-partition is + // always read from the same executor and the KafkaConsumer can be reused + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + offsetRanges.flatMap { offsetRange => +val tp = offsetRange.topicPartition +val size = offsetRange.untilOffset - offsetRange.fromOffset +// number of partitions to divvy up this topic partition to +val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt --- End diff -- yeah, a comment about how this is calculating the `weight` of partitions to assign to this topic would help. In addition, the sum of `parts` after this calculation will be `>= minPartitions` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733038 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +} + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) { --- End diff -- I don't think we need the first check. `offsetRanges.size` should be greater than 0 right? Otherwise we wouldn't have called into this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732729 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader( } /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */ -private[kafka010] class KafkaMicroBatchDataReaderFactory( -range: KafkaOffsetRange, -preferredLoc: Option[String], +private[kafka010] case class KafkaMicroBatchDataReaderFactory( +offsetRange: KafkaOffsetRange, executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, -failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] { +failOnDataLoss: Boolean, +reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] { - override def preferredLocations(): Array[String] = preferredLoc.toArray + override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader( -range, executorKafkaParams, pollTimeoutMs, failOnDataLoss) +offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer) } /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */ -private[kafka010] class KafkaMicroBatchDataReader( +private[kafka010] case class KafkaMicroBatchDataReader( offsetRange: KafkaOffsetRange, executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, -failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging { +failOnDataLoss: Boolean, +reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging { + + private val consumer = { +if (!reuseKafkaConsumer) { + // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. 
As here we --- End diff -- `nit: We use 'assign' here, hence don't need to ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733181 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) --- End diff -- was this check here before? What if there are new topic partitions? Are we missing those, because they may not exist in fromOffsets? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732183 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader( // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever // (KAFKA-1894). -assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) +require(Thread.currentThread().isInstanceOf[UninterruptibleThread]) --- End diff -- Assertions can be turned off --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20673 @HyukjinKwon Added tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170646787 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -100,7 +102,18 @@ private[spark] object JsonProtocol { executorMetricsUpdateToJson(metricsUpdate) case blockUpdate: SparkListenerBlockUpdated => blockUpdateToJson(blockUpdate) - case _ => parse(mapper.writeValueAsString(event)) + case _ => +// Use piped streams to avoid extra memory consumption +val outputStream = new PipedOutputStream() +val inputStream = new PipedInputStream(outputStream) +try { + mapper.writeValue(outputStream, event) --- End diff -- I was actually hoping for a test to fail, but none did (the test suite has a bunch of very specific stuff). This code will likely block forever if the block size is larger. Going to add a test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20673 [SPARK-23515] Use input/output streams for large events in JsonProtocol.sparkEventToJson ## What changes were proposed in this pull request? `def sparkEventToJson(event: SparkListenerEvent)` has a fallback method which creates a JSON object by turning an unrecognized event to Json and then parsing it again. This method materializes the whole string to parse the json record, which is unnecessary, and can cause OOMs as seen in the stack trace below: ``` java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:3664) at java.lang.String.<init>(String.java:207) at java.lang.StringBuilder.toString(StringBuilder.java:407) at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726) at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20) at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103) ``` We should just use the stream parsing to avoid such OOMs. ## How was this patch tested?
You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark eventLoggingJson Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20673 commit 774188003c5b1c1a000d69f5996dce580c7a1432 Author: Burak Yavuz <brkyvz@...> Date: 2018-02-25T20:07:22Z use streams for large events --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20614: Revert [SPARK-23094] Fix invalid character handling in J...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20614 LGTM, my initial assumption that files had to be UTF-8 encoded was a wrong one :( --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20279 Closed in favor of #20445 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user brkyvz closed the pull request at: https://github.com/apache/spark/pull/20279 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20302: [SPARK-23094] Fix invalid character handling in JsonData...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/20302 cc @hvanhovell @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20302: [SPARK-23094] Fix invalid character handling in J...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20302 [SPARK-23094] Fix invalid character handling in JsonDataSource ## What changes were proposed in this pull request? There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](https://github.com/apache/spark/commit/c8803c06854683c8761fdb3c0e4c55d5a9e22a95), [Fix #2](https://github.com/apache/spark/commit/86174ea89b39a300caaba6baffac70f3dc702788)), but they weren't comprehensive it seems. I wanted to extend those fixes to all the parsers, and add tests for each case. ## How was this patch tested? Regression tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark json-invfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20302.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20302 commit 231152ab0d615166bdea354916a9f6ca0aaf7e6a Author: Burak Yavuz <brkyvz@...> Date: 2018-01-18T01:01:00Z [ES-4104][WARMFIX] Fix invalid character handling in JsonDataSource Cherry-pick of https://github.com/databricks/spark/pull/1135 ## What changes were proposed in this pull request? I shall also merge this upstream, but wanted to merge here first since it was an ES ticket related to Qubole workloads. There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` in OSS ([Fix #1](https://github.com/apache/spark/commit/c8803c06854683c8761fdb3c0e4c55d5a9e22a95), [Fix #2](https://github.com/apache/spark/commit/86174ea89b39a300caaba6baffac70f3dc702788)), but they weren't comprehensive it seems. I wanted to extend those fixes to all the parsers, and add tests for each case. ## How was this patch tested? Regression tests Author: Burak Yavuz Closes #1135 from brkyvz/json-32-dbx. ## What changes were proposed in this pull request? 
(Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. ## **IMPORTANT** Warmfix instructions If this PR needs to be warmfixed (i.e. merged into the release branch after the code freeze), please follow steps below. What type of warmfix is this? Please select **exactly one choice**, or write description in Other. - [ ] Regression (e.g. fixing the behavior of a feature that regressed in the current release cycle) - [ ] ES ticket fix (e.g. Customer or internally requested update/fix) - [ ] Other (please describe): Make the following updates to this PR: - [ ] Add `[WARMFIX]` in the title of this PR. - [ ] Label the PR using label(s) corresponding to the WARMFIX branch(es). The label name should be in the format of `dbr-branch-a.b` (e.g. `dbr-branch-3.2`), which matches the release branch name for Runtime release `a.b`. - [ ] Ask your team lead to sign off this warmfix and add the `warmfix-approved` label. - [ ] When merging the PR using the merge script, make sure to get this PR merged into the following branches: - The branch against which your PR is opened, and - Any extra release branch(es) corresponding to the `dbr-branch-a.b` label(s) applied to your PR. Author: Burak Yavuz <brk...@gmail.com> Closes #1616 from brkyvz/inv-json4x. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20279 [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs ## What changes were proposed in this pull request? This PR migrates the MemoryStream to DataSourceV2 APIs. ## How was this patch tested? All existing unit tests in streaming. You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark memv2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20279 commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f Author: Burak Yavuz <brkyvz@...> Date: 2018-01-11T16:44:19Z save for so far commit 78c50f860aa13f569669f4ad77f4325d80085c8b Author: Burak Yavuz <brkyvz@...> Date: 2018-01-12T18:27:49Z Save so far commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c Author: Burak Yavuz <brkyvz@...> Date: 2018-01-13T01:43:03Z save so far commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0 Author: Burak Yavuz <brkyvz@...> Date: 2018-01-16T19:14:08Z Compiles and I think also runs correctly --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/18029 Merged to master. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r158627994 --- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java --- @@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() { .streamName(streamName) .endpointUrl(endpointUrl) .regionName(region) - .initialPositionInStream(initialPosition) + .initialPosition(initialPosition) .checkpointAppName(appName) .checkpointInterval(checkpointInterval) .storageLevel(storageLevel) .build(); assert(kinesisDStream.streamName() == streamName); assert(kinesisDStream.endpointUrl() == endpointUrl); assert(kinesisDStream.regionName() == region); -assert(kinesisDStream.initialPositionInStream() == initialPosition); +assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition()); +assert(kinesisDStream.checkpointAppName() == appName); +assert(kinesisDStream.checkpointInterval() == checkpointInterval); +assert(kinesisDStream._storageLevel() == storageLevel); +ssc.stop(); + } + + /** + * Test to ensure that the old API for InitialPositionInStream + * is supported in KinesisDStream.Builder. + * This test would be removed when we deprecate the KinesisUtils. 
+ */ + @Test + public void testJavaKinesisDStreamBuilderOldApi() { +String streamName = "a-very-nice-stream-name"; +String endpointUrl = "https://kinesis.us-west-2.amazonaws.com;; +String region = "us-west-2"; +String appName = "a-very-nice-kinesis-app"; +Duration checkpointInterval = Seconds.apply(30); +StorageLevel storageLevel = StorageLevel.MEMORY_ONLY(); + +KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder() +.streamingContext(ssc) +.streamName(streamName) +.endpointUrl(endpointUrl) +.regionName(region) +.initialPositionInStream(InitialPositionInStream.LATEST) +.checkpointAppName(appName) +.checkpointInterval(checkpointInterval) +.storageLevel(storageLevel) +.build(); +assert(kinesisDStream.streamName() == streamName); +assert(kinesisDStream.endpointUrl() == endpointUrl); +assert(kinesisDStream.regionName() == region); +assert(kinesisDStream.initialPosition().getPosition() == InitialPositionInStream.LATEST); assert(kinesisDStream.checkpointAppName() == appName); assert(kinesisDStream.checkpointInterval() == checkpointInterval); assert(kinesisDStream._storageLevel() == storageLevel); ssc.stop(); } + + /** + * Test to ensure that the old API for InitialPositionInStream + * is supported in KinesisDStream.Builder. + * Test old API doesn't support the InitialPositionInStream.AT_TIMESTAMP. + * This test would be removed when we deprecate the KinesisUtils. + */ + @Test + public void testJavaKinesisDStreamBuilderOldApiAtTimestamp() { --- End diff -- This test could be moved to become a Scala test instead, using ```scala intercept[UnsupportedOperationException] { ... } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r158627640 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -56,12 +57,13 @@ import org.apache.spark.util.Utils * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) * @param regionName Region name used by the Kinesis Client Library for *DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * @param initialPosition Instance of [[KinesisInitialPosition]] + * In the absence of Kinesis checkpoint info, this is the --- End diff -- nit: indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r158627744 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + +import java.io.Serializable; +import java.util.Date; + +/** + * A java wrapper for exposing [[InitialPositionInStream]] + * to the corresponding Kinesis readers. 
+ */ +interface KinesisInitialPosition { +InitialPositionInStream getPosition(); +} + +public class KinesisInitialPositions { +public static class Latest implements KinesisInitialPosition, Serializable { +public Latest() {} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.LATEST; +} +} + +public static class TrimHorizon implements KinesisInitialPosition, Serializable { +public TrimHorizon() {} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.TRIM_HORIZON; +} +} + +public static class AtTimestamp implements KinesisInitialPosition, Serializable { +private Date timestamp; + +public AtTimestamp(Date timestamp) { +this.timestamp = timestamp; +} + +@Override +public InitialPositionInStream getPosition() { +return InitialPositionInStream.AT_TIMESTAMP; +} + +public Date getTimestamp() { +return timestamp; +} +} + + +/** + * Returns instance of [[KinesisInitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ +public static KinesisInitialPosition fromKinesisInitialPosition( +InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException { +if (initialPositionInStream == InitialPositionInStream.LATEST) { +return new Latest(); +} else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) { +return new TrimHorizon(); +} else { +// InitialPositionInStream.AT_TIMESTAMP is not supported. +// Use InitialPosition.atTimestamp(timestamp) instead. +throw new UnsupportedOperationException( +"Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " + +"supported in initialPositionInStream(). 
Please use the initialPosition() from " + +"builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP"); +} +} +} --- End diff -- nit: new line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r158627941 --- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java --- @@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() { .streamName(streamName) .endpointUrl(endpointUrl) .regionName(region) - .initialPositionInStream(initialPosition) + .initialPosition(initialPosition) .checkpointAppName(appName) .checkpointInterval(checkpointInterval) .storageLevel(storageLevel) .build(); assert(kinesisDStream.streamName() == streamName); assert(kinesisDStream.endpointUrl() == endpointUrl); assert(kinesisDStream.regionName() == region); -assert(kinesisDStream.initialPositionInStream() == initialPosition); +assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition()); +assert(kinesisDStream.checkpointAppName() == appName); +assert(kinesisDStream.checkpointInterval() == checkpointInterval); +assert(kinesisDStream._storageLevel() == storageLevel); +ssc.stop(); + } + + /** + * Test to ensure that the old API for InitialPositionInStream + * is supported in KinesisDStream.Builder. + * This test would be removed when we deprecate the KinesisUtils. + */ + @Test + public void testJavaKinesisDStreamBuilderOldApi() { +String streamName = "a-very-nice-stream-name"; +String endpointUrl = "https://kinesis.us-west-2.amazonaws.com;; +String region = "us-west-2"; +String appName = "a-very-nice-kinesis-app"; +Duration checkpointInterval = Seconds.apply(30); +StorageLevel storageLevel = StorageLevel.MEMORY_ONLY(); + +KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder() +.streamingContext(ssc) --- End diff -- nit: indentation should be 2 spaces. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19975: [SPARK-22781][SS] Support creating streaming dataset wit...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/19975 This LGTM. @zsxwing Any other comments? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/18029 Making them as singletons is unnecessary. How about this: ```java public interface InitialPosition { public InitialPositionInStream toKinesis(); } public class InitialPositions { public static class Latest implements InitialPosition { public Latest() {} @Override public InitialPositionInStream toKinesis() { return InitialPositionInStream.LATEST; } } public static class TrimHorizon implements InitialPosition { public TrimHorizon() {} @Override public InitialPositionInStream toKinesis() { return InitialPositionInStream.TRIM_HORIZON; } } public static class AtTimestamp implements InitialPosition { private Date timestamp; public AtTimestamp(Date timestamp) { this.timestamp = timestamp; } @Override public InitialPositionInStream toKinesis() { return InitialPositionInStream.AT_TIMESTAMP; } public Date getTimestamp() { return timestamp; } } } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/18029 Actually yeah, I like your way. On Dec 14, 2017 3:08 PM, "yashs360" <notificati...@github.com> wrote: > *@yashs360* commented on this pull request. > -- > > In external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/ > InitialPosition.scala > <https://github.com/apache/spark/pull/18029#discussion_r157086878>: > > > +import java.util.Date > + > +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream > + > +/** > + * Trait for Kinesis's InitialPositionInStream. > + * This will be overridden by more specific types. > + */ > +sealed trait InitialPosition { > + val initialPositionInStream: InitialPositionInStream > +} > + > +/** > + * Case object for Kinesis's InitialPositionInStream.LATEST. > + */ > +case object Latest extends InitialPosition { > > Hi @brkyvz <https://github.com/brkyvz> , Thanks for the review. > Are you suggesting to put everything into a new object. And refer the case > objects from the java class methods? > In that case is it better to create the objects in Java and expose them > directly, since we will have cases where we will need direct access to the > case objects/classes (instead of the java methods) like one of the test > cases: > initialPosition.asInstanceOf[AtTimestamp].timestamp > > I would create a new branch with the changes and share with you if it's > fine? > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/18029#discussion_r157086878>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/AFACewoTV1GYt4dpddBP_Jsx7cF6AUVjks5tAaprgaJpZM4NfLn-> > . > --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19926#discussion_r157044691 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.util.{Clock, Utils} + +class MicroBatchExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: Sink, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + private val triggerExecutor = trigger match { +case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) +case OneTimeTrigger => OneTimeExecutor() +case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + } + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in QueryExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() +val _logicalPlan = analyzedPlan.transform { + case streamingRelation@StreamingRelation(dataSource, _, output) => +toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val source = 
dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. + StreamingExecutionRelation(source, output)(sparkSession) +}) +} +sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } +uniqueSources = sources.distinct +_logicalPlan + } + + /** + * Repeatedly attempts to run batches as data arrives. + */ + protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +triggerExecutor.execute(() => { + startTrigger() + + if (isActive) { +reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { +// We'll do this initialization only once +populateStartOffsets(sparkSessionForStream) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) +logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { +constructNextBatch() + } + if (dataAvailable) { +currentStatus = currentStatus.copy(isDataAvailable = true) +updateStatusMessage("Processing new data")
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156266325 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import java.util.Date; + +/** + * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition + * to expose the corresponding scala objects for InitialPositionInStream. + * The functions are intentionally Upper cased to appear like classes for + * usage in Java classes. + */ +public class KinesisInitialPosition { --- End diff -- please add `@InterfaceStability.Evolving` above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156265997 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala --- @@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .build() assert(dstream.endpointUrl == customEndpointUrl) assert(dstream.regionName == customRegion) -assert(dstream.initialPositionInStream == customInitialPosition) +assert(dstream.initialPosition == customInitialPosition) assert(dstream.checkpointAppName == customAppName) assert(dstream.checkpointInterval == customCheckpointInterval) assert(dstream._storageLevel == customStorageLevel) assert(dstream.kinesisCreds == customKinesisCreds) assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds)) assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds)) + +// Testing with AtTimestamp +val cal = Calendar.getInstance() +cal.add(Calendar.DATE, -1) +val timestamp = cal.getTime() +val initialPositionAtTimestamp = AtTimestamp(timestamp) + +val dstreamAtTimestamp = builder + .endpointUrl(customEndpointUrl) + .regionName(customRegion) + .initialPosition(initialPositionAtTimestamp) + .checkpointAppName(customAppName) + .checkpointInterval(customCheckpointInterval) + .storageLevel(customStorageLevel) + .kinesisCredentials(customKinesisCreds) + .dynamoDBCredentials(customDynamoDBCreds) + .cloudWatchCredentials(customCloudWatchCreds) + .build() +assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl) +assert(dstreamAtTimestamp.regionName == customRegion) +assert(dstreamAtTimestamp.initialPosition.initialPositionInStream + == initialPositionAtTimestamp.initialPositionInStream) +assert( + dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp)) +assert(dstreamAtTimestamp.checkpointAppName == customAppName) +assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval) 
+assert(dstreamAtTimestamp._storageLevel == customStorageLevel) +assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds) +assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds)) +assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds)) + +// Testing with AtTimestamp +val initialPositionAtTimestamp2 = AtTimestamp(timestamp) --- End diff -- how are the following lines a different test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156266783 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { --- End diff -- can you wrap all of these in an object? ```scala sealed trait InitialPosition { ... } object internal { case object Latest extends InitialPosition { } ... case class AtTimestamp(timestamp: Date) extends InitialPosition { } } ``` Note how InitialPosition is outside, and `internal` is lowercase. 
so that people go only through the Java interface (`org.apache.spark.streaming.kinesis.Latest()`, etc.). Your documentation and test cases go through the Scala interface, which makes it confusing to have two different things corresponding to the same concept. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19925#discussion_r155834182 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data writing ability and save the data from a microbatch to the data source. + */ +@InterfaceStability.Evolving +public interface MicroBatchWriteSupport extends BaseStreamingSink { + + /** + * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data + * sources can return None if there is no writing needed to be done. + * + * @param queryId A unique string for the writing query. 
It's possible that there are many writing + *queries running at the same time, and the returned {@link DataSourceV2Writer} + *can use this id to distinguish itself from others. + * @param epochId The uniquenumeric ID of the batch within this writing query. This is an + *incrementing counter representing a consistent set of data; the same batch may + *be started multiple times in failure recovery scenarios, but it will always + *contain the same records. + * @param schema the schema of the data to be written. + * @param mode the output mode which determines what successive batch output means to this + * source, please refer to {@link OutputMode} for more details. --- End diff -- shouldn't this say "to this **sink**" rather than "source"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19838: [SPARK-22638][SS]Use a separate query for StreamingQuery...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/19838 LGTM but I have very limited context on this codepath. Maybe @tdas can also take a very quick look? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19838: [SPARK-22638][SS]Use a separate query for Streami...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19838#discussion_r153910748 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala --- @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addToSharedQueue(this) + sparkListenerBus.addToQueue(this, "streams") --- End diff -- nit: do you want to make the `"streams"` queue name a constant? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/19495 LGTM! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145283034 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala --- @@ -1086,4 +1181,24 @@ object FlatMapGroupsWithStateSuite { override def metrics: StateStoreMetrics = new StateStoreMetrics(map.size, 0, Map.empty) override def hasCommitted: Boolean = true } + + def assertCanGetProcessingTime(predicate: => Boolean): Unit = { +if (!predicate) throw new TestFailedException("Could not get processing time", 20) --- End diff -- what is the `20`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282388 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala --- @@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec( private val isTimeoutEnabled = timeoutConf != NoTimeout val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled) + val watermarkPresent = child.output.exists { --- End diff -- this is cleaner, but doesn't `eventTimeWatermark.isDefined` imply that the watermark is present? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282515 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private( timeoutTimestamp = timestampMs } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs) } - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime) } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration)) } + override def getCurrentWatermarkMs(): Long = { +if (!watermarkPresent) { + throw new UnsupportedOperationException( +"Cannot get event time watermark timestamp without enabling setting watermark before " + --- End diff -- `without enabling setting watermark` sounds too convoluted. 
You probably meant `without setting watermark`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala --- @@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] { /** Get the state value as a scala Option. */ def getOption: Option[S] - /** - * Update the value of the state. Note that `null` is not a valid value, and it throws - * IllegalArgumentException. - */ - @throws[IllegalArgumentException]("when updating with null") + /** Update the value of the state. */ def update(newState: S): Unit /** Remove this state. */ def remove(): Unit /** * Whether the function has been called because the key has timed out. - * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`. + * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`. */ def hasTimedOut: Boolean + /** * Set the timeout duration in ms for this key. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'durationMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(durationMs: Long): Unit + /** * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. 
+ * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'duration' is not a valid duration") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(duration: String): Unit - @throws[IllegalArgumentException]("if 'timestampMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time. * This timestamp cannot be older than the current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. 
*/ + @throws[IllegalArgumentException]( +"if 'timestampMs' is not positive or less than the current watermark in a streaming query") + @throws[UnsupportedOperationException]( +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestampMs: Long): Unit - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time and an additional * duration as a string (e.g. "1 hour", "2 days", etc.). * The final timestamp (including the additional duration) cannot be older than the * current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flat
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/19495 LGTM. Just a bunch of cosmetic nits, but fine to address them separately --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org