sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1504503668
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -252,16 +252,30 @@ class RocksDB(
}
}
+ /**
+ * Check whether the column family name is for internal column families.
+ * @param cfName - column family name
+ * @return - true if the column family is for internal use, false otherwise
+ */
+ private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'
+
/**
* Create RocksDB column family, if not created already
*/
- def createColFamilyIfAbsent(colFamilyName: String): Unit = {
- if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+ def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
+ // Remove leading and trailing whitespaces
+ val cfName = colFamilyName.trim
+
+ if (cfName == StateStore.DEFAULT_COL_FAMILY_NAME) {
throw new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_3197",
messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
}
+ if (!isInternal && cfName.charAt(0) == '_') {
Review Comment:
We should add a check that `cfName` is not an empty string, and throw an
error if it is. Right now an empty name (after trimming) ends in an
`IndexOutOfBoundsException` from `cfName.charAt(0)`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
override def getQueryInfo(): QueryInfo = currQueryInfo
+ private def getTimerState[T](): TimerStateImpl[T] = {
+ new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+ }
+
+ private val timerState = getTimerState[Boolean]()
+
+ override def registerTimer(expiryTimestampMs: Long): Unit = {
+ verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+ s"Cannot register timers with incorrect TimeoutMode")
+ verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+ s"Cannot register timers with " +
+ s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")
+
+ if (timerState.exists(expiryTimestampMs)) {
+ logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")
Review Comment:
We should validate here that `expiryTimestampMs` is >= `batchTimestampMs`
or >= the watermark, depending on the timeout mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]