Re: Record level index with not unique keys

2023-08-16 Thread Vinoth Chandar
Hi,

Yes, the indexing DAG can support this today, and even if not, it can be
fixed easily. The main issue would be how we encode the mapping well.
For example, if we want to map from user_id to all events that belong to the
user, we need a different, scalable way of storing this mapping.
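To make that concrete, here is a purely illustrative sketch (plain Python,
not Hudi code; the user_id key and its values are made up) of how the index
payload changes once a key can map to many files:

```
# Illustration only, not Hudi code. With unique keys each index entry holds a
# single location; with non-unique keys (e.g. user_id -> all events of that
# user) the value becomes a list that can grow with the number of file slices
# containing the key.
unique_key_entry = (
    "event_id:1",
    {"partition": "3", "fileIdHighBits": -6811947225812876253,
     "fileIdLowBits": -7812062179961430298, "fileIndex": 0,
     "instantTime": 1689147210233},
)

non_unique_key_entry = (
    "user_id:42",  # hypothetical key
    [
        {"partition": "2", "fileIndex": 0, "instantTime": 1689147210233},
        {"partition": "3", "fileIndex": 1, "instantTime": 1689147210233},
        # ... potentially one entry per file slice that contains this user
    ],
)
```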

I can organize this work under the 1.0 stream, if you are interested in
driving it.

Thanks
Vinoth


On Thu, Jul 13, 2023 at 1:09 PM nicolas paris 
wrote:

> Hello Prashant, thanks for your time.
>
>
> > With non unique keys how would tagging of records (for updates /
> deletes) work?
>
> Currently both GLOBAL_SIMPLE/BLOOM work out of the box in the mentioned
> context. See below pyspark script and results. As for the
> implementation, the tagLocationBacktoRecords returns a rdd of
> HoodieRecord with (key/part/location), and it can contain duplicate
> keys (then multiple records for same key).
>
> ```
> tableName = "test_global_bloom"
> basePath = f"/tmp/{tableName}"
>
> hudi_options = {
>     "hoodie.table.name": tableName,
>     "hoodie.datasource.write.recordkey.field": "event_id",
>     "hoodie.datasource.write.partitionpath.field": "part",
>     "hoodie.datasource.write.table.name": tableName,
>     "hoodie.datasource.write.precombine.field": "ts",
>     "hoodie.datasource.write.hive_style_partitioning": "true",
>     "hoodie.datasource.hive_sync.enable": "false",
>     "hoodie.metadata.enable": "true",
>     "hoodie.index.type": "GLOBAL_BLOOM",  # GLOBAL_SIMPLE works as well
> }
>
> # LET'S GEN DUPLS
> mode = "overwrite"
> df = spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
>  select '1' as event_id, '3' as ts, '3' as part UNION
>  select '1' as event_id, '2' as ts, '3' as part UNION
>  select '2' as event_id, '2' as ts, '3' as part""")
> df.write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "BULK_INSERT") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1|  3|   3|
> # |       1|  2|   3|
> # |       2|  2|   3|
> # |       1|  2|   2|
> # +--------+---+----+
>
> # UPDATE
> mode = "append"
> spark.sql("select '1' as event_id, '20' as ts, '4' as part") \
>     .write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "UPSERT") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       2|  2|   3|
> # +--------+---+----+
>
> # DELETE
> mode = "append"
> spark.sql("select 1 as event_id") \
>     .write.format("hudi").options(**hudi_options) \
>     .option("hoodie.datasource.write.operation", "DELETE") \
>     .mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       2|  2|   3|
> # +--------+---+----+
> ```
>
>
> > How would record Index know which mapping of the array to
> return for a given record key?
>
> As well as GLOBAL_SIMPLE/BLOOM, for a given record key, the RLI would
> return a list of mapping. Then the operation (update, delete, FCOW ...)
> would apply to each location.
>
> To illustrate, we could get something like this in the MDT:
>
> |event_id:1|[
>  {part=2, -5811947225812876253, -6812062179961430298, 0,
> 1689147210233},
>  {part=3, -711947225812876253, -8812062179961430298, 1,
> 1689147210233},
>  {part=3, -1811947225812876253, -2812062179961430298, 0,
> 1689147210233}
>  ]|
>
>
> On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> > Hi Nicolas,
> >
> > The RI feature is designed for max performance as it is at a record-
> > count
> > scale. Hence, the schema is simplified and minimized.
> >
> > With non unique keys how would tagging of records (for updates /
> > deletes)
> > work? How would record Index know which mapping of the array to
> > return for
> > a given record key?
> >
> > Thanks
> > Prashant
> >
> >
> >
> > On Wed, Jul 12, 2023 at 2:02 AM nicolas paris
> > 
> > wrote:
> >
> > > hi there,
> > >
> > > Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> > > COW
> > > (rfc-68) will be based on RLI to get the parquet offsets and allow
> > > targeting parquet row groups.
> > >
> > > RLI is a global index, therefore it assumes the hudi key is present
> > > in
> > > at most one parquet file. As a result in the MDT, the RLI is of
> > > type
> > > struct, and there is a 1:1 mapping w/ a given file.
> > >
> > > Type:
> > >    |-- recordIndexMetadata: struct (nullable = true)
> > >    |    |-- partition: string (nullable = false)
> > >    |    |-- fileIdHighBits: long (nullable = false)
> > >    |    |-- fileIdLowBits: long (nullable = false)
> > >|

Re: Record level index with not unique keys

2023-07-13 Thread nicolas paris
Hello Prashant, thanks for your time.


> With non unique keys how would tagging of records (for updates /
deletes) work?

Currently both GLOBAL_SIMPLE and GLOBAL_BLOOM work out of the box in the
mentioned context; see the pyspark script and results below. As for the
implementation, tagLocationBacktoRecords returns an RDD of HoodieRecord
with (key/part/location), and it can contain duplicate keys (hence multiple
records for the same key).

```
tableName = "test_global_bloom"
basePath = f"/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "GLOBAL_BLOOM",  # GLOBAL_SIMPLE works as well
}

# LET'S GEN DUPLS
mode = "overwrite"
df = spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
 select '1' as event_id, '3' as ts, '3' as part UNION
 select '1' as event_id, '2' as ts, '3' as part UNION
 select '2' as event_id, '2' as ts, '3' as part""")
df.write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "BULK_INSERT") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1|  3|   3|
# |       1|  2|   3|
# |       2|  2|   3|
# |       1|  2|   2|
# +--------+---+----+

# UPDATE
mode = "append"
spark.sql("select '1' as event_id, '20' as ts, '4' as part") \
    .write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "UPSERT") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1| 20|   4|
# |       1| 20|   4|
# |       1| 20|   4|
# |       2|  2|   3|
# +--------+---+----+

# DELETE
mode = "append"
spark.sql("select 1 as event_id") \
    .write.format("hudi").options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "DELETE") \
    .mode(mode).save(basePath)
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       2|  2|   3|
# +--------+---+----+
```


> How would record Index know which mapping of the array to
return for a given record key?

As with GLOBAL_SIMPLE/BLOOM, for a given record key the RLI would return a
list of mappings. The operation (update, delete, FCOW ...) would then apply
to each location.

To illustrate, we could get something like this in the MDT:

|event_id:1|[
  {part=2, -5811947225812876253, -6812062179961430298, 0, 1689147210233},
  {part=3, -711947225812876253, -8812062179961430298, 1, 1689147210233},
  {part=3, -1811947225812876253, -2812062179961430298, 0, 1689147210233}
]|
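
For what it's worth, here is a hedged sketch (pyspark, illustration only; not
the actual Hudi metadata payload) of what an array-of-struct variant of the
record index schema could look like, reusing the field names of the current
recordIndexMetadata struct quoted further down:

```
# Illustration only: an array-of-struct variant of recordIndexMetadata,
# reusing the field names of the current single-location struct.
from pyspark.sql.types import (ArrayType, IntegerType, LongType, StringType,
                               StructField, StructType)

location = StructType([
    StructField("partition", StringType(), nullable=False),
    StructField("fileIdHighBits", LongType(), nullable=False),
    StructField("fileIdLowBits", LongType(), nullable=False),
    StructField("fileIndex", IntegerType(), nullable=False),
    StructField("instantTime", LongType(), nullable=False),
])

# One record key maps to a list of locations instead of exactly one.
record_index_multi = StructType([
    StructField("recordIndexMetadata", ArrayType(location), nullable=True),
])
```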


On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> Hi Nicolas,
> 
> The RI feature is designed for max performance as it is at a record-
> count
> scale. Hence, the schema is simplified and minimized.
> 
> With non unique keys how would tagging of records (for updates /
> deletes)
> work? How would record Index know which mapping of the array to
> return for
> a given record key?
> 
> Thanks
> Prashant
> 
> 
> 
> On Wed, Jul 12, 2023 at 2:02 AM nicolas paris
> 
> wrote:
> 
> > hi there,
> > 
> > Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> > COW
> > (rfc-68) will be based on RLI to get the parquet offsets and allow
> > targeting parquet row groups.
> > 
> > RLI is a global index, therefore it assumes the hudi key is present
> > in
> > at most one parquet file. As a result in the MDT, the RLI is of
> > type
> > struct, and there is a 1:1 mapping w/ a given file.
> > 
> > Type:
> >    |-- recordIndexMetadata: struct (nullable = true)
> >    |    |-- partition: string (nullable = false)
> >    |    |-- fileIdHighBits: long (nullable = false)
> >    |    |-- fileIdLowBits: long (nullable = false)
> >    |    |-- fileIndex: integer (nullable = false)
> >    |    |-- instantTime: long (nullable = false)
> > 
> > Content:
> >    |event_id:1    |{part=3, -6811947225812876253,
> > -7812062179961430298, 0, 1689147210233}|
> > 
> > We would love to use both RLI and FCOW features, but I'm afraid our
> > keys are not unique in our kafka archives. Same key might be
> > present
> > in multiple partitions, and even in multiple slices within
> > partitions.
> > 
> > I wonder if, in the future, RLI could support multiple parquet files
> > (by storing an array of structs, for example). This would enable
> > leveraging RLI in more contexts
> > 
> > Thx
> > 
> > 
> > 
> > 
> > 



Re: Record level index with not unique keys

2023-07-13 Thread Prashant Wason
Hi Nicolas,

The RI feature is designed for maximum performance, as it operates at
record-count scale. Hence, the schema is simplified and minimized.

With non-unique keys, how would tagging of records (for updates / deletes)
work? How would the record index know which mapping of the array to return
for a given record key?

Thanks
Prashant



On Wed, Jul 12, 2023 at 2:02 AM nicolas paris 
wrote:

> hi there,
>
> Just tested preview of RLI (rfc-08), amazing feature. Soon the fast COW
> (rfc-68) will be based on RLI to get the parquet offsets and allow
> targeting parquet row groups.
>
> RLI is a global index, therefore it assumes the hudi key is present in
> at most one parquet file. As a result in the MDT, the RLI is of type
> struct, and there is a 1:1 mapping w/ a given file.
>
> Type:
>    |-- recordIndexMetadata: struct (nullable = true)
>    |    |-- partition: string (nullable = false)
>    |    |-- fileIdHighBits: long (nullable = false)
>    |    |-- fileIdLowBits: long (nullable = false)
>    |    |-- fileIndex: integer (nullable = false)
>    |    |-- instantTime: long (nullable = false)
>
> Content:
>    |event_id:1|{part=3, -6811947225812876253, -7812062179961430298, 0, 1689147210233}|
>
> We would love to use both RLI and FCOW features, but I'm afraid our
> keys are not unique in our kafka archives. Same key might be present
> in multiple partitions, and even in multiple slices within partitions.
>
> I wonder if, in the future, RLI could support multiple parquet files (by
> storing an array of structs, for example). This would enable leveraging RLI
> in more contexts
>
> Thx
>
>
>
>
>


Record level index with not unique keys

2023-07-12 Thread nicolas paris
hi there,

Just tested preview of RLI (rfc-08), amazing feature. Soon the fast COW
(rfc-68) will be based on RLI to get the parquet offsets and allow
targeting parquet row groups.

RLI is a global index, therefore it assumes the hudi key is present in
at most one parquet file. As a result in the MDT, the RLI is of type
struct, and there is a 1:1 mapping w/ a given file.

Type:
   |-- recordIndexMetadata: struct (nullable = true)
   |    |-- partition: string (nullable = false)
   |    |-- fileIdHighBits: long (nullable = false)
   |    |-- fileIdLowBits: long (nullable = false)
   |    |-- fileIndex: integer (nullable = false)
   |    |-- instantTime: long (nullable = false)

Content:
   |event_id:1|{part=3, -6811947225812876253, -7812062179961430298, 0, 1689147210233}|
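
The schema and row above can be inspected with something like the following
hedged sketch; the default .hoodie/metadata location and reading the metadata
table back as a Hudi table are assumptions on my side:

```
# Hedged sketch: peek at the record-level index entries of the metadata table.
# Assumes the MDT lives at the default <basePath>/.hoodie/metadata location
# and can be read back as a Hudi table; basePath is the path of the data table.
mdt = spark.read.format("hudi").load(f"{basePath}/.hoodie/metadata")
mdt.select("recordIndexMetadata").printSchema()
(mdt
 .where("recordIndexMetadata is not null")   # keep only record index entries
 .select("key", "recordIndexMetadata")
 .show(truncate=False))
```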
   
We would love to use both the RLI and FCOW features, but I'm afraid our
keys are not unique in our kafka archives. The same key might be present
in multiple partitions, and even in multiple slices within partitions.

I wonder if, in the future, RLI could support multiple parquet files (by
storing an array of structs, for example). This would enable leveraging RLI
in more contexts

Thx