Re: Record level index with not unique keys
Hi, yes, the indexing DAG can support this today, and even if not, it can easily be fixed. The main issue is how we encode the mapping efficiently. For example, if we want to map from user_id to all events that belong to the user, we need a different, scalable way of storing this mapping. I can organize this work under the 1.0 stream, if you are interested in driving it.

Thanks
Vinoth

On Thu, Jul 13, 2023 at 1:09 PM nicolas paris wrote:
> Hello Prashant, thanks for your time.
>
> > With non unique keys how would tagging of records (for updates /
> > deletes) work?
>
> Currently both GLOBAL_SIMPLE/GLOBAL_BLOOM work out of the box in the
> mentioned context. See the pyspark script and results below. As for the
> implementation, tagLocationBacktoRecords returns an RDD of HoodieRecord
> with (key/partition/location), and it can contain duplicate keys (hence
> multiple records for the same key).
>
> ```
> tableName = "test_global_bloom"
> basePath = f"/tmp/{tableName}"
>
> hudi_options = {
>     "hoodie.table.name": tableName,
>     "hoodie.datasource.write.recordkey.field": "event_id",
>     "hoodie.datasource.write.partitionpath.field": "part",
>     "hoodie.datasource.write.table.name": tableName,
>     "hoodie.datasource.write.precombine.field": "ts",
>     "hoodie.datasource.write.hive_style_partitioning": "true",
>     "hoodie.datasource.hive_sync.enable": "false",
>     "hoodie.metadata.enable": "true",
>     "hoodie.index.type": "GLOBAL_BLOOM",  # GLOBAL_SIMPLE works as well
> }
>
> # LET'S GEN DUPLS
> mode = "overwrite"
> df = spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
>                   select '1' as event_id, '3' as ts, '3' as part UNION
>                   select '1' as event_id, '2' as ts, '3' as part UNION
>                   select '2' as event_id, '2' as ts, '3' as part""")
> df.write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "BULK_INSERT") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1|  3|   3|
> # |       1|  2|   3|
> # |       2|  2|   3|
> # |       1|  2|   2|
> # +--------+---+----+
>
> # UPDATE
> mode = "append"
> spark.sql("select '1' as event_id, '20' as ts, '4' as part") \
>     .write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "UPSERT") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       2|  2|   3|
> # +--------+---+----+
>
> # DELETE
> mode = "append"
> spark.sql("select 1 as event_id") \
>     .write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "DELETE") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       2|  2|   3|
> # +--------+---+----+
> ```
>
> > How would record Index know which mapping of the array to
> > return for a given record key?
>
> As with GLOBAL_SIMPLE/GLOBAL_BLOOM, for a given record key the RLI
> would return a list of mappings. The operation (update, delete,
> FCOW ...) would then apply to each location.
>
> To illustrate, we could get something like this in the MDT:
>
> |event_id:1|[
>     {part=2, -5811947225812876253, -6812062179961430298, 0, 1689147210233},
>     {part=3, -711947225812876253, -8812062179961430298, 1, 1689147210233},
>     {part=3, -1811947225812876253, -2812062179961430298, 0, 1689147210233}
> ]|
>
> On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> > Hi Nicolas,
> >
> > The RI feature is designed for max performance as it is at a
> > record-count scale. Hence, the schema is simplified and minimized.
> >
> > With non unique keys how would tagging of records (for updates /
> > deletes) work? How would record Index know which mapping of the
> > array to return for a given record key?
> >
> > Thanks
> > Prashant
> >
> > On Wed, Jul 12, 2023 at 2:02 AM nicolas paris wrote:
> > > hi there,
> > >
> > > Just tested preview of RLI (rfc-08), amazing feature. Soon the
> > > fast COW (rfc-68) will be based on RLI to get the parquet offsets
> > > and allow targeting parquet row groups.
> > >
> > > RLI is a global index, therefore it assumes the hudi key is
> > > present in at most one parquet file. As a result in the MDT, the
> > > RLI is of type struct, and there is a 1:1 mapping w/ a given file.
> > >
> > > Type:
> > > |-- recordIndexMetadata: struct (nullable = true)
> > > |    |-- partition: string (nullable = false)
> > > |    |-- fileIdHighBits: long (nullable = false)
> > > |    |-- fileIdLowBits: long (nullable = false)
> > > |    |-- fileIndex: integer (nullable = false)
> > > |    |-- instantTime: long (nullable = false)
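[Editorial note: to make the tagging behavior discussed in this thread concrete, here is a minimal pure-Python sketch. It is not Hudi's actual implementation; the index contents and function name are illustrative stand-ins for the tagLocationBacktoRecords semantics described above, where one incoming record fans out to every known location of its key.]

```python
# Illustrative stand-in for a global index with non-unique keys:
# record key -> list of file locations. Real Hudi stores
# (partition, fileIdHighBits, fileIdLowBits, fileIndex, instantTime).
index = {
    "1": [("part=2", "file-a"), ("part=3", "file-b"), ("part=3", "file-c")],
    "2": [("part=3", "file-b")],
}

def tag_location_back_to_records(incoming):
    """Fan out each incoming record to one tagged record per known
    location, so an upsert/delete reaches every copy of the key."""
    tagged = []
    for record in incoming:
        # Unknown keys get a single untagged record (a new insert).
        locations = index.get(record["event_id"], [None])
        for loc in locations:
            tagged.append({**record, "location": loc})
    return tagged

# An upsert of key '1' must touch all three existing locations.
tagged = tag_location_back_to_records([{"event_id": "1", "ts": "20"}])
assert len(tagged) == 3
```

Under this model, the delete shown in the pyspark script above removes all three copies of key '1' because each tagged location is processed independently.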
Re: Record level index with not unique keys
Hello Prashant, thanks for your time.

> With non unique keys how would tagging of records (for updates /
> deletes) work?

Currently both GLOBAL_SIMPLE/GLOBAL_BLOOM work out of the box in the mentioned context. See the pyspark script and results below. As for the implementation, tagLocationBacktoRecords returns an RDD of HoodieRecord with (key/partition/location), and it can contain duplicate keys (hence multiple records for the same key).

```
tableName = "test_global_bloom"
basePath = f"/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "GLOBAL_BLOOM",  # GLOBAL_SIMPLE works as well
}

# LET'S GEN DUPLS
mode = "overwrite"
df = spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
                  select '1' as event_id, '3' as ts, '3' as part UNION
                  select '1' as event_id, '2' as ts, '3' as part UNION
                  select '2' as event_id, '2' as ts, '3' as part""")
df.write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "BULK_INSERT") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1|  3|   3|
# |       1|  2|   3|
# |       2|  2|   3|
# |       1|  2|   2|
# +--------+---+----+

# UPDATE
mode = "append"
spark.sql("select '1' as event_id, '20' as ts, '4' as part") \
    .write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "UPSERT") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1| 20|   4|
# |       1| 20|   4|
# |       1| 20|   4|
# |       2|  2|   3|
# +--------+---+----+

# DELETE
mode = "append"
spark.sql("select 1 as event_id") \
    .write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "DELETE") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       2|  2|   3|
# +--------+---+----+
```

> How would record Index know which mapping of the array to
> return for a given record key?

As with GLOBAL_SIMPLE/GLOBAL_BLOOM, for a given record key the RLI would return a list of mappings. The operation (update, delete, FCOW ...) would then apply to each location.

To illustrate, we could get something like this in the MDT:

|event_id:1|[
    {part=2, -5811947225812876253, -6812062179961430298, 0, 1689147210233},
    {part=3, -711947225812876253, -8812062179961430298, 1, 1689147210233},
    {part=3, -1811947225812876253, -2812062179961430298, 0, 1689147210233}
]|

On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> Hi Nicolas,
>
> The RI feature is designed for max performance as it is at a
> record-count scale. Hence, the schema is simplified and minimized.
>
> With non unique keys how would tagging of records (for updates /
> deletes) work? How would record Index know which mapping of the array
> to return for a given record key?
>
> Thanks
> Prashant
>
> On Wed, Jul 12, 2023 at 2:02 AM nicolas paris wrote:
> > hi there,
> >
> > Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> > COW (rfc-68) will be based on RLI to get the parquet offsets and
> > allow targeting parquet row groups.
> >
> > RLI is a global index, therefore it assumes the hudi key is present
> > in at most one parquet file. As a result in the MDT, the RLI is of
> > type struct, and there is a 1:1 mapping w/ a given file.
> >
> > Type:
> > |-- recordIndexMetadata: struct (nullable = true)
> > |    |-- partition: string (nullable = false)
> > |    |-- fileIdHighBits: long (nullable = false)
> > |    |-- fileIdLowBits: long (nullable = false)
> > |    |-- fileIndex: integer (nullable = false)
> > |    |-- instantTime: long (nullable = false)
> >
> > Content:
> > |event_id:1|{part=3, -6811947225812876253,
> > -7812062179961430298, 0, 1689147210233}|
> >
> > We would love to use both RLI and FCOW features, but I'm afraid our
> > keys are not unique in our kafka archives. The same key might be
> > present in multiple partitions, and even in multiple slices within
> > partitions.
> >
> > I wonder if in the future RLI could support multiple parquet files
> > (by storing an array of structs, for example). This would enable
> > leveraging RLI in more contexts.
> >
> > Thx
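[Editorial note: the fileIdHighBits/fileIdLowBits field names suggest the fileId UUID is stored as its two signed 64-bit halves, which would explain the negative longs in the MDT entries shown above. This is an assumption, not verified against the Hudi code; the sketch below only illustrates how such a split and reassembly would work.]

```python
import uuid

def encode_file_id(file_id: str) -> tuple[int, int]:
    """Split a UUID-style fileId into two signed 64-bit longs
    (assumed to correspond to fileIdHighBits/fileIdLowBits)."""
    u = uuid.UUID(file_id)
    def to_signed(x: int) -> int:
        # Java longs are signed 64-bit, hence the negative values in the MDT.
        return x - (1 << 64) if x >= (1 << 63) else x
    return to_signed(u.int >> 64), to_signed(u.int & ((1 << 64) - 1))

def decode_file_id(high: int, low: int) -> str:
    """Reassemble the original UUID string from the two signed longs."""
    to_unsigned = lambda x: x + (1 << 64) if x < 0 else x
    return str(uuid.UUID(int=(to_unsigned(high) << 64) | to_unsigned(low)))

fid = "0d5e2a6b-1c3f-4e8a-9b7d-2f4c6e8a0b1d"  # illustrative fileId
high, low = encode_file_id(fid)
assert decode_file_id(high, low) == fid  # lossless round trip
```

If this assumption holds, storing an array of such (high, low) pairs per key adds only two longs per extra location, which keeps the multi-location encoding compact.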
Re: Record level index with not unique keys
Hi Nicolas,

The RI feature is designed for max performance as it is at a record-count scale. Hence, the schema is simplified and minimized.

With non unique keys how would tagging of records (for updates / deletes) work? How would record Index know which mapping of the array to return for a given record key?

Thanks
Prashant

On Wed, Jul 12, 2023 at 2:02 AM nicolas paris wrote:
> hi there,
>
> Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> COW (rfc-68) will be based on RLI to get the parquet offsets and allow
> targeting parquet row groups.
>
> RLI is a global index, therefore it assumes the hudi key is present in
> at most one parquet file. As a result in the MDT, the RLI is of type
> struct, and there is a 1:1 mapping w/ a given file.
>
> Type:
> |-- recordIndexMetadata: struct (nullable = true)
> |    |-- partition: string (nullable = false)
> |    |-- fileIdHighBits: long (nullable = false)
> |    |-- fileIdLowBits: long (nullable = false)
> |    |-- fileIndex: integer (nullable = false)
> |    |-- instantTime: long (nullable = false)
>
> Content:
> |event_id:1|{part=3, -6811947225812876253,
> -7812062179961430298, 0, 1689147210233}|
>
> We would love to use both RLI and FCOW features, but I'm afraid our
> keys are not unique in our kafka archives. The same key might be
> present in multiple partitions, and even in multiple slices within
> partitions.
>
> I wonder if in the future RLI could support multiple parquet files (by
> storing an array of structs, for example). This would enable
> leveraging RLI in more contexts.
>
> Thx
Record level index with not unique keys
hi there,

Just tested preview of RLI (rfc-08), amazing feature. Soon the fast COW (rfc-68) will be based on RLI to get the parquet offsets and allow targeting parquet row groups.

RLI is a global index, therefore it assumes the hudi key is present in at most one parquet file. As a result in the MDT, the RLI is of type struct, and there is a 1:1 mapping w/ a given file.

Type:
|-- recordIndexMetadata: struct (nullable = true)
|    |-- partition: string (nullable = false)
|    |-- fileIdHighBits: long (nullable = false)
|    |-- fileIdLowBits: long (nullable = false)
|    |-- fileIndex: integer (nullable = false)
|    |-- instantTime: long (nullable = false)

Content:
|event_id:1|{part=3, -6811947225812876253, -7812062179961430298, 0, 1689147210233}|

We would love to use both RLI and FCOW features, but I'm afraid our keys are not unique in our kafka archives. The same key might be present in multiple partitions, and even in multiple slices within partitions.

I wonder if in the future RLI could support multiple parquet files (by storing an array of structs, for example). This would enable leveraging RLI in more contexts.

Thx
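[Editorial note: the 1:1 limitation described above can be sketched in plain Python, using dicts as illustrative stand-ins for the MDT payloads. With a single-struct value, writing a duplicate key silently overwrites the previous mapping; the array-of-struct encoding proposed in this thread keeps every location.]

```python
# Single-struct index: one location per key (the current RLI shape, sketched).
rli = {}
rli["event_id:1"] = {"part": "2", "fileIndex": 0}
rli["event_id:1"] = {"part": "3", "fileIndex": 0}  # duplicate key overwrites
assert rli["event_id:1"]["part"] == "3"  # the first location is lost

# Array-of-struct index: the proposed encoding retains all locations.
rli_multi = {}
rli_multi.setdefault("event_id:1", []).append({"part": "2", "fileIndex": 0})
rli_multi.setdefault("event_id:1", []).append({"part": "3", "fileIndex": 0})
assert len(rli_multi["event_id:1"]) == 2  # both locations retained
```

This is exactly the difference between the struct-valued `recordIndexMetadata` shown above and the array-valued entry illustrated earlier in the thread.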