Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268299
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
def latestBatchId: Option[Long]
}
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+ val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
+ val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+ /**
+ * Gets the max number of rows a MemorySink should store. This number is based on the memory
+ * sink row limit if it is set. If not, there is no limit.
+ * @param options Options for writing from which we get the max rows option
+ * @return The maximum number of rows a memorySink should store, or None for no limit.
+ */
+ def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
+ val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
+ if (maxRows >= 0) Some(maxRows) else None
+ }
+}
+
+
--- End diff --
nit: remove extra line
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]