Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195797571
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
}
/** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
def allData: Seq[Row]
def latestBatchData: Seq[Row]
def dataSinceBatch(sinceBatchId: Long): Seq[Row]
def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+      rows: Array[Row],
+      batchLimit: Int,
+      sinkLimit: Int,
+      batchId: Long): Array[Row] = {
+    if (rows.length > batchLimit && batchLimit >= 0) {
+      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
+      rows.take(batchLimit)
+    } else {
+      rows
+    }
+  }
+}
+
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows option
+   * @return The maximum number of rows a memorySink should store, or None for no limit.
--- End diff ---
need to update docs
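
As an aside for readers following this thread, below is a minimal, self-contained Scala sketch of how a memory sink could combine a total row cap with the per-batch truncation helper shown in the diff. It is not the PR's actual sink code: SimpleMemorySink, the Row type alias, and the addBatch signature are hypothetical stand-ins; only the body of truncateRowsIfNeeded mirrors the diff, and the "negative means unlimited" convention follows MAX_MEMORY_SINK_ROWS_DEFAULT = -1 above.

// Illustrative sketch only (not the PR's sink code): a stripped-down in-memory
// sink that enforces a total row cap using the same truncation logic as the diff.
// SimpleMemorySink, the Row alias, and addBatch are hypothetical stand-ins.
object MemorySinkSketch {

  type Row = Seq[Any] // stand-in for org.apache.spark.sql.Row

  class SimpleMemorySink(sinkLimit: Int) {
    private var batches = Vector.empty[(Long, Array[Row])]
    private var numRows = 0

    // Same shape as the helper in the diff: keep at most batchLimit rows,
    // where a negative batchLimit means "no limit".
    private def truncateRowsIfNeeded(
        rows: Array[Row],
        batchLimit: Int,
        sinkLimit: Int,
        batchId: Long): Array[Row] = {
      if (rows.length > batchLimit && batchLimit >= 0) {
        println(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
        rows.take(batchLimit)
      } else {
        rows
      }
    }

    // Keep only as many rows of this batch as the sink still has room for.
    def addBatch(batchId: Long, rows: Array[Row]): Unit = {
      val batchLimit = if (sinkLimit < 0) -1 else math.max(sinkLimit - numRows, 0)
      val kept = truncateRowsIfNeeded(rows, batchLimit, sinkLimit, batchId)
      batches = batches :+ (batchId -> kept)
      numRows += kept.length
    }

    def allData: Seq[Row] = batches.flatMap(_._2)
  }

  def main(args: Array[String]): Unit = {
    val sink = new SimpleMemorySink(sinkLimit = 3) // cap the sink at 3 rows total
    sink.addBatch(0, Array(Seq(1), Seq(2)))        // fits entirely
    sink.addBatch(1, Array(Seq(3), Seq(4)))        // truncated to the 1 remaining slot
    println(sink.allData)                          // Vector(List(1), List(2), List(3))
  }
}

In the PR itself the cap is presumably supplied through the writer's maxRows option (the MemorySinkBase.MAX_MEMORY_SINK_ROWS key shown above); the sketch hard-codes it only to keep the example short.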