Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195798990
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
}
/** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
def allData: Seq[Row]
def latestBatchData: Seq[Row]
def dataSinceBatch(sinceBatchId: Long): Seq[Row]
def latestBatchId: Option[Long]
+
+ /**
+ * Truncates the given rows to return at most batchLimit rows.
+ * @param rows The data that may need to be truncated.
+ * @param batchLimit Number of rows to keep in this batch; the rest will be truncated.
+ * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
+ * @param batchId The ID of the batch that sent these rows, for logging purposes.
+ * @return Truncated rows.
+ */
+ protected def truncateRowsIfNeeded(
+ rows: Array[Row],
+ batchLimit: Int,
+ sinkLimit: Int,
+ batchId: Long): Array[Row] = {
+ if (rows.length > batchLimit && batchLimit >= 0) {
+ logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
--- End diff ---
nit: I'm not sure whether these sinks are also used by Continuous processing. If so, I
would rename `batch` to `trigger version`.
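
For context (not part of the review comment): a minimal sketch of how the rest of this helper could look, assuming the elided body simply keeps the first `batchLimit` rows. The trait name and the `rows.take` call below are illustrative guesses, not the PR's actual code.

    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.Row

    // Hypothetical sketch (not the PR's actual code): one way the elided body of
    // truncateRowsIfNeeded could complete the truncation described in the scaladoc.
    trait TruncatingSinkSketch extends Logging {

      protected def truncateRowsIfNeeded(
          rows: Array[Row],
          batchLimit: Int,
          sinkLimit: Int,
          batchId: Long): Array[Row] = {
        // A negative batchLimit is assumed here to mean "no per-batch limit".
        if (rows.length > batchLimit && batchLimit >= 0) {
          logWarning(s"Truncating batch $batchId to $batchLimit rows " +
            s"because of sink limit $sinkLimit")
          rows.take(batchLimit)  // keep only the first batchLimit rows
        } else {
          rows
        }
      }
    }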
---