Re: What precombine field really is used for and its future?

2023-04-05 Thread Daniel Kaźmirski
Hi Vinoth,

Thanks for your reply!

Regarding the first part, I agree that precombine solves a lot of issues,
especially during ingestion.
I think this is valid behavior and should be preserved so that we can have
out-of-order events and duplicates handled by the framework.
I'm also aware there are many use cases people want to solve with Hudi,
ranging from "I don't need a precombine field" to "I need many precombine
fields" or "my deduplication logic is complex and I just need to do it
myself, before the write and outside of Hudi".

Maybe I should share my use case to show where this topic comes from and
what my point of view is.
I use precombine myself for handling events from various applications that
are delivered via Kafka and stored in a Hudi table.
Once they are stored, I have a nice deduplicated dataset that I can use to
create derived tables (App -> Kafka -> Hudi table "raw" -> n derived Hudi
tables). In the second part (Hudi raw -> Hudi derived) I can safely assume
there will be no duplicates or out-of-order events coming from the first
Hudi table, so I don't really need a precombine field there, and it makes
modeling the derived tables easier because I don't need to consider how
precombine will behave. At the same time, it does not make sense to
introduce another tool and grow my stack for this if Hudi can handle it.

Sometimes I just don't have any field in the source system/table that can
be used as a precombine field and am forced to provide Hudi with something
like an ingestion/processing timestamp or some constant.


Regarding the proposal comments, I agree overall and understand that these
are not easy choices to make.
From a user perspective, it would be great to have coherent APIs with
consistent behavior and no surprises. I agree with Ken: there are
operations where it does not feel like precombine should be needed (or it
should be something internal and abstracted away from the user).
You're right, Vinoth, and I was wrong: the update does not deduplicate
existing records. As I checked, it will instead take the latest record
based on the precombine field, update its values, and replace the
duplicates with it.
Regarding the RDBMS lens, some systems introduce a PK NOT ENFORCED mode to
allow duplicates on insert but always deduplicate on update. If users want
to preserve duplicates but also want to update values, they need to delete
and insert them again on their own. This is a clean way to do it IMO, but
it is opinionated and some may dislike it; at the same time, PK uniqueness
and deduplication are among Hudi's core principles and a differentiator.
This article from MS Synapse has nice examples around this:
https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-table-constraints
I myself have cases where I care about not missing any data and decide to
store duplicates and handle them later. I think Hudi currently gives us
nice control here with insert modes. If one thinks Hudi should fail on
write, then strict insert mode does the trick, but it is not the default.
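
For reference, the control I mean (a sketch; "hoodie.sql.insert.mode" is
the config name in the 0.x releases I have used - please verify it for
your version, and the table is made up):

  // Make INSERT fail on duplicate record keys instead of silently
  // deduplicating or updating them.
  spark.sql("set hoodie.sql.insert.mode = strict")
  spark.sql("insert into events_raw values (1, 'click', '2023-04-01 10:00:00')")
  // Inserting record key 1 a second time would now fail the write.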


I gathered some JIRA tickets related to precombine/no precombine model:
https://issues.apache.org/jira/browse/HUDI-2633
https://issues.apache.org/jira/browse/HUDI-4701
https://issues.apache.org/jira/browse/HUDI-5848
https://issues.apache.org/jira/browse/HUDI-2681

But maybe you'd like to have a new ticket for this work?
I'm always happy to help.

BR,
Daniel

On Wed, Apr 5, 2023 at 18:59 Ken Krugler 
wrote:

> Hi Vinoth,
>
> I just want to make sure my issue was clear - it seems like Spark
> shouldn’t be requiring a precombined field (or checking that it exists)
> when dropping partitions.
>
> Thanks,
>
> — Ken
>
>
> > On Apr 4, 2023, at 7:31 AM, Vinoth Chandar  wrote:
> >
> > Thanks for raising this issue.
> >
> > Love to use this opp to share more context on why the preCombine field
> > exists.
> >
> >   - As you probably inferred already, we needed to eliminate duplicates,
> >   while dealing with out-of-order data (e.g database change records
> arriving
> >   in different orders from two Kafka clusters in two zones). So it was
> >   necessary to preCombine by a "event" field, rather than just the
> arrival
> >   time (which is what _hoodie_commit_time is).
> >   - This comes from stream processing concepts like
> >   https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
> >   which build upon inadequacies in traditional database systems to deal
> with
> >   things like this. At the end of the day, we are solving a "processing"
> >   problem IMO with Hudi - Hudi replaces existing batch/streaming
> pipelines,
> >   not OLTP databases. That's at-least the lens we approached it from.
> >   - For this to work end-end, it is not sufficient to just precombine
> >   within a batch of incoming writes, we also need to consistently apply
> the
> >   same against data in storage. In CoW, we implicitly merge against
> storage,
> >   so its simpler. But for MoR, we simply append records to 

Re: What precombine field really is used for and its future?

2023-04-05 Thread Ken Krugler
Hi Vinoth,

I just want to make sure my issue was clear - it seems like Spark shouldn’t be 
requiring a precombined field (or checking that it exists) when dropping 
partitions.

Thanks,

— Ken


> On Apr 4, 2023, at 7:31 AM, Vinoth Chandar  wrote:
> 
> Thanks for raising this issue.
> 
> Love to use this opp to share more context on why the preCombine field
> exists.
> 
>   - As you probably inferred already, we needed to eliminate duplicates,
>   while dealing with out-of-order data (e.g database change records arriving
>   in different orders from two Kafka clusters in two zones). So it was
>   necessary to preCombine by a "event" field, rather than just the arrival
>   time (which is what _hoodie_commit_time is).
>   - This comes from stream processing concepts like
>   https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
>   which build upon inadequacies in traditional database systems to deal with
>   things like this. At the end of the day, we are solving a "processing"
>   problem IMO with Hudi - Hudi replaces existing batch/streaming pipelines,
>   not OLTP databases. That's at-least the lens we approached it from.
>   - For this to work end-end, it is not sufficient to just precombine
>   within a batch of incoming writes, we also need to consistently apply the
>   same against data in storage. In CoW, we implicitly merge against storage,
>   so its simpler. But for MoR, we simply append records to log files, so we
>   needed to make this a table property - such that queries/compaction can
>   later do the right preCombine. Hope that clarifies the CoW vs MoR
>   differences.
> 
> On the issues raised/proposals here.
> 
>   1. I think we need some dedicated efforts across the different writer
>   paths to make it easier. probably some lower hanging fruits here. Some of
>   it results from just different authors contributing to different code paths
>   in an OSS project.
>   2. On picking a sane default precombine field. _hoodie_commit_time is a
>   good candidate for preCombine field, as you point out, we would just pick
>   1/many records with the same key arbitrarily, in that scenario. On
>   storage/across commits, we would pick the value with the latest
>   commit_time/last writer wins - which would make queries repeatedly provide
>   the same consistent values as well.  Needs more thought.
>   3. If the user desires to customize this behavior, they could supply a
>   preCombine field that is different. This would be similar to semantics of
>   event time vs arrival order processing in streaming systems. Personally, I
>   need to spend a bit more time digging to come up with an elegant solution
>   here.
>   4. For the proposals on how Hudi could de-duplicate, after the fact that
>   inserts introduced duplicates - I think the current behavior is a bit more
>   condoning than what I'd like tbh. It updates both the records IIRC. I think
>   Hudi should ensure record key uniqueness across different paths and fail
>   the write if it's violated. - if we think of this as in RDBMS lens, that's
>   what would happen, correct?
> 
> 
> Love to hear your thoughts. If we can file a JIRA or compile JIRAs with
> issues around this, we could discuss out short, long term plans?
> 
> Thanks
> Vinoth
> 
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler 
> wrote:
> 
>> Hi Daniel,
>> 
>> Thanks for the detailed write-up.
>> 
>> I can’t add much to the discussion, other than noting we also recently ran
>> into the related oddity that we don’t need to define a precombine when
>> writing data to a COW table (using Flink), but then trying to use Spark to
>> drop partitions failed because there’s a default precombine field name (set
>> to “ts”), and if that field doesn’t exist then the Spark job fails.
>> 
>> — Ken
>> 
>> 
>>> On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski 
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> I would like to bring up the topic of how precombine field is used and
>>> what's the purpose of it. I would also like to know what are the plans
>> for
>>> it in the future.
>>> 
>>> At first glance precombine filed looks like it's only used to deduplicate
>>> records in incoming batch.
>>> But when digging deeper it looks like it can/is also be used to:
>>> 1. combine records not before but on write to decide if update existing
>>> record (eg with DefaultHoodieRecordPayload)
>>> 2. combine records on read for MoR table to combine log and base files
>>> correctly.
>>> 3. precombine field is required for spark SQL UPDATE, even if user can't
>>> provide duplicates anyways with this sql statement.
>>> 
>>> Regarding [3] there's inconsistency as precombine field is not required
>> in
>>> MERGE INTO UPDATE. Underneath UPSERT is switched to INSERT in upsert mode
>>> to update existing records.
>>> 
>>> I know that Hudi does a lot of work to ensure PK uniqueness across/within
>>> partitions and there is a need to deduplicate records before write or to
>>> deduplicate existing data if duplicates were 

Re: What precombine field really is used for and its future?

2023-04-04 Thread Vinoth Chandar
This current thread is another example of a practical need for a
precombine field:
"[DISCUSS] split source of kafka partition by count"


On Tue, Apr 4, 2023 at 7:31 AM Vinoth Chandar  wrote:

> Thanks for raising this issue.
>
> Love to use this opp to share more context on why the preCombine field
> exists.
>
>- As you probably inferred already, we needed to eliminate duplicates,
>while dealing with out-of-order data (e.g database change records arriving
>in different orders from two Kafka clusters in two zones). So it was
>necessary to preCombine by a "event" field, rather than just the arrival
>time (which is what _hoodie_commit_time is).
>- This comes from stream processing concepts like
>https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
>which build upon inadequacies in traditional database systems to deal with
>things like this. At the end of the day, we are solving a "processing"
>problem IMO with Hudi - Hudi replaces existing batch/streaming pipelines,
>not OLTP databases. That's at-least the lens we approached it from.
>- For this to work end-end, it is not sufficient to just precombine
>within a batch of incoming writes, we also need to consistently apply the
>same against data in storage. In CoW, we implicitly merge against storage,
>so its simpler. But for MoR, we simply append records to log files, so we
>needed to make this a table property - such that queries/compaction can
>later do the right preCombine. Hope that clarifies the CoW vs MoR
>differences.
>
> On the issues raised/proposals here.
>
>1. I think we need some dedicated efforts across the different writer
>paths to make it easier. probably some lower hanging fruits here. Some of
>it results from just different authors contributing to different code paths
>in an OSS project.
>2. On picking a sane default precombine field. _hoodie_commit_time is
>a good candidate for preCombine field, as you point out, we would just pick
>1/many records with the same key arbitrarily, in that scenario. On
>storage/across commits, we would pick the value with the latest
>commit_time/last writer wins - which would make queries repeatedly provide
>the same consistent values as well.  Needs more thought.
>3. If the user desires to customize this behavior, they could supply a
>preCombine field that is different. This would be similar to semantics of
>event time vs arrival order processing in streaming systems. Personally, I
>need to spend a bit more time digging to come up with an elegant solution
>here.
>4. For the proposals on how Hudi could de-duplicate, after the fact
>that inserts introduced duplicates - I think the current behavior is a bit
>more condoning than what I'd like tbh. It updates both the records IIRC. I
>think Hudi should ensure record key uniqueness across different paths and
>fail the write if it's violated. - if we think of this as in RDBMS lens,
>that's what would happen, correct?
>
>
> Love to hear your thoughts. If we can file a JIRA or compile JIRAs with
> issues around this, we could discuss out short, long term plans?
>
> Thanks
> Vinoth
>
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler 
> wrote:
>
>> Hi Daniel,
>>
>> Thanks for the detailed write-up.
>>
>> I can’t add much to the discussion, other than noting we also recently
>> ran into the related oddity that we don’t need to define a precombine when
>> writing data to a COW table (using Flink), but then trying to use Spark to
>> drop partitions failed because there’s a default precombine field name (set
>> to “ts”), and if that field doesn’t exist then the Spark job fails.
>>
>> — Ken
>>
>>
>> > On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski 
>> wrote:
>> >
>> > Hi all,
>> >
>> > I would like to bring up the topic of how precombine field is used and
>> > what's the purpose of it. I would also like to know what are the plans
>> for
>> > it in the future.
>> >
>> > At first glance precombine filed looks like it's only used to
>> deduplicate
>> > records in incoming batch.
>> > But when digging deeper it looks like it can/is also be used to:
>> > 1. combine records not before but on write to decide if update existing
>> > record (eg with DefaultHoodieRecordPayload)
>> > 2. combine records on read for MoR table to combine log and base files
>> > correctly.
>> > 3. precombine field is required for spark SQL UPDATE, even if user can't
>> > provide duplicates anyways with this sql statement.
>> >
>> > Regarding [3] there's inconsistency as precombine field is not required
>> in
>> > MERGE INTO UPDATE. Underneath UPSERT is switched to INSERT in upsert
>> mode
>> > to update existing records.
>> >
>> > I know that Hudi does a lot of work to ensure PK uniqueness
>> across/within
>> > partitions and there is a need to deduplicate records before write or to
>> > deduplicate existing data if duplicates were introduced 

Re: What precombine field really is used for and its future?

2023-04-04 Thread Vinoth Chandar
Thanks for raising this issue.

Love to use this opp to share more context on why the preCombine field
exists.

    - As you probably inferred already, we needed to eliminate duplicates
    while dealing with out-of-order data (e.g. database change records
    arriving in different orders from two Kafka clusters in two zones). So
    it was necessary to preCombine by an "event" field, rather than just
    the arrival time (which is what _hoodie_commit_time is).
    - This comes from stream processing concepts like
    https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
    which build upon the inadequacies of traditional database systems in
    dealing with things like this. At the end of the day, we are solving a
    "processing" problem IMO with Hudi - Hudi replaces existing
    batch/streaming pipelines, not OLTP databases. That's at least the lens
    we approached it from.
    - For this to work end-to-end, it is not sufficient to just precombine
    within a batch of incoming writes; we also need to consistently apply
    the same against data in storage. In CoW, we implicitly merge against
    storage, so it's simpler. But for MoR, we simply append records to log
    files, so we needed to make this a table property - such that
    queries/compaction can later do the right preCombine. Hope that
    clarifies the CoW vs MoR differences. (A sketch of the within-batch
    step follows this list.)
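
To make the within-batch part concrete, a minimal sketch of the semantics
only (this is not Hudi's actual code, and the column names are
illustrative):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{col, row_number}

  // Per record key, keep the row with the largest preCombine (event-time)
  // value; the other rows in the batch are dropped before the write.
  val byEventTime =
    Window.partitionBy("record_key").orderBy(col("event_ts").desc)
  val precombined = incomingBatch   // DataFrame of the incoming write batch
    .withColumn("_rn", row_number().over(byEventTime))
    .filter(col("_rn") === 1)
    .drop("_rn")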

On the issues raised/proposals here.

    1. I think we need some dedicated efforts across the different writer
    paths to make this easier. There are probably some lower-hanging fruits
    here. Some of it results simply from different authors contributing to
    different code paths in an OSS project.
    2. On picking a sane default precombine field: _hoodie_commit_time is a
    good candidate for the preCombine field; as you point out, we would
    just pick one of many records with the same key arbitrarily in that
    scenario. On storage/across commits, we would pick the value with the
    latest commit_time (last writer wins), which would make queries
    repeatedly provide the same consistent values as well. Needs more
    thought.
    3. If the user desires to customize this behavior, they could supply a
    different preCombine field. This would be similar to the semantics of
    event-time vs arrival-order processing in streaming systems.
    Personally, I need to spend a bit more time digging to come up with an
    elegant solution here.
    4. For the proposals on how Hudi could de-duplicate after the fact,
    when inserts have introduced duplicates - I think the current behavior
    is a bit more lenient than what I'd like, tbh. It updates both records,
    IIRC. I think Hudi should ensure record key uniqueness across the
    different paths and fail the write if it's violated - if we think of
    this through an RDBMS lens, that's what would happen, correct?


Love to hear your thoughts. If we can file a JIRA or compile JIRAs with
issues around this, we could discuss our short- and long-term plans?

Thanks
Vinoth

On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler 
wrote:

> Hi Daniel,
>
> Thanks for the detailed write-up.
>
> I can’t add much to the discussion, other than noting we also recently ran
> into the related oddity that we don’t need to define a precombine when
> writing data to a COW table (using Flink), but then trying to use Spark to
> drop partitions failed because there’s a default precombine field name (set
> to “ts”), and if that field doesn’t exist then the Spark job fails.
>
> — Ken
>
>
> > On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski 
> wrote:
> >
> > Hi all,
> >
> > I would like to bring up the topic of how precombine field is used and
> > what's the purpose of it. I would also like to know what are the plans
> for
> > it in the future.
> >
> > At first glance precombine filed looks like it's only used to deduplicate
> > records in incoming batch.
> > But when digging deeper it looks like it can/is also be used to:
> > 1. combine records not before but on write to decide if update existing
> > record (eg with DefaultHoodieRecordPayload)
> > 2. combine records on read for MoR table to combine log and base files
> > correctly.
> > 3. precombine field is required for spark SQL UPDATE, even if user can't
> > provide duplicates anyways with this sql statement.
> >
> > Regarding [3] there's inconsistency as precombine field is not required
> in
> > MERGE INTO UPDATE. Underneath UPSERT is switched to INSERT in upsert mode
> > to update existing records.
> >
> > I know that Hudi does a lot of work to ensure PK uniqueness across/within
> > partitions and there is a need to deduplicate records before write or to
> > deduplicate existing data if duplicates were introduced eg when using
> > non-strict insert mode.
> >
> > What should then happen in a situation where user does not want or can
> not
> > provide a pre-combine field? Then it's on user not to introduce
> duplicates,
> > but makes Hudi more generic and easier to use for "SQL" people.
> >
> > No precombine is possible for CoW, already, but UPSERT and SQL 

Re: What precombine field really is used for and its future?

2023-04-01 Thread Ken Krugler
Hi Daniel,

Thanks for the detailed write-up.

I can’t add much to the discussion, other than noting we also recently ran into 
the related oddity that we don’t need to define a precombine when writing data 
to a COW table (using Flink), but then trying to use Spark to drop partitions 
failed because there’s a default precombine field name (set to “ts”), and if 
that field doesn’t exist then the Spark job fails.
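
For reference, roughly the call that trips this for us (a sketch from
memory; the option keys may differ by Hudi version, and names/paths are
made up):

  // Dropping a partition through the Spark datasource on a table that has
  // no "ts" column.
  spark.emptyDataFrame.write.format("hudi")
    .option("hoodie.table.name", "events_cow")
    .option("hoodie.datasource.write.operation", "delete_partition")
    .option("hoodie.datasource.write.partitions.to.delete", "dt=2023-03-01")
    .mode("append")
    .save("/tables/events_cow")
  // Fails because hoodie.datasource.write.precombine.field defaults to "ts".
  // Pointing that option at a column the table does have sidesteps the
  // check, though arguably it shouldn't be needed for this operation at all.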

— Ken


> On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski  wrote:
> 
> Hi all,
> 
> I would like to bring up the topic of how precombine field is used and
> what's the purpose of it. I would also like to know what are the plans for
> it in the future.
> 
> At first glance precombine filed looks like it's only used to deduplicate
> records in incoming batch.
> But when digging deeper it looks like it can/is also be used to:
> 1. combine records not before but on write to decide if update existing
> record (eg with DefaultHoodieRecordPayload)
> 2. combine records on read for MoR table to combine log and base files
> correctly.
> 3. precombine field is required for spark SQL UPDATE, even if user can't
> provide duplicates anyways with this sql statement.
> 
> Regarding [3] there's inconsistency as precombine field is not required in
> MERGE INTO UPDATE. Underneath UPSERT is switched to INSERT in upsert mode
> to update existing records.
> 
> I know that Hudi does a lot of work to ensure PK uniqueness across/within
> partitions and there is a need to deduplicate records before write or to
> deduplicate existing data if duplicates were introduced eg when using
> non-strict insert mode.
> 
> What should then happen in a situation where user does not want or can not
> provide a pre-combine field? Then it's on user not to introduce duplicates,
> but makes Hudi more generic and easier to use for "SQL" people.
> 
> No precombine is possible for CoW, already, but UPSERT and SQL UPDATE is
> not supported (but users can update records using Insert in non-strict mode
> or MERGE INTO UPDATE).
> There's also a difference between CoW and MoR where for MoR
> precombine field is a hard requirement, but is optional for CoW.
> (UPDATES with no precombine are also possible in Flink for both CoW and MoR
> but not in Spark.)
> 
> Would it make sense to take inspiration from some DBMS systems then (eg
> Synapse) to allow updates and upserts when no precombine field is specified?
> Scenario:
> Say that duplicates were introduced with Insert in non-strict mode, no
> precombine field is specified, then we have two options:
> option 1) on UPDATE/UPSERT Hudi should deduplicate the existing records, as
> there's no precombine field it's expected we don't know which records will
> be removed and which will be effectively updated and preserved in the
> table. (This can be also achieved by always providing the same value in
> precombine field for all records.)
> option 2) on UPDATE/UPSERT Hudi should deduplicate the existing records, as
> there's no precombine field, record with the latest _hoodie_commit_time is
> preserved and updated, other records with the same PK are removed.
> 
> In both cases, deduplication on UPDATE/UPSERT becomes a hard rule
> whether we use precombine field or not.
> 
> Then regarding MoR and merging records on read (found this in Hudi format
> spec), can it be done by only using _hoodie_commit_time in absence of
> precombine field?
> If so for both MoR and CoW precombine field can become completely optional?
> 
> I'm of course looking at it more from the user perspective, it would be
> nice to know what is and what is not possible from the design and developer
> perspective.
> 
> Best Regards,
> Daniel Kaźmirski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch

What precombine field really is used for and its future?

2023-03-31 Thread Daniel Kaźmirski
Hi all,

I would like to bring up the topic of how the precombine field is used and
what its purpose is. I would also like to know what the plans for it are
going forward.

At first glance the precombine field looks like it's only used to
deduplicate records in the incoming batch.
But when digging deeper it looks like it can also be used to:
1. combine records not before, but during the write, to decide whether to
update an existing record (e.g. with DefaultHoodieRecordPayload);
2. combine records on read for a MoR table, to merge log and base files
correctly.
3. It is also required for Spark SQL UPDATE, even though the user can't
introduce duplicates with that SQL statement anyway.

Regarding [3], there's an inconsistency, as the precombine field is not
required in MERGE INTO UPDATE. Underneath, UPSERT is switched to INSERT in
upsert mode to update existing records.
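
To illustrate [3] (a sketch only; the exact behavior and error messages may
vary by Hudi/Spark version, and the tables are made up):

  // Table created without a preCombineField:
  spark.sql("""
    create table t (id int, name string, price double) using hudi
    tblproperties (type = 'cow', primaryKey = 'id')
  """)

  // UPDATE requires a precombine field, so on this table it fails:
  spark.sql("update t set price = 10.0 where id = 1")

  // MERGE INTO does not require one; underneath it updates matched records
  // via INSERT in upsert mode (assuming a source table s with a compatible
  // schema):
  spark.sql("""
    merge into t using s on t.id = s.id
    when matched then update set price = s.price
  """)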

I know that Hudi does a lot of work to ensure PK uniqueness across/within
partitions, and there is a need to deduplicate records before the write or
to deduplicate existing data if duplicates were introduced, e.g. when using
non-strict insert mode.

What should then happen in a situation where the user does not want to or
cannot provide a precombine field? It would then be on the user not to
introduce duplicates, but it would make Hudi more generic and easier to use
for "SQL" people.

Having no precombine field is already possible for CoW, but then UPSERT and
SQL UPDATE are not supported (users can still update records using INSERT
in non-strict mode or MERGE INTO UPDATE).
There's also a difference between CoW and MoR: for MoR the precombine field
is a hard requirement, while it is optional for CoW.
(UPDATEs with no precombine are also possible in Flink for both CoW and
MoR, but not in Spark.)

Would it make sense, then, to take inspiration from some DBMS systems (e.g.
Synapse) and allow updates and upserts when no precombine field is
specified?
Scenario:
Say that duplicates were introduced with INSERT in non-strict mode and no
precombine field is specified; then we have two options:
option 1) on UPDATE/UPSERT Hudi should deduplicate the existing records; as
there's no precombine field, it's expected that we don't know which records
will be removed and which will be effectively updated and preserved in the
table. (This can also be achieved by always providing the same value in the
precombine field for all records.)
option 2) on UPDATE/UPSERT Hudi should deduplicate the existing records; as
there's no precombine field, the record with the latest _hoodie_commit_time
is preserved and updated, and other records with the same PK are removed
(sketched below).

In both cases, deduplication on UPDATE/UPSERT becomes a hard rule, whether
we use a precombine field or not.
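
For what it's worth, a rough sketch of what option 2 would mean
semantically (not an implementation proposal; _hoodie_commit_time is the
Hudi meta column, while the key column and table path are made up):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{col, row_number}

  // Per primary key, keep only the row written by the latest commit; the
  // other duplicates are the ones that would be removed on UPDATE/UPSERT.
  val snapshot = spark.read.format("hudi").load("/tables/events_cow")
  val byLatestCommit =
    Window.partitionBy("id").orderBy(col("_hoodie_commit_time").desc)
  val survivors = snapshot
    .withColumn("_rn", row_number().over(byLatestCommit))
    .filter(col("_rn") === 1)
    .drop("_rn")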

Then, regarding MoR and merging records on read (I found this in the Hudi
format spec): can it be done using only _hoodie_commit_time in the absence
of a precombine field?
If so, could the precombine field become completely optional for both MoR
and CoW?

I'm of course looking at it more from the user perspective; it would be
nice to know what is and what is not possible from the design and developer
perspective.

Best Regards,
Daniel Kaźmirski