Re: Re: Re: [DISCUSS] split source of kafka partition by count

2023-04-07 Thread Vinoth Chandar
Pulled in another reviewer as well and left a comment. Shall we move the
discussion to the PR?

Thanks for the useful contribution!

On Thu, Apr 6, 2023 at 12:34 AM 孔维 <18701146...@163.com> wrote:

> Hi Vinoth,
>
> I created a PR (https://github.com/apache/hudi/pull/8376) for this
> feature; could you help review it?
>
>
> BR,
> Kong
>
>
>
>
> At 2023-04-05 00:19:20, "Vinoth Chandar"  wrote:
>Looking forward to this! Could really help backfill/rebootstrap scenarios.
> >
> >On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar  wrote:
> >
> >> Thinking out loud.
> >>
> >> 1. For insert operations, it should not matter anyway.
> >> 2. For upsert etc, the preCombine would handle the ordering problems.
> >>
> >> Is that what you are saying? I feel we don't want to leak any
> >> Kafka-specific logic or force the use of special payloads etc. Thoughts?
> >>
> >> I assigned the JIRA to you and also made you a contributor. So in the
> >> future, you can self-assign.
> >>
> >> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:
> >>
> >>> Hi,
> >>>
> >>>
> >>> Yes, we can create multiple Spark input partitions per Kafka partition.
> >>>
> >>>
> >>> I think the write operations can handle potentially out-of-order
> >>> events, because before writing we preCombine the incoming events using
> >>> the source-ordering-field, and we also combineAndGetUpdateValue with
> >>> the records on storage. From a business perspective, we use the combine
> >>> logic to keep our data correct. And Hudi does not require any
> >>> guarantees about the ordering of Kafka events.
> >>>
> >>>
> >>> I already filed a JIRA (https://issues.apache.org/jira/browse/HUDI-6019);
> >>> could you help assign it to me?
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> At 2023-04-03 23:27:13, "Vinoth Chandar"  wrote:
> >>> >Hi,
> >>> >
> >>> >Does your implementation read out offset ranges from Kafka partitions?
> >>> >Which means we can create multiple Spark input partitions per Kafka
> >>> >partition?
> >>> >If so, +1 for the overall goals here.
> >>> >
> >>> >How does this affect ordering? Can you think about how/if Hudi write
> >>> >operations can handle potentially out-of-order events being read out?
> >>> >It feels like we can add a JIRA for this anyway.
> >>> >
> >>> >
> >>> >
> >>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
> >>> >
> >>> >> Hi team, for the Kafka source, when pulling data from Kafka, the
> >>> >> default parallelism is the number of Kafka partitions.
> >>> >> There are two cases:
> >>> >>
> >>> >> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=1), but
> >>> >> the # of Kafka partitions is not enough: the pull will take too much
> >>> >> time, or even worse, cause executor OOMs.
> >>> >> 2. There is huge data skew between Kafka partitions: the pull will be
> >>> >> blocked by the slowest partition.
> >>> >>
> >>> >> To solve those cases, I want to add a parameter,
> >>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the
> >>> >> maxEvents in one Kafka batch; the default Long.MAX_VALUE means the
> >>> >> feature is turned off.
> >>> >> This configuration will take effect after the
> >>> >> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
> >>> >>
> >>> >>
> >>> >> Here is my POC of the improvement:
> >>> >> Max executor cores: 128.
> >>> >> Feature off
> >>> >> (hoodie.deltastreamer.kafka.source.maxEvents=5000)
> >>> >>
> >>> >>
> >>> >> Feature on
> >>> >> (hoodie.deltastreamer.kafka.per.batch.maxEvents=20)
> >>> >>
> >>> >>
> >>> >> After turning on the feature, the Tagging time dropped from 4.4 mins
> >>> >> to 1.1 mins, and could be even faster given more cores.
> >>> >>
> >>> >> What do you think? Can I file a JIRA issue for this?
> >>>
> >>
>
>


Re: Re: Re: [DISCUSS] split source of kafka partition by count

2023-04-06 Thread 孔维
Hi Vinoth,


I created a PR (https://github.com/apache/hudi/pull/8376) for this feature;
could you help review it?




BR,
Kong








At 2023-04-05 00:19:20, "Vinoth Chandar"  wrote:
>Looking forward to this! Could really help backfill/rebootstrap scenarios.
>
>On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar  wrote:
>
>> Thinking out loud.
>>
>> 1. For insert operations, it should not matter anyway.
>> 2. For upsert etc, the preCombine would handle the ordering problems.
>>
>> Is that what you are saying? I feel we don't want to leak any
>> Kafka-specific logic or force the use of special payloads etc. Thoughts?
>>
>> I assigned the JIRA to you and also made you a contributor. So in the
>> future, you can self-assign.
>>
>> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> Yes, we can create multiple Spark input partitions per Kafka partition.
>>>
>>>
>>> I think the write operations can handle potentially out-of-order
>>> events, because before writing we preCombine the incoming events using
>>> the source-ordering-field, and we also combineAndGetUpdateValue with
>>> the records on storage. From a business perspective, we use the combine
>>> logic to keep our data correct. And Hudi does not require any
>>> guarantees about the ordering of Kafka events.
>>>
>>>
>>> I already filed a JIRA (https://issues.apache.org/jira/browse/HUDI-6019);
>>> could you help assign it to me?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> At 2023-04-03 23:27:13, "Vinoth Chandar"  wrote:
>>> >Hi,
>>> >
>>> >Does your implementation read out offset ranges from Kafka partitions?
>>> >Which means we can create multiple Spark input partitions per Kafka
>>> >partition?
>>> >If so, +1 for the overall goals here.
>>> >
>>> >How does this affect ordering? Can you think about how/if Hudi write
>>> >operations can handle potentially out-of-order events being read out?
>>> >It feels like we can add a JIRA for this anyway.
>>> >
>>> >
>>> >
>>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
>>> >
>>> >> Hi team, for the Kafka source, when pulling data from Kafka, the
>>> >> default parallelism is the number of Kafka partitions.
>>> >> There are two cases:
>>> >>
>>> >> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=1), but
>>> >> the # of Kafka partitions is not enough: the pull will take too much
>>> >> time, or even worse, cause executor OOMs.
>>> >> 2. There is huge data skew between Kafka partitions: the pull will be
>>> >> blocked by the slowest partition.
>>> >>
>>> >> To solve those cases, I want to add a parameter,
>>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the
>>> >> maxEvents in one Kafka batch; the default Long.MAX_VALUE means the
>>> >> feature is turned off.
>>> >> This configuration will take effect after the
>>> >> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
>>> >>
>>> >>
>>> >> Here is my POC of the improvement:
>>> >> Max executor cores: 128.
>>> >> Feature off
>>> >> (hoodie.deltastreamer.kafka.source.maxEvents=5000)
>>> >>
>>> >>
>>> >> Feature on
>>> >> (hoodie.deltastreamer.kafka.per.batch.maxEvents=20)
>>> >>
>>> >>
>>> >> After turning on the feature, the Tagging time dropped from 4.4 mins
>>> >> to 1.1 mins, and could be even faster given more cores.
>>> >>
>>> >> What do you think? Can I file a JIRA issue for this?
>>>
>>


Re: Re: [DISCUSS] split source of kafka partition by count

2023-04-04 Thread Vinoth Chandar
Looking forward to this! Could really help backfill/rebootstrap scenarios.

On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar  wrote:

> Thinking out loud.
>
> 1. For insert operations, it should not matter anyway.
> 2. For upsert etc, the preCombine would handle the ordering problems.
>
> Is that what you are saying? I feel we don't want to leak any
> Kafka-specific logic or force the use of special payloads etc. Thoughts?
>
> I assigned the JIRA to you and also made you a contributor. So in the
> future, you can self-assign.
>
> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:
>
>> Hi,
>>
>>
>> Yes, we can create multiple Spark input partitions per Kafka partition.
>>
>>
>> I think the write operations can handle potentially out-of-order
>> events, because before writing we preCombine the incoming events using
>> the source-ordering-field, and we also combineAndGetUpdateValue with
>> the records on storage. From a business perspective, we use the combine
>> logic to keep our data correct. And Hudi does not require any
>> guarantees about the ordering of Kafka events.
>>
>>
>> I already filed a JIRA (https://issues.apache.org/jira/browse/HUDI-6019);
>> could you help assign it to me?
>>
>>
>>
>>
>>
>>
>>
>> At 2023-04-03 23:27:13, "Vinoth Chandar"  wrote:
>> >Hi,
>> >
>> >Does your implementation read out offset ranges from Kafka partitions?
>> >Which means we can create multiple Spark input partitions per Kafka
>> >partition?
>> >If so, +1 for the overall goals here.
>> >
>> >How does this affect ordering? Can you think about how/if Hudi write
>> >operations can handle potentially out-of-order events being read out?
>> >It feels like we can add a JIRA for this anyway.
>> >
>> >
>> >
>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
>> >
>> >> Hi team, for the Kafka source, when pulling data from Kafka, the
>> >> default parallelism is the number of Kafka partitions.
>> >> There are two cases:
>> >>
>> >> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=1), but
>> >> the # of Kafka partitions is not enough: the pull will take too much
>> >> time, or even worse, cause executor OOMs.
>> >> 2. There is huge data skew between Kafka partitions: the pull will be
>> >> blocked by the slowest partition.
>> >>
>> >> To solve those cases, I want to add a parameter,
>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the
>> >> maxEvents in one Kafka batch; the default Long.MAX_VALUE means the
>> >> feature is turned off.
>> >> This configuration will take effect after the
>> >> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
>> >>
>> >>
>> >> Here is my POC of the improvement:
>> >> Max executor cores: 128.
>> >> Feature off
>> >> (hoodie.deltastreamer.kafka.source.maxEvents=5000)
>> >>
>> >>
>> >> Feature on
>> >> (hoodie.deltastreamer.kafka.per.batch.maxEvents=20)
>> >>
>> >>
>> >> After turning on the feature, the Tagging time dropped from 4.4 mins
>> >> to 1.1 mins, and could be even faster given more cores.
>> >>
>> >> What do you think? Can I file a JIRA issue for this?
>>
>


Re: Re: [DISCUSS] split source of kafka partition by count

2023-04-04 Thread Vinoth Chandar
Thinking out loud.

1. For insert operations, it should not matter anyway.
2. For upsert etc, the preCombine would handle the ordering problems.

Is that what you are saying? I feel we don't want to leak any Kafka-specific
logic or force the use of special payloads etc. Thoughts?

I assigned the JIRA to you and also made you a contributor. So in the future,
you can self-assign.
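
For concreteness, here is a minimal, self-contained sketch of the
preCombine / combineAndGetUpdateValue semantics referenced in point 2 above.
The Event record, field names, and merge helpers are toy stand-ins for
illustration only (in real Hudi this logic lives in HoodieRecordPayload
implementations), not the actual API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineSketch {

  // Toy record: a key, an ordering field (e.g. event time), and a value.
  record Event(String key, long orderingField, String value) {}

  // preCombine: among incoming events with the same key, keep the one
  // with the highest ordering field, so intra-batch order is irrelevant.
  static Event preCombine(Event a, Event b) {
    return a.orderingField() >= b.orderingField() ? a : b;
  }

  // combineAndGetUpdateValue: merge the surviving incoming event with the
  // record already on storage, again by the ordering field.
  static Event combineAndGetUpdateValue(Event onStorage, Event incoming) {
    return incoming.orderingField() >= onStorage.orderingField() ? incoming : onStorage;
  }

  public static void main(String[] args) {
    // Events for one key arriving out of order across input splits.
    List<Event> incoming = List.of(
        new Event("k1", 3L, "v3"),
        new Event("k1", 1L, "v1"),
        new Event("k1", 2L, "v2"));

    // Dedupe the batch per key (the preCombine step).
    Map<String, Event> deduped = new HashMap<>();
    for (Event e : incoming) {
      deduped.merge(e.key(), e, CombineSketch::preCombine);
    }

    // Merge with what is already on storage (the update step).
    Event onStorage = new Event("k1", 0L, "v0");
    System.out.println(combineAndGetUpdateValue(onStorage, deduped.get("k1")));
    // -> Event[key=k1, orderingField=3, value=v3], whatever the arrival order.
  }
}

Because both merge steps pick a winner by the ordering field, the final value
is the same regardless of which Spark input split delivered which event first.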

On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:

> Hi,
>
>
> Yes, we can create multiple Spark input partitions per Kafka partition.
>
>
> I think the write operations can handle potentially out-of-order
> events, because before writing we preCombine the incoming events using
> the source-ordering-field, and we also combineAndGetUpdateValue with
> the records on storage. From a business perspective, we use the combine
> logic to keep our data correct. And Hudi does not require any
> guarantees about the ordering of Kafka events.
>
>
> I already filed a JIRA (https://issues.apache.org/jira/browse/HUDI-6019);
> could you help assign it to me?
>
>
>
>
>
>
>
> At 2023-04-03 23:27:13, "Vinoth Chandar"  wrote:
> >Hi,
> >
> >Does your implementation read out offset ranges from Kafka partitions?
> >Which means we can create multiple Spark input partitions per Kafka
> >partition?
> >If so, +1 for the overall goals here.
> >
> >How does this affect ordering? Can you think about how/if Hudi write
> >operations can handle potentially out-of-order events being read out?
> >It feels like we can add a JIRA for this anyway.
> >
> >
> >
> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
> >
> >> Hi team, for the Kafka source, when pulling data from Kafka, the
> >> default parallelism is the number of Kafka partitions.
> >> There are two cases:
> >>
> >> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=1), but
> >> the # of Kafka partitions is not enough: the pull will take too much
> >> time, or even worse, cause executor OOMs.
> >> 2. There is huge data skew between Kafka partitions: the pull will be
> >> blocked by the slowest partition.
> >>
> >> To solve those cases, I want to add a parameter,
> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the
> >> maxEvents in one Kafka batch; the default Long.MAX_VALUE means the
> >> feature is turned off.
> >> This configuration will take effect after the
> >> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
> >>
> >>
> >> Here is my POC of the improvement:
> >> Max executor cores: 128.
> >> Feature off
> >> (hoodie.deltastreamer.kafka.source.maxEvents=5000)
> >>
> >>
> >> Feature on
> >> (hoodie.deltastreamer.kafka.per.batch.maxEvents=20)
> >>
> >>
> >> After turning on the feature, the Tagging time dropped from 4.4 mins
> >> to 1.1 mins, and could be even faster given more cores.
> >>
> >> What do you think? Can I file a JIRA issue for this?
>


Re: [DISCUSS] split source of kafka partition by count

2023-04-03 Thread Vinoth Chandar
Hi,

Does your implementation read out offset ranges from Kafka partitions?
Which means we can create multiple Spark input partitions per Kafka
partition?
If so, +1 for the overall goals here.

How does this affect ordering? Can you think about how/if Hudi write
operations can handle potentially out-of-order events being read out?
It feels like we can add a JIRA for this anyway.



On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:

> Hi team, for the Kafka source, when pulling data from Kafka, the default
> parallelism is the number of Kafka partitions.
> There are two cases:
>
> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=1), but the
> # of Kafka partitions is not enough: the pull will take too much time, or
> even worse, cause executor OOMs.
> 2. There is huge data skew between Kafka partitions: the pull will be
> blocked by the slowest partition.
>
> To solve those cases, I want to add a parameter,
> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the maxEvents in
> one Kafka batch; the default Long.MAX_VALUE means the feature is turned off.
> This configuration will take effect after the
> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
>
>
> Here is my POC of the improvement:
> Max executor cores: 128.
> Feature off
> (hoodie.deltastreamer.kafka.source.maxEvents=5000)
>
>
> Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=20)
>
>
> After turning on the feature, the Tagging time dropped from 4.4 mins to
> 1.1 mins, and could be even faster given more cores.
>
> What do you think? Can I file a JIRA issue for this?
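
To make the proposal concrete, here is a rough sketch of the splitting step
it describes. The OffsetRange record and method names below are illustrative
stand-ins (assumptions, not the actual Hudi KafkaOffsetGen or Spark API):
hoodie.deltastreamer.kafka.source.maxEvents would first cap the total batch,
then each Kafka partition's offset range is chopped into chunks of at most
per.batch.maxEvents events, and each chunk becomes one Spark input partition:

import java.util.ArrayList;
import java.util.List;

public class SplitOffsetRangesSketch {

  // Illustrative stand-in for a Kafka offset range: a topic partition plus
  // the half-open offset interval [fromOffset, untilOffset).
  record OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {}

  // Chop each Kafka partition's range into chunks of at most
  // maxEventsPerBatch events; each chunk backs one Spark input partition.
  static List<OffsetRange> split(List<OffsetRange> ranges, long maxEventsPerBatch) {
    List<OffsetRange> result = new ArrayList<>();
    for (OffsetRange r : ranges) {
      long from = r.fromOffset();
      while (from < r.untilOffset()) {
        long until = Math.min(from + maxEventsPerBatch, r.untilOffset());
        result.add(new OffsetRange(r.topic(), r.partition(), from, until));
        from = until;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // One skewed partition (9000 events) and one small one (1000 events),
    // as left after the source.maxEvents cap has been applied.
    List<OffsetRange> ranges = List.of(
        new OffsetRange("t", 0, 0L, 9_000L),
        new OffsetRange("t", 1, 0L, 1_000L));

    // Without splitting: 2 Spark partitions, dominated by the skewed one.
    // With maxEventsPerBatch=2000: 5 + 1 = 6 partitions of bounded size.
    split(ranges, 2_000L).forEach(System.out::println);
  }
}

Parallelism then scales with data volume rather than with the Kafka partition
count, so an under-partitioned or skewed topic no longer bottlenecks the pull.
Note that the chunks of one Kafka partition may be consumed concurrently,
which is exactly the out-of-order concern the preCombine discussion above
addresses.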