Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope merged PR #12336: URL: https://github.com/apache/hudi/pull/12336
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2501659498

## CI report:

* 647f20e3c4f377425c1cce7988491c3aef1b3746 Azure: [SUCCESS](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1977)

Bot commands: @hudi-bot supports the following commands:
- `@hudi-bot run azure` re-run the last Azure build
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858971353

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -162,18 +164,49 @@ object RecordLevelIndexSupport {
         }
         var literals: List[String] = List.empty
         inQuery.list.foreach {
-          case literal: Literal => literals = literals :+ literal.value.toString
+          case literal: Literal =>
+            val recordKeyLiteral = getRecordKeyLiteral(inQuery.value.asInstanceOf[AttributeReference], literal, isComplexRecordKey)
+            literals = literals :+ recordKeyLiteral
           case _ => validINQuery = false
         }
         if (validINQuery) {
           Option.apply(inQuery, literals)
         } else {
           Option.empty
         }
+
+      // Handle And expression (composite filter)
+      case andQuery: And =>
+        val leftResult = filterQueryWithRecordKey(andQuery.left, recordKeyOpt, isComplexRecordKey, attributeFetcher)
+        val rightResult = filterQueryWithRecordKey(andQuery.right, recordKeyOpt, isComplexRecordKey, attributeFetcher)
+
+        // If both left and right filters are valid, concatenate their results
+        (leftResult, rightResult) match {
+          case (Some((leftExp, leftKeys)), Some((rightExp, rightKeys))) =>
+            // Return concatenated expressions and record keys
+            Option.apply(And(leftExp, rightExp), leftKeys ++ rightKeys)

Review Comment:
   This case is already handled. Test cases are added for this.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858966527

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala ##

@@ -155,32 +158,59 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
       (List.empty, List.empty)
     } else {
       var recordKeyQueries: List[Expression] = List.empty
-      var recordKeys: List[String] = List.empty
-      for (query <- queryFilters) {
-        val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
+      var compositeRecordKeys: List[String] = List.empty
+      val recordKeyOpt = getRecordKeyConfig
+      val isComplexRecordKey = recordKeyOpt.map(recordKeys => recordKeys.length).getOrElse(0) > 1
+      recordKeyOpt.foreach { recordKeysArray =>
+        // Handle composite record keys
+        breakable {
+          for (recordKey <- recordKeysArray) {
+            var recordKeys: List[String] = List.empty
+            for (query <- queryFilters) {
+              {
+                RecordLevelIndexSupport.filterQueryWithRecordKey(query, Option.apply(recordKey), isComplexRecordKey).foreach {
+                  case (exp: Expression, recKeys: List[String]) =>
+                    exp match {
+                      // For IN, add each element individually to recordKeys
+                      case _: In => recordKeys = recordKeys ++ recKeys
+
+                      // For other cases, basically EqualTo, concatenate recKeys with the default separator
+                      case _ => recordKeys = recordKeys ++ recKeys
+                    }
+                    recordKeyQueries = recordKeyQueries :+ exp
+                }
+              }
+            }
+
+            if (recordKeys.isEmpty) {
+              recordKeyQueries = List.empty
+              compositeRecordKeys = List.empty
+              break()
+            } else if (!isComplexRecordKey || compositeRecordKeys.isEmpty) {
+              compositeRecordKeys = recordKeys
+            } else {
+              var tempCompositeRecordKeys: List[String] = List.empty
+              for (compRecKey <- compositeRecordKeys) {

Review Comment:
   Added test cases. These cases are handled.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858969836

## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala ##

@@ -240,4 +269,63 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
     val fileNames = rliIndexSupport.computeCandidateFileNames(fileIndex, Seq(dataFilter), null, prunedPaths, false)
     assertEquals(if (includeLogFiles) 2 else 1, fileNames.get.size)
   }
+
+  @Test
+  def testRLIWithMultipleRecordKeyFields(): Unit = {
+    val tableName = "dummy_table_multi_pk"
+    val dummyTablePath = tempDir.resolve("dummy_table_multi_pk").toAbsolutePath.toString
+    val hudiOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "record_key_col,name",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_key_col",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    ) ++ metadataOpts
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  record_key_col string,
+         |  name string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col,name',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col,name',
+         |  hoodie.enable.data.skipping = 'true'
+         | )
+         | partitioned by(partition_key_col)
+         | location '$dummyTablePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values('row1', 'name1', 'a', 'p1')")
+    spark.sql(s"insert into $tableName values('row2', 'name2', 'b', 'p2')")
+    spark.sql(s"insert into $tableName values('row3', 'name3', 'c', 'p3')")
+
+    val latestSnapshotDf = spark.read.format("hudi").options(hudiOpts).load(dummyTablePath)
+    this.metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(dummyTablePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    verifyEqualToQuery(hudiOpts, Seq("record_key_col", "name"), tableName, latestSnapshotDf, HoodieTableMetaClient.reload(metaClient))
+    verifyInQuery(hudiOpts, Array("record_key_col", "name"), latestSnapshotDf, tableName, "partition_key_col", Seq("p2", "p3"), true)

Review Comment:
   Added test cases for this.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858966026

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala ##

@@ -92,7 +92,7 @@ class FunctionalIndexSupport(spark: SparkSession,
   def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
     var functionalIndexQueries: List[Tuple2[Expression, List[String]]] = List.empty
     for (query <- queryFilters) {
-      filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+      filterQueryWithRecordKey(query, sourceFieldOpt, false, (expr: Expression) => {

Review Comment:
   Addressed
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2501520714

## CI report:

* 885e80032a81b10256e258dca03f4a10b6580858 Azure: [SUCCESS](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1965)
* 647f20e3c4f377425c1cce7988491c3aef1b3746 Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1977)
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2501516506

## CI report:

* 885e80032a81b10256e258dca03f4a10b6580858 Azure: [SUCCESS](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1965)
* 647f20e3c4f377425c1cce7988491c3aef1b3746 UNKNOWN
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r185853

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -162,18 +164,49 @@ object RecordLevelIndexSupport {
         }
         var literals: List[String] = List.empty
         inQuery.list.foreach {
-          case literal: Literal => literals = literals :+ literal.value.toString
+          case literal: Literal =>
+            val recordKeyLiteral = getRecordKeyLiteral(inQuery.value.asInstanceOf[AttributeReference], literal, isComplexRecordKey)
+            literals = literals :+ recordKeyLiteral
           case _ => validINQuery = false
         }
         if (validINQuery) {
           Option.apply(inQuery, literals)
         } else {
           Option.empty
         }
+
+      // Handle And expression (composite filter)
+      case andQuery: And =>
+        val leftResult = filterQueryWithRecordKey(andQuery.left, recordKeyOpt, isComplexRecordKey, attributeFetcher)
+        val rightResult = filterQueryWithRecordKey(andQuery.right, recordKeyOpt, isComplexRecordKey, attributeFetcher)
+
+        // If both left and right filters are valid, concatenate their results
+        (leftResult, rightResult) match {
+          case (Some((leftExp, leftKeys)), Some((rightExp, rightKeys))) =>
+            // Return concatenated expressions and record keys
+            Option.apply(And(leftExp, rightExp), leftKeys ++ rightKeys)

Review Comment:
   How are we checking the order of the keys? For example, the table config has keys in the order `recordKeyField1, recordKeyField2` while the query filter is `recordKeyField2='key2' AND recordKeyField1='key1'`. In that case, do we look up RLI with the key `key2key1` or `key1key2`? Note that RLI will have keys in the same order as the key generator, which uses the same order as the fields in the table config.
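A minimal, hypothetical Scala sketch of the ordering rule discussed above (the helper names are invented and this is not the PR's actual code): literals are collected per field, then re-assembled in the table-config field order, so the composed lookup key always matches what the key generator wrote into RLI. The `field:value` encoding and comma separator mirror the composite-key literals quoted later in this thread (`col_0:val_0,...`).

```scala
object CompositeKeyOrdering {
  // Assumed separator between key parts, matching the "field:value,field:value" literals quoted in this thread.
  private val PartSeparator = ","

  // Compose the RLI lookup key in table-config order; return None if any record key field is unconstrained.
  def composeLookupKey(recordKeyFieldsInConfigOrder: Seq[String],
                       literalsByField: Map[String, String]): Option[String] = {
    val orderedParts = recordKeyFieldsInConfigOrder.map { field =>
      literalsByField.get(field).map(value => s"$field:$value")
    }
    if (orderedParts.forall(_.isDefined)) Some(orderedParts.flatten.mkString(PartSeparator)) else None
  }

  def main(args: Array[String]): Unit = {
    // Predicates arrive with recordKeyField2 first, but the composed key follows the table-config order.
    val key = composeLookupKey(
      Seq("recordKeyField1", "recordKeyField2"),
      Map("recordKeyField2" -> "key2", "recordKeyField1" -> "key1"))
    println(key) // Some(recordKeyField1:key1,recordKeyField2:key2)
  }
}
```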
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858243803

## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala ##

@@ -240,4 +269,63 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
     val fileNames = rliIndexSupport.computeCandidateFileNames(fileIndex, Seq(dataFilter), null, prunedPaths, false)
     assertEquals(if (includeLogFiles) 2 else 1, fileNames.get.size)
   }
+
+  @Test
+  def testRLIWithMultipleRecordKeyFields(): Unit = {
+    val tableName = "dummy_table_multi_pk"
+    val dummyTablePath = tempDir.resolve("dummy_table_multi_pk").toAbsolutePath.toString
+    val hudiOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "record_key_col,name",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_key_col",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    ) ++ metadataOpts
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  record_key_col string,
+         |  name string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col,name',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col,name',
+         |  hoodie.enable.data.skipping = 'true'
+         | )
+         | partitioned by(partition_key_col)
+         | location '$dummyTablePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values('row1', 'name1', 'a', 'p1')")
+    spark.sql(s"insert into $tableName values('row2', 'name2', 'b', 'p2')")
+    spark.sql(s"insert into $tableName values('row3', 'name3', 'c', 'p3')")
+
+    val latestSnapshotDf = spark.read.format("hudi").options(hudiOpts).load(dummyTablePath)
+    this.metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(dummyTablePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    verifyEqualToQuery(hudiOpts, Seq("record_key_col", "name"), tableName, latestSnapshotDf, HoodieTableMetaClient.reload(metaClient))
+    verifyInQuery(hudiOpts, Array("record_key_col", "name"), latestSnapshotDf, tableName, "partition_key_col", Seq("p2", "p3"), true)

Review Comment:
   Need to validate `AND` with different predicate orders:
   - `record_key_col = k1 and name = n1`
   - `name = n1 and record_key_col = k1`

   Need to validate `IN` with different predicate orders:
   - `record_key_col in (k1, k2) and name in (n1, n2)`
   - `name in (n1, n2) and record_key_col in (k1, k2)`
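A purely illustrative sketch of the predicate permutations requested above, written against the `dummy_table_multi_pk` table from the quoted test (these query strings are assumptions about how a test might exercise the cases, not the PR's actual test code):

```scala
object PredicateOrderCases {
  // Both orderings of each predicate pair should prune to the same candidate files,
  // since the composed RLI key must follow the table-config field order, not the query order.
  val cases: Seq[String] = Seq(
    "select * from dummy_table_multi_pk where record_key_col = 'row1' and name = 'name1'",
    "select * from dummy_table_multi_pk where name = 'name1' and record_key_col = 'row1'",
    "select * from dummy_table_multi_pk where record_key_col in ('row1', 'row2') and name in ('name1', 'name2')",
    "select * from dummy_table_multi_pk where name in ('name1', 'name2') and record_key_col in ('row1', 'row2')"
  )

  def main(args: Array[String]): Unit = cases.foreach(println)
}
```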
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858236015

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala ##

@@ -155,32 +158,59 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
       (List.empty, List.empty)
     } else {
       var recordKeyQueries: List[Expression] = List.empty
-      var recordKeys: List[String] = List.empty
-      for (query <- queryFilters) {
-        val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
+      var compositeRecordKeys: List[String] = List.empty
+      val recordKeyOpt = getRecordKeyConfig
+      val isComplexRecordKey = recordKeyOpt.map(recordKeys => recordKeys.length).getOrElse(0) > 1
+      recordKeyOpt.foreach { recordKeysArray =>
+        // Handle composite record keys
+        breakable {
+          for (recordKey <- recordKeysArray) {
+            var recordKeys: List[String] = List.empty
+            for (query <- queryFilters) {
+              {
+                RecordLevelIndexSupport.filterQueryWithRecordKey(query, Option.apply(recordKey), isComplexRecordKey).foreach {
+                  case (exp: Expression, recKeys: List[String]) =>
+                    exp match {
+                      // For IN, add each element individually to recordKeys
+                      case _: In => recordKeys = recordKeys ++ recKeys
+
+                      // For other cases, basically EqualTo, concatenate recKeys with the default separator
+                      case _ => recordKeys = recordKeys ++ recKeys
+                    }
+                    recordKeyQueries = recordKeyQueries :+ exp
+                }
+              }
+            }
+
+            if (recordKeys.isEmpty) {
+              recordKeyQueries = List.empty
+              compositeRecordKeys = List.empty
+              break()
+            } else if (!isComplexRecordKey || compositeRecordKeys.isEmpty) {
+              compositeRecordKeys = recordKeys
+            } else {
+              var tempCompositeRecordKeys: List[String] = List.empty
+              for (compRecKey <- compositeRecordKeys) {

Review Comment:
   We still need to check that all the fields of the composite record key are involved in the query predicates. For example, if we have `col_0,col_1,col_2` as the record key fields, then the query predicate `col_0 = val_0 and col_1 = val_1` does not work; only `col_0 = val_0 and col_1 = val_1 and col_2 = val_2` should work, since we do not match the MDT RLI with a prefix search of the keys. (Need to add a UT to confirm this part.)

   Also, we should be very careful with the field sequence in the returned `compositeRecordKeys`: if `col_0,col_1,col_2` are the record key fields, then the literal `col_0:val_0,col_2:val_2,col_1:val_1` does not work either.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858227779

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala ##

@@ -155,32 +158,59 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
       (List.empty, List.empty)
     } else {
       var recordKeyQueries: List[Expression] = List.empty
-      var recordKeys: List[String] = List.empty
-      for (query <- queryFilters) {
-        val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
+      var compositeRecordKeys: List[String] = List.empty
+      val recordKeyOpt = getRecordKeyConfig
+      val isComplexRecordKey = recordKeyOpt.map(recordKeys => recordKeys.length).getOrElse(0) > 1
+      recordKeyOpt.foreach { recordKeysArray =>
+        // Handle composite record keys
+        breakable {
+          for (recordKey <- recordKeysArray) {
+            var recordKeys: List[String] = List.empty
+            for (query <- queryFilters) {
+              {
+                RecordLevelIndexSupport.filterQueryWithRecordKey(query, Option.apply(recordKey), isComplexRecordKey).foreach {
+                  case (exp: Expression, recKeys: List[String]) =>
+                    exp match {
+                      // For IN, add each element individually to recordKeys
+                      case _: In => recordKeys = recordKeys ++ recKeys
+
+                      // For other cases, basically EqualTo, concatenate recKeys with the default separator
+                      case _ => recordKeys = recordKeys ++ recKeys
+                    }
+                    recordKeyQueries = recordKeyQueries :+ exp
+                }
+              }
+            }
+
+            if (recordKeys.isEmpty) {
+              recordKeyQueries = List.empty
+              compositeRecordKeys = List.empty
+              break()
+            } else if (!isComplexRecordKey || compositeRecordKeys.isEmpty) {
+              compositeRecordKeys = recordKeys
+            } else {
+              var tempCompositeRecordKeys: List[String] = List.empty
+              for (compRecKey <- compositeRecordKeys) {

Review Comment:
   This is the "Cartesian product" of an m * n pattern, where m and n are the enumerations of the possible values for each record key field.
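A self-contained sketch of the m * n expansion described here (illustrative only; the helper is not the PR's code): each record key field contributes a list of candidate literals, and the composite lookup keys are their Cartesian product, kept in record-key-field order.

```scala
object CompositeKeyCrossProduct {
  // Fold each field's literal list into the accumulated combinations (Cartesian product).
  def crossProduct(perFieldLiterals: Seq[Seq[String]]): Seq[Seq[String]] =
    perFieldLiterals.foldLeft(Seq(Seq.empty[String])) { (acc, literals) =>
      for (combo <- acc; lit <- literals) yield combo :+ lit
    }

  def main(args: Array[String]): Unit = {
    // record_key_col IN ('row1','row2') AND name IN ('name1','name2')  =>  2 * 2 = 4 composite keys
    val keys = crossProduct(Seq(
      Seq("record_key_col:row1", "record_key_col:row2"),
      Seq("name:name1", "name:name2")
    )).map(_.mkString(","))
    keys.foreach(println)
    // record_key_col:row1,name:name1 ... record_key_col:row2,name:name2
  }
}
```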
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12336: URL: https://github.com/apache/hudi/pull/12336#discussion_r1858225267

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala ##

@@ -92,7 +92,7 @@ class FunctionalIndexSupport(spark: SparkSession,
   def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
     var functionalIndexQueries: List[Tuple2[Expression, List[String]]] = List.empty
     for (query <- queryFilters) {
-      filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+      filterQueryWithRecordKey(query, sourceFieldOpt, false, (expr: Expression) => {

Review Comment:
   Why hard-code this as false?
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2500209664

Closing this PR in favor of #12336. The updated PR addresses all the comments from this PR.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope closed pull request #12160: [HUDI-8432] Fix data skipping with RLI if record key is composite URL: https://github.com/apache/hudi/pull/12160
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2499983965

## CI report:

* 885e80032a81b10256e258dca03f4a10b6580858 Azure: [SUCCESS](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1965)
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2499853822

## CI report:

* 885e80032a81b10256e258dca03f4a10b6580858 Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1965)
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2499851704

@nsivabalan @danny0405 I have pushed a new PR with the comments addressed: https://github.com/apache/hudi/pull/12336. PTAL.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1857938535

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   Addressed

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyFields: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyFields.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)
+           Option.apply(equalToQuery, List.apply(recordKeyLiteral))
          } else {
            Option.empty
          }
        } else {
          Option.empty
        }
+      // Handle In expressions
      case inQuery: In =>
        var validINQuery = true
        inQuery.value match {
          case attribute: AttributeReference =>
-           if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+           if (!attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
              validINQuery = false
            }
          case _ => validINQuery = false
        }
        var literals: List[String] = List.empty
        inQuery.list.foreach {
-         case literal: Literal => literals = literals :+ literal.value.toString
+         case literal: Literal => literals = literals :+ getRecordKeyLiteral(inQuery.value.asInstanceOf[AttributeReference], literal, isComplexRecordKey)
          case _ => validINQuery = false
        }
        if (validINQuery) {
          Option.apply(inQuery, literals)
        } else {
          Option.empty
        }
+
+      // Handle And expression (composite filter)
+      case andQuery: And =>
+        val leftResult = filterQueryWithR
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
lokeshj1703 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1857938763

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala ##

@@ -164,29 +164,34 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
       var recordKeys: List[String] = List.empty
       for (query <- queryFilters) {
         val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
+        recordKeyOpt.foreach { recordKeysArray =>
+          // Handle composite record keys
+          RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeysArray).foreach {
+            case (exp: Expression, recKeys: List[String]) =>
+              exp match {
+                // For IN, add each element individually to recordKeys
+                case _: In =>
+                  recordKeys = recordKeys ++ recKeys
+
+                // For other cases, basically EqualTo, concatenate recKeys with the default separator
+                case _ =>
+                  recordKeys = recordKeys :+ recKeys.mkString(DEFAULT_RECORD_KEY_PARTS_SEPARATOR)
+              }
+              recordKeyQueries = recordKeyQueries :+ exp
+          }
+        }
       }
-
-      Tuple2.apply(recordKeyQueries, recordKeys)
+      (recordKeyQueries, recordKeys)
     }
   }

   /**
-   * Returns the configured record key for the table if it is a simple record key else returns empty option.
+   * Returns the configured record key for the table.
    */
-  private def getRecordKeyConfig: Option[String] = {
+  private def getRecordKeyConfig: Option[Array[String]] = {
     val recordKeysOpt: org.apache.hudi.common.util.Option[Array[String]] = metaClient.getTableConfig.getRecordKeyFields
-    val recordKeyOpt = recordKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr =>
-      if (arr.length == 1) {
-        arr(0)
-      } else {
-        null
-      }))
-    Option.apply(recordKeyOpt.orElse(null))
+    // Convert the Hudi Option to Scala Option and return if present
+    Option(recordKeysOpt.orElse(null)).filter(_.nonEmpty)

Review Comment:
   It does work: `filter` is only applied to a `Some`, and `Option(null)` is `None`, so the null record key case never reaches `nonEmpty`.
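A tiny, self-contained sketch of the point being debated about `Option(...).filter(_.nonEmpty)` (the helper name is invented for illustration; this is not the project's code): `Option(null)` is already `None`, so the filter is never applied to a null array.

```scala
object OptionFilterCheck {
  // Mirrors the pattern under discussion: wrap a possibly-null array, then keep it only if non-empty.
  def toRecordKeyFields(raw: Array[String]): Option[Array[String]] =
    Option(raw).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(toRecordKeyFields(null))                 // None, no NullPointerException
    println(toRecordKeyFields(Array.empty[String]))  // None
    println(toRecordKeyFields(Array("record_key_col", "name")).map(_.mkString(","))) // Some(record_key_col,name)
  }
}
```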
## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala ##

@@ -240,4 +251,61 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
     val fileNames = rliIndexSupport.computeCandidateFileNames(fileIndex, Seq(dataFilter), null, prunedPaths, false)
     assertEquals(if (includeLogFiles) 2 else 1, fileNames.get.size)
   }
+
+  @Test
+  def testRLIWithMultipleRecordKeyFields(): Unit = {
+    val tableName = "dummy_table_multi_pk"
+    val dummyTablePath = tempDir.resolve("dummy_table_multi_pk").toAbsolutePath.toString
+    val hudiOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "record_key_col,name",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_key_col",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    ) ++ metadataOpts
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  record_key_col string,
+         |  name string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col,name',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col,name',
+         |  hoodie.enable.data.skipping = 'true'
+         | )
+         | partitioned by(partition_key_col)
+         | location '$dummyTablePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values('row1', 'name1', 'a', 'p1')")
+    spark.sql(s"insert into $tableName values('row2', 'name2', 'b', 'p2')")
+    spark.sql(s"insert into $tableName values('row3', 'name3', 'c', 'p3')")
+
+    val latestSnapshotDf = spark.read.format("hudi").options(hudiOpts).load(dummyTablePath)
+    this.metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(dummyTablePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    verifyEqualToQuery(hudiOpts, Seq("record_key_col", "name"), tableName, latestSnapshotDf, HoodieTableMetaClient.reload(metaClient))
+  }
+
+  def verifyEqualToQuery(hudiOpts: Map[String, String], colNames: Seq[String], tableName: String, latestSnapshotDf: DataFrame, metaClient: HoodieTableMetaClient): Unit = {

Review Comment:
   Addressed
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12336: URL: https://github.com/apache/hudi/pull/12336#issuecomment-2499850725

## CI report:

* 885e80032a81b10256e258dca03f4a10b6580858 UNKNOWN
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853295109

## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala ##

@@ -240,4 +251,61 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
     val fileNames = rliIndexSupport.computeCandidateFileNames(fileIndex, Seq(dataFilter), null, prunedPaths, false)
     assertEquals(if (includeLogFiles) 2 else 1, fileNames.get.size)
   }
+
+  @Test
+  def testRLIWithMultipleRecordKeyFields(): Unit = {
+    val tableName = "dummy_table_multi_pk"
+    val dummyTablePath = tempDir.resolve("dummy_table_multi_pk").toAbsolutePath.toString
+    val hudiOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "record_key_col,name",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_key_col",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    ) ++ metadataOpts
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  record_key_col string,
+         |  name string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col,name',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col,name',
+         |  hoodie.enable.data.skipping = 'true'
+         | )
+         | partitioned by(partition_key_col)
+         | location '$dummyTablePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values('row1', 'name1', 'a', 'p1')")
+    spark.sql(s"insert into $tableName values('row2', 'name2', 'b', 'p2')")
+    spark.sql(s"insert into $tableName values('row3', 'name3', 'c', 'p3')")
+
+    val latestSnapshotDf = spark.read.format("hudi").options(hudiOpts).load(dummyTablePath)
+    this.metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(dummyTablePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    verifyEqualToQuery(hudiOpts, Seq("record_key_col", "name"), tableName, latestSnapshotDf, HoodieTableMetaClient.reload(metaClient))
+  }
+
+  def verifyEqualToQuery(hudiOpts: Map[String, String], colNames: Seq[String], tableName: String, latestSnapshotDf: DataFrame, metaClient: HoodieTableMetaClient): Unit = {

Review Comment:
   Also validate `IN` and `AND`.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853294752

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala ##

@@ -164,29 +164,34 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
       (List.empty, List.empty)
     } else {
       var recordKeyQueries: List[Expression] = List.empty
-      var recordKeys: List[String] = List.empty
-      for (query <- queryFilters) {
-        val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
+      recordKeyOpt.foreach { recordKeysArray =>
+        // Handle composite record keys
+        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeysArray).foreach {
+          case (exp: Expression, recKeys: List[String]) =>
+            exp match {
+              // For IN, add each element individually to recordKeys
+              case _: In =>
+                recordKeys = recordKeys ++ recKeys
+
+              // For other cases, basically EqualTo, concatenate recKeys with the default separator
+              case _ =>
+                recordKeys = recordKeys :+ recKeys.mkString(DEFAULT_RECORD_KEY_PARTS_SEPARATOR)
+            }
+            recordKeyQueries = recordKeyQueries :+ exp
+        }
+      }
       }
-
-      Tuple2.apply(recordKeyQueries, recordKeys)
+      (recordKeyQueries, recordKeys)
     }
   }

   /**
-   * Returns the configured record key for the table if it is a simple record key else returns empty option.
+   * Returns the configured record key for the table.
    */
-  private def getRecordKeyConfig: Option[String] = {
+  private def getRecordKeyConfig: Option[Array[String]] = {
     val recordKeysOpt: org.apache.hudi.common.util.Option[Array[String]] = metaClient.getTableConfig.getRecordKeyFields
-    val recordKeyOpt = recordKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr =>
-      if (arr.length == 1) {
-        arr(0)
-      } else {
-        null
-      }))
-    Option.apply(recordKeyOpt.orElse(null))
+    // Convert the Hudi Option to Scala Option and return if present
+    Option(recordKeysOpt.orElse(null)).filter(_.nonEmpty)

Review Comment:
   The `nonEmpty` does not work when the record key is null.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853294309

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyFields: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyFields.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)
+           Option.apply(equalToQuery, List.apply(recordKeyLiteral))
          } else {
            Option.empty
          }
        } else {
          Option.empty
        }
+      // Handle In expressions
      case inQuery: In =>
        var validINQuery = true
        inQuery.value match {
          case attribute: AttributeReference =>
-           if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+           if (!attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
              validINQuery = false
            }
          case _ => validINQuery = false
        }
        var literals: List[String] = List.empty
        inQuery.list.foreach {
-         case literal: Literal => literals = literals :+ literal.value.toString
+         case literal: Literal => literals = literals :+ getRecordKeyLiteral(inQuery.value.asInstanceOf[AttributeReference], literal, isComplexRecordKey)

Review Comment:
   We need to check that all the record key fields are involved.

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   We need to check that all the record key fields are involved.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853293898

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyFields: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyFields.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)
+           Option.apply(equalToQuery, List.apply(recordKeyLiteral))
          } else {
            Option.empty
          }
        } else {
          Option.empty
        }
+      // Handle In expressions
      case inQuery: In =>
        var validINQuery = true
        inQuery.value match {
          case attribute: AttributeReference =>
-           if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+           if (!attributeMatchesRecordKey(attribute.name, recordKeyFields)) {
              validINQuery = false
            }
          case _ => validINQuery = false
        }
        var literals: List[String] = List.empty
        inQuery.list.foreach {
-         case literal: Literal => literals = literals :+ literal.value.toString
+         case literal: Literal => literals = literals :+ getRecordKeyLiteral(inQuery.value.asInstanceOf[AttributeReference], literal, isComplexRecordKey)
          case _ => validINQuery = false
        }
        if (validINQuery) {
          Option.apply(inQuery, literals)
        } else {
          Option.empty
        }
+
+      // Handle And expression (composite filter)
+      case andQuery: And =>
+        val leftResult = filterQueryWithRecordKey(andQuery.left, recordKeyFields)
+        val rightResult = filterQueryWithRecordKey(andQuery.right, recordKeyFields)
+
+        // If both left and right filters are valid, concatenate their results
+        (leftResult, rightResult) match {
+          case (Some((leftExp, leftKeys)), Some((rightExp, rightKeys))) =>
+            // Return concatenated expressions and record keys
+            Option.apply(And(leftExp, rightExp), leftKeys ++ rightKeys)
+          case _ => Option.empty
+        }
+
+      // Handle Or expression (for completeness, though usually not used for record key filtering)
+      case orQuery: Or =>

Review Comment:
   I guess the `OR` does not work as expected.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853213011

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   We don't support range queries with RLI, so it will fall back to no data skipping (except for partition pruning, which is done before we read RLI).
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
nsivabalan commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1853209132

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   Yes.
   - Let's do the RLI lookup only when all fields in the complex key gen are part of the predicates.
   - If I am not wrong, we already only support "IN" and "EQUAL"s, so I am not sure what Danny is bringing up.
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1817566167

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala ##

@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
      case equalToQuery: EqualTo =>
        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
        if (attributeLiteralTuple != null) {
          val attribute = attributeLiteralTuple._1
          val literal = attributeLiteralTuple._2
-         if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-           Option.apply(equalToQuery, List.apply(literal.value.toString))
+         if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+           val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   > For now, we can just see if all record key fields are present or not. If not, then don't use RLI. What do you think?

   I kind of think we should also restrict all the expression types to `EQUALS`. For example, if we have record key `col_0,col_1` and the expression `col_0 > 0 && col_1 < 100`, you cannot utilize RLI in such a use case.
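To make the restriction concrete, here is a minimal, self-contained sketch using a toy predicate ADT (not Spark's Catalyst classes and not the PR's code): only equality-style predicates (`EqualTo` / `In`) over every record key field can be turned into exact RLI key lookups, so the `col_0 > 0 && col_1 < 100` example above would be rejected.

```scala
object PredicateClassification {
  sealed trait Predicate { def field: String }
  case class EqualTo(field: String, value: String) extends Predicate
  case class In(field: String, values: Seq[String]) extends Predicate
  case class GreaterThan(field: String, value: String) extends Predicate
  case class LessThan(field: String, value: String) extends Predicate

  // RLI is usable only if every record key field is constrained by an exact-match predicate.
  def usableForRli(predicates: Seq[Predicate], recordKeyFields: Seq[String]): Boolean = {
    val exactMatchFields = predicates.collect {
      case p: EqualTo => p.field
      case p: In      => p.field
    }.toSet
    recordKeyFields.nonEmpty && recordKeyFields.forall(exactMatchFields.contains)
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("col_0", "col_1")
    println(usableForRli(Seq(GreaterThan("col_0", "0"), LessThan("col_1", "100")), keys)) // false: range predicates
    println(usableForRli(Seq(EqualTo("col_0", "val_0"), EqualTo("col_1", "val_1")), keys)) // true
  }
}
```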
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2437622149 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e Azure: [FAILURE](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1398) * 2ecc9e7fa8e65aad7b919b2ae0a4f6b9deaf129b UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2438006743 ## CI report: * b20398a1a5e45de9eadf8ef73d667e3469479d86 Azure: [SUCCESS](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1417) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2437752888 ## CI report: * 2ecc9e7fa8e65aad7b919b2ae0a4f6b9deaf129b Azure: [CANCELED](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1416) * b20398a1a5e45de9eadf8ef73d667e3469479d86 Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1417) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2437749413 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e Azure: [FAILURE](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1398) * 2ecc9e7fa8e65aad7b919b2ae0a4f6b9deaf129b Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1416) * b20398a1a5e45de9eadf8ef73d667e3469479d86 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2437625001 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e Azure: [FAILURE](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1398) * 2ecc9e7fa8e65aad7b919b2ae0a4f6b9deaf129b Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1416) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
codope commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1816572972
## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala:
##
@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
       case equalToQuery: EqualTo =>
         val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
         if (attributeLiteralTuple != null) {
           val attribute = attributeLiteralTuple._1
           val literal = attributeLiteralTuple._2
-          if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-            Option.apply(equalToQuery, List.apply(literal.value.toString))
+          if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+            val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   Good point. We should check whether the predicate contains all of the record key fields. Technically, we could support a subset of the record key fields as well, but that involves pulling all records from RLI and checking for a subset match; that is something to follow up on. For now, we can just check whether all record key fields are present. If not, then don't use RLI. What do you think?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
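[Editor's note] As a follow-on illustration of the "all key fields present" rule, a small standalone sketch of how candidate composite keys could be assembled from per-field equality/IN literals, assuming a `field:value` comma-joined encoding for complex keys (the real separator and format must come from the configured key generator, so treat this purely as a sketch).

```scala
// Hypothetical sketch: build candidate composite key strings from per-field
// literals, assuming a "field:value" comma-joined encoding (verify against the
// actual key generator before relying on this format).
object CompositeKeySketch {
  /** None if any key field lacks a literal; otherwise the cross product of per-field literals. */
  def candidateKeys(fieldsInKeyOrder: Seq[String],
                    literalsByField: Map[String, Seq[String]]): Option[Seq[String]] = {
    if (!fieldsInKeyOrder.forall(literalsByField.contains)) {
      None // at least one key field has no EqualTo/IN literal -> skip RLI
    } else {
      val perField = fieldsInKeyOrder.map(f => literalsByField(f).map(v => s"$f:$v"))
      Some(perField.foldLeft(Seq("")) { (acc, values) =>
        for (prefix <- acc; v <- values)
          yield if (prefix.isEmpty) v else s"$prefix,$v"
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val keyFields = Seq("col_0", "col_1")
    // col_0 IN ('a', 'b') AND col_1 = 'x' -> two candidate keys
    println(candidateKeys(keyFields, Map("col_0" -> Seq("a", "b"), "col_1" -> Seq("x"))))
    // col_1 missing from the predicates -> None, fall back to full scan
    println(candidateKeys(keyFields, Map("col_0" -> Seq("a"))))
  }
}
```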
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2436007861 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e Azure: [PENDING](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1398) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
danny0405 commented on code in PR #12160: URL: https://github.com/apache/hudi/pull/12160#discussion_r1815867538
## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala:
##
@@ -122,47 +123,78 @@ object RecordLevelIndexSupport {
   val INDEX_NAME = "RECORD_LEVEL"

   /**
-   * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
+   * If the input query is an EqualTo or IN query on record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
    *
    * @param queryFilter The query that need to be filtered.
    * @return Tuple of filtered query and list of record key literals that need to be matched
    */
-  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Array[String]): Option[(Expression, List[String])] = {
+    val isComplexRecordKey = recordKeyOpt.length > 1
     queryFilter match {
+      // Handle EqualTo expressions
       case equalToQuery: EqualTo =>
         val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
         if (attributeLiteralTuple != null) {
           val attribute = attributeLiteralTuple._1
           val literal = attributeLiteralTuple._2
-          if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
-            Option.apply(equalToQuery, List.apply(literal.value.toString))
+          if (attribute != null && attribute.name != null && recordKeyOpt.contains(attribute.name)) {
+            val recordKeyLiteral = getRecordKeyLiteral(attribute, literal, isComplexRecordKey)

Review Comment:
   How can we ensure that all the constituent record key values are involved in the expression?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
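[Editor's note] One possible way to answer this, sketched with Spark Catalyst APIs: collect the attribute names referenced by the pushed-down filter and require the configured record key fields to be a subset of them. A real check would also restrict the predicate types to `EqualTo`/`In` as discussed above; `coversAllKeyFields` is an illustrative helper, not part of Hudi.

```scala
// Sketch only: check that the filter references every configured record key field.
// coversAllKeyFields is an illustrative helper, not Hudi code; a real check would
// also restrict predicate types to EqualTo/In as discussed above.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

object KeyCoverageSketch {
  def coversAllKeyFields(filter: Expression, recordKeyFields: Set[String]): Boolean =
    recordKeyFields.subsetOf(filter.references.map(_.name).toSet)

  def main(args: Array[String]): Unit = {
    val col0 = AttributeReference("col_0", StringType)()
    val col1 = AttributeReference("col_1", StringType)()

    val partial = EqualTo(col0, Literal("a"))               // only col_0
    val full    = And(partial, EqualTo(col1, Literal("b"))) // col_0 and col_1

    println(coversAllKeyFields(partial, Set("col_0", "col_1"))) // false -> skip RLI
    println(coversAllKeyFields(full, Set("col_0", "col_1")))    // true
  }
}
```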
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2436157635 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e Azure: [FAILURE](https://dev.azure.com/apachehudi/a1a51da7-8592-47d4-88dc-fd67bed336bb/_build/results?buildId=1398) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-8432] Fix data skipping with RLI if record key is composite [hudi]
hudi-bot commented on PR #12160: URL: https://github.com/apache/hudi/pull/12160#issuecomment-2436003548 ## CI report: * cbeb41bdd47f5cc0064d95002885ad03b788718e UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org