HeartSaVioR commented on code in PR #49747:
URL: https://github.com/apache/spark/pull/49747#discussion_r1936722556
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ Seq(true, false).foreach { colFamiliesEnabled =>
+ test(s"rocksdb range scan - variable size non-ordering columns with
non-zero start ordinal " +
+ s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+ tryWithProviderResource(newStoreProvider(keySchema,
+ RangeKeyScanStateEncoderSpec(
+ keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+ val store = provider.getStore(0)
+
+ // use non-default col family if column families are enabled
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(cfName,
+ keySchema, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+ }
+
+ val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,
Review Comment:
nit: Is it intentional to use Long constants here while we cast them to Int again?
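For illustration, a rough sketch of the Int-only variant this nit points at (an assumption on my side that the values are only ever consumed as Int; the cast site itself is below the quoted range):

```scala
// Hypothetical sketch: declare the test timestamps as Int up front so no
// narrowing .toInt cast is needed later. All of these values fit in an Int.
val timerTimestamps: Seq[Int] = Seq(931, 8000, 452300, 4200, -1, 90, 1, 2, 8,
  -230, -14569, -92, -7434253, 35, 6, 9, -323, 5)
```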
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ Seq(true, false).foreach { colFamiliesEnabled =>
+ test(s"rocksdb range scan - variable size non-ordering columns with
non-zero start ordinal " +
+ s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+ tryWithProviderResource(newStoreProvider(keySchema,
+ RangeKeyScanStateEncoderSpec(
+ keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+ val store = provider.getStore(0)
+
+ // use non-default col family if column families are enabled
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(cfName,
+ keySchema, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+ }
+
+ val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,
+ -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
+ timerTimestamps.foreach { ts =>
+ val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter)
Review Comment:
nit: Looks like this could easily be extracted into an inner method to avoid the redundancy, taking either a single ts or an Array/List of ts.
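For illustration, a rough sketch of the kind of helper this could become (the helper name and the exact dataToKeyRow call are my assumptions; the real call is cut off at the comment anchor above):

```scala
// Hypothetical inner helper: runs the put/get round-trip for each timestamp,
// so each test body only has to supply its own data.
def putAndVerify(store: StateStore, cfName: String, timestamps: Seq[Long]): Unit = {
  timestamps.foreach { ts =>
    // Random letter-only key of length 1..20; ts is narrowed to Int here,
    // tying back to the earlier nit about the Long constants.
    val key = Random.alphanumeric.filter(_.isLetter).take(Random.nextInt(20) + 1).mkString
    val keyRow = dataToKeyRow(key, ts.toInt)
    store.put(keyRow, dataToValueRow(1), cfName)
    assert(valueRowToData(store.get(keyRow, cfName)) === 1)
  }
}
```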
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -453,6 +510,68 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ Seq(true, false).foreach { colFamiliesEnabled =>
+ Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes =>
+ test(s"rocksdb range scan multiple ordering columns - with non-zero
start ordinal - " +
+ s"variable size non-ordering columns with
colFamiliesEnabled=$colFamiliesEnabled " +
+ s"sortIndexes=${sortIndexes.mkString(",")}") {
+
+ val testSchema: StructType = StructType(
+ Seq(StructField("key1", StringType, false),
+ StructField("key2", LongType, false),
+ StructField("key3", IntegerType, false)))
+
+ val schemaProj = UnsafeProjection.create(Array[DataType](StringType, LongType, IntegerType))
+
+ tryWithProviderResource(newStoreProvider(testSchema,
+ RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), colFamiliesEnabled)) { provider =>
+ val store = provider.getStore(0)
+
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(cfName,
+ testSchema, valueSchema,
+ RangeKeyScanStateEncoderSpec(testSchema, sortIndexes))
+ }
+
+ val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+ (1L, 27), (1L, 394), (1L, 5), (3L, 980),
+ (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2),
+ (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323))
+ timerTimestamps.foreach { ts =>
+ // order by long col first and then by int col
+ val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](UTF8String
+ .fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), ts._1,
+ ts._2)))
+ val valueRow = dataToValueRow(1)
+ store.put(keyRow, valueRow, cfName)
+ assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+ }
+
+ val result = store.iterator(cfName).map { kv =>
+ val keyRow = kv.key
+ val key = (keyRow.getString(0), keyRow.getLong(1), keyRow.getInt(2))
+ (key._2, key._3)
Review Comment:
nit: Why not just inline this?
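For reference, the inlined form would read roughly like this (a sketch; the rest of the iterator pipeline is below the quoted range):

```scala
// Build the (Long, Int) pair directly from the key row instead of going
// through an intermediate 3-tuple whose String element is discarded.
val result = store.iterator(cfName).map { kv =>
  (kv.key.getLong(1), kv.key.getInt(2))
}
```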
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(exception.getMessage.contains("Found long, expecting union"))
}
- testWithColumnFamiliesAndEncodingTypes(
- "rocksdb range scan multiple non-contiguous ordering columns",
- TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
- val testSchema: StructType = StructType(
- Seq(
- StructField("ordering1", LongType, false),
- StructField("key2", StringType, false),
- StructField("ordering2", IntegerType, false),
- StructField("string2", StringType, false),
- StructField("ordering3", DoubleType, false)
+ Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+ testWithColumnFamiliesAndEncodingTypes(
+ s"rocksdb range scan multiple non-contiguous ordering columns " +
+ s"and sortIndexes=${sortIndexes.mkString(",")}",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+ val testSchema: StructType = StructType(
+ Seq(
+ StructField("ordering1", LongType, false),
+ StructField("key2", StringType, false),
+ StructField("ordering2", IntegerType, false),
+ StructField("string2", StringType, false),
+ StructField("ordering3", DoubleType, false)
+ )
)
- )
- val testSchemaProj = UnsafeProjection.create(Array[DataType](
+ val testSchemaProj = UnsafeProjection.create(Array[DataType](
immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
- val rangeScanOrdinals = Seq(0, 2, 4)
-
- tryWithProviderResource(
- newStoreProvider(
- testSchema,
- RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
- colFamiliesEnabled
- )
- ) { provider =>
- val store = provider.getStore(0)
+ // Multiply by 2 to get the actual ordinals in the row
Review Comment:
nit: This does not seem to simplify the logic. (0, 1, 2) * 2 vs. (0, 2, 4) makes little difference, given that someone still needs to know the schema to understand either form.
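To make the comparison concrete, the two forms being weighed:

```scala
// Added in this PR: derive the physical ordinals in testSchema from the
// logical sort order, relying on the ordering columns sitting at 0, 2, 4.
val rangeScanOrdinals = sortIndexes.map(_ * 2)
// Previous form, equivalent when sortIndexes == Seq(0, 1, 2):
// val rangeScanOrdinals = Seq(0, 2, 4)
```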
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(exception.getMessage.contains("Found long, expecting union"))
}
- testWithColumnFamiliesAndEncodingTypes(
- "rocksdb range scan multiple non-contiguous ordering columns",
- TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
- val testSchema: StructType = StructType(
- Seq(
- StructField("ordering1", LongType, false),
- StructField("key2", StringType, false),
- StructField("ordering2", IntegerType, false),
- StructField("string2", StringType, false),
- StructField("ordering3", DoubleType, false)
+ Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+ testWithColumnFamiliesAndEncodingTypes(
+ s"rocksdb range scan multiple non-contiguous ordering columns " +
+ s"and sortIndexes=${sortIndexes.mkString(",")}",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+ val testSchema: StructType = StructType(
+ Seq(
+ StructField("ordering1", LongType, false),
+ StructField("key2", StringType, false),
+ StructField("ordering2", IntegerType, false),
+ StructField("string2", StringType, false),
+ StructField("ordering3", DoubleType, false)
+ )
)
- )
- val testSchemaProj = UnsafeProjection.create(Array[DataType](
+ val testSchemaProj = UnsafeProjection.create(Array[DataType](
immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
- val rangeScanOrdinals = Seq(0, 2, 4)
-
- tryWithProviderResource(
- newStoreProvider(
- testSchema,
- RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
- colFamiliesEnabled
- )
- ) { provider =>
- val store = provider.getStore(0)
+ // Multiply by 2 to get the actual ordinals in the row
+ val rangeScanOrdinals = sortIndexes.map(_ * 2)
- val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
- if (colFamiliesEnabled) {
- store.createColFamilyIfAbsent(
- cfName,
+ tryWithProviderResource(
+ newStoreProvider(
testSchema,
- valueSchema,
- RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
+ RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
+ colFamiliesEnabled
)
- }
+ ) { provider =>
+ val store = provider.getStore(0)
- val orderedInput = Seq(
- // Make sure that the first column takes precedence, even if the
- // later columns are greater
- (-2L, 0, 99.0),
- (-1L, 0, 98.0),
- (0L, 0, 97.0),
- (2L, 0, 96.0),
- // Make sure that the second column takes precedence, when the first
- // column is all the same
- (3L, -2, -1.0),
- (3L, -1, -2.0),
- (3L, 0, -3.0),
- (3L, 2, -4.0),
- // Finally, make sure that the third column takes precedence, when the
- // first two ordering columns are the same.
- (4L, -1, -127.0),
- (4L, -1, 0.0),
- (4L, -1, 64.0),
- (4L, -1, 127.0)
- )
- val scrambledInput = Random.shuffle(orderedInput)
-
- scrambledInput.foreach { record =>
- val keyRow = testSchemaProj.apply(
- new GenericInternalRow(
- Array[Any](
- record._1,
- UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
- record._2,
- UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
- record._3
- )
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(
+ cfName,
+ testSchema,
+ valueSchema,
+ RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
)
+ }
+
+ val orderedInput = Seq(
+ // Make sure that the first column takes precedence, even if the
+ // later columns are greater
+ (-2L, 0, 99.0),
+ (-1L, 0, 98.0),
+ (0L, 0, 97.0),
+ (2L, 0, 96.0),
+ // Make sure that the second column takes precedence, when the first
+ // column is all the same
+ (3L, -2, -1.0),
+ (3L, -1, -2.0),
+ (3L, 0, -3.0),
+ (3L, 2, -4.0),
+ // Finally, make sure that the third column takes precedence, when the
+ // first two ordering columns are the same.
+ (4L, -1, -127.0),
+ (4L, -1, 0.0),
+ (4L, -1, 64.0),
+ (4L, -1, 127.0)
)
+ val scrambledInput = Random.shuffle(orderedInput)
+
+ scrambledInput.foreach { record =>
+ val keyRow = testSchemaProj.apply(
+ new GenericInternalRow(
+ Array[Any](
+ record._1,
+ UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+ record._2,
+ UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+ record._3
+ )
+ )
+ )
- // The value is just a "dummy" value of 1
- val valueRow = dataToValueRow(1)
- store.put(keyRow, valueRow, cfName)
- assert(valueRowToData(store.get(keyRow, cfName)) === 1)
- }
+ // The value is just a "dummy" value of 1
+ val valueRow = dataToValueRow(1)
+ store.put(keyRow, valueRow, cfName)
+ assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+ }
- val result = store
- .iterator(cfName)
- .map { kv =>
- val keyRow = kv.key
- val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
- (key._1, key._2, key._3)
+ val result = store
+ .iterator(cfName)
+ .map { kv =>
+ val keyRow = kv.key
+ val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
+ (key._1, key._2, key._3)
Review Comment:
ditto: not sure whether readability would regress if we inlined this.
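For reference, the inlined version being weighed would read roughly like this (sketch only):

```scala
// The intermediate tuple is rebuilt unchanged by (key._1, key._2, key._3),
// so inlining removes it entirely rather than just shortening the expression.
val result = store
  .iterator(cfName)
  .map { kv =>
    (kv.key.getLong(0), kv.key.getInt(2), kv.key.getDouble(4))
  }
```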
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(exception.getMessage.contains("Found long, expecting union"))
}
- testWithColumnFamiliesAndEncodingTypes(
- "rocksdb range scan multiple non-contiguous ordering columns",
- TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
- val testSchema: StructType = StructType(
- Seq(
- StructField("ordering1", LongType, false),
- StructField("key2", StringType, false),
- StructField("ordering2", IntegerType, false),
- StructField("string2", StringType, false),
- StructField("ordering3", DoubleType, false)
+ Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+ testWithColumnFamiliesAndEncodingTypes(
+ s"rocksdb range scan multiple non-contiguous ordering columns " +
+ s"and sortIndexes=${sortIndexes.mkString(",")}",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+ val testSchema: StructType = StructType(
+ Seq(
+ StructField("ordering1", LongType, false),
+ StructField("key2", StringType, false),
+ StructField("ordering2", IntegerType, false),
+ StructField("string2", StringType, false),
+ StructField("ordering3", DoubleType, false)
+ )
)
- )
- val testSchemaProj = UnsafeProjection.create(Array[DataType](
+ val testSchemaProj = UnsafeProjection.create(Array[DataType](
immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
- val rangeScanOrdinals = Seq(0, 2, 4)
-
- tryWithProviderResource(
- newStoreProvider(
- testSchema,
- RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
- colFamiliesEnabled
- )
- ) { provider =>
- val store = provider.getStore(0)
+ // Multiply by 2 to get the actual ordinals in the row
Review Comment:
I see how you use this to simplify; it's effectively doing a projection. Looks OK to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]