anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531442175


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +160,292 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, 0, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, keySchemaWithRangeScan.length + 1,
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", 
StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName, RangeKeyScanStateEncoderType,
+          keySchemaWithRangeScan, numColsPrefixKey = 1, valueSchema)
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)

Review Comment:
   Yes, it was being covered by that set of inputs, but I added some explicit 
ones as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to