anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1534395668


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +161,360 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // Probe both sides of the valid range: zero ordering columns, and one more ordering
+    // column than the key schema has. Each must fail while creating the provider or opening
+    // the store, and the offending count must be echoed back in the error parameters.
+    val invalidNumOrderingCols = Seq(0, keySchemaWithRangeScan.length + 1)
+    invalidNumOrderingCols.foreach { numCols =>
+      val err = intercept[SparkUnsupportedOperationException] {
+        tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, numCols),
+          colFamiliesEnabled)) { provider =>
+          provider.getStore(0)
+        }
+      }
+      checkError(
+        err,
+        errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+        parameters = Map("numOrderingCols" -> numCols.toString),
+        matchPVals = true
+      )
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // A StringType leading (ordering) column must be rejected, per the
+    // STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED error class, and the error
+    // must name the offending field and its position.
+    val stringKeySchema: StructType = StructType(Seq(
+      StructField("key1", StringType, false),
+      StructField("key2", StringType, false)))
+    val firstField = stringKeySchema.fields.head
+
+    val err = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(stringKeySchema,
+        RangeKeyScanStateEncoderSpec(stringKeySchema, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      err,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map("fieldName" -> firstField.name, "index" -> "0"),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // A NullType leading (ordering) column must be rejected, per the
+    // STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED error class, and the error
+    // must name the offending field and its position.
+    val nullTypeKeySchema: StructType = StructType(Seq(
+      StructField("key1", NullType, false),
+      StructField("key2", StringType, false)))
+    val firstField = nullTypeKeySchema.fields.head
+
+    val err = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(nullTypeKeySchema,
+        RangeKeyScanStateEncoderSpec(nullTypeKeySchema, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      err,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map("fieldName" -> firstField.name, "index" -> "0"),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val encoderSpec = RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)
+    tryWithProviderResource(
+      newStoreProvider(keySchemaWithRangeScan, encoderSpec, colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // With column families on, write into a dedicated family instead of "default".
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName, keySchemaWithRangeScan, valueSchema, encoderSpec)
+      }
+
+      // First batch (version 0): unsorted timestamps, each paired with the same
+      // constant-length string "a" as the non-ordering column.
+      val firstBatch = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      for (ts <- firstBatch) {
+        val key = dataToKeyRowWithRangeScan(ts, "a")
+        store.put(key, dataToValueRow(1), cfName)
+        assert(valueRowToData(store.get(key, cfName)) === 1)
+      }
+
+      // Iteration order must follow the numeric order of the timestamp column.
+      val firstScan = store.iterator(cfName)
+        .map(kv => keyRowWithRangeScanToData(kv.key)._1)
+        .toSeq
+      assert(firstScan === firstBatch.sorted)
+      store.commit()
+
+      // Second batch (version 1): powers of two plus zero, layered on the committed data.
+      val store1 = provider.getStore(1)
+      val secondBatch = Seq(64L, 32L, 1024L, 4096L, 0L)
+      for (ts <- secondBatch) {
+        val key = dataToKeyRowWithRangeScan(ts, "a")
+        store1.put(key, dataToValueRow(1), cfName)
+        assert(valueRowToData(store1.get(key, cfName)) === 1)
+      }
+
+      val secondScan = store1.iterator(cfName)
+        .map(kv => keyRowWithRangeScanToData(kv.key)._1)
+        .toSeq
+      assert(secondScan === (firstBatch ++ secondBatch).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size (random alphanumeric, 1 to 20 chars)
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // iteration order must follow the timestamp (ordering) column only
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a second batch of timestamps (powers of 2, plus 0) on top of version 1
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of variable size (random alphanumeric, 1 to 20 chars)
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    "non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210))
+
+      // Write each key with a random-length string in the non-ordering column and keep that
+      // string, so the decoded rows can be checked on every column, not only the two
+      // ordering columns.
+      val expectedRows = timerTimestamps.map { case (longVal, intVal) =>
+        val strVal = Random.alphanumeric.take(Random.nextInt(20) + 1).mkString
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(
+          new GenericInternalRow(Array[Any](longVal, intVal, UTF8String.fromString(strVal))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        (longVal, intVal, strVal)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+      }.toSeq
+      // (long, int) pairs are unique, so ordering by them alone fixes the expected sequence
+      assert(result === expectedRows.sortBy { case (longVal, intVal, _) => (longVal, intVal) })
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable size " +
+    "non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      // NOTE: every byte value below is non-negative (0x01 to 0x7F); ordering of negative
+      // bytes is not exercised by this test.
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+
+      // Write each key with a random-length string in the non-ordering column and keep that
+      // string, so the decoded rows can be checked on every column, not only the two
+      // ordering columns.
+      val expectedRows = timerTimestamps.map { case (byteVal, intVal) =>
+        val strVal = Random.alphanumeric.take(Random.nextInt(20) + 1).mkString
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(
+          new GenericInternalRow(Array[Any](byteVal, intVal, UTF8String.fromString(strVal))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        (byteVal, intVal, strVal)
+      }
+
+      val result: Seq[(Byte, Int, String)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+      }.toSeq
+      // (byte, int) pairs are unique, so ordering by them alone fixes the expected sequence
+      assert(result === expectedRows.sortBy { case (byteVal, intVal, _) => (byteVal, intVal) })
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    // use the same schema as value schema for single col key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // the key has a single column, which is also its only ordering column
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan: the prefix covers the whole key, so each scan must
+      // return exactly the one matching key
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val prefixResult = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(prefixResult === Seq(ts))
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.foreach { ts =>
+        (1 to 5).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === 5)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to