Re: New KafkaSource API: Change in default behavior regarding starting offset

2022-06-20 Thread Shengkai Fang
Hi,

Please use English on the user mailing list. If you want to unsubscribe from
the mailing list, you can send an email to user-unsubscr...@flink.apache.org.

Best,
Shengkai

liangzai wrote on Sun, Jun 19, 2022 at 10:36:

> How do I unsubscribe from this mailing list?


Re: New KafkaSource API: Change in default behavior regarding starting offset

2022-06-18 Thread liangzai
How do I unsubscribe from this mailing list?



Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello Martijn,

Thanks for the link to the release notes, especially this part:
"When resuming from the savepoint, please use
setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new
KafkaSourceBuilder to transfer the offsets to the new source."
So earliest is the new default.
We do use committedOffsets: it is the default in our custom KafkaSource
builder, so that we never re-read all the previous data (earliest).

What bothers me is just this change in the default starting offset behavior
from FlinkKafkaConsumer to KafkaSource, which can lead to mistakes.
In fact, we sometimes drop part of our Kafka source state in order to read
again from the Kafka committed offsets, but maybe nobody else does that ^^

Anyway, thanks for pointing me to the release notes!

Best Regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io




Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread Martijn Visser
Hi Bastien,

When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
included instructions on how to migrate from FlinkKafkaConsumer to
KafkaSource [1]. Looking at the Kafka documentation [2], there is a
section on how to upgrade to the latest connector version that I think is
outdated. I'm leaning towards copying the migration instructions to the
generic documentation. Do you think that would have sufficed?

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version



Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello Jing,

This was the method used in the old Kafka consumer API. It has been removed
in 1.15, so the source code is not in master anymore.
Yes, I know that with the new offsets initializer, committed offsets with
earliest as a fallback can be used to get the same behavior as before.
I just wanted to know whether the behavior has really changed or whether I
am missing something.



Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io





Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread Jing Ge
Hi Bastien,

Thanks for asking. I didn't find any call to setStartFromGroupOffsets() within
Flink in the master branch. Could you please point to the code where the
committed offset is used as the default?

W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
is used, an exception will be thrown at runtime if there is no
committed offset. That is useful when the user intends to read from the
committed offset but something is wrong. It might feel weird to use it
as the default, because an exception would be thrown when users start new
jobs with default settings.
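
The difference between the starting-offset options discussed in this thread
can be sketched with a small toy model in plain Java. It does not use any
Flink or Kafka classes: StartingOffsetModel, Mode and resolve are hypothetical
names made up for illustration, and the behavior is modeled only on what is
described in this thread (earliest as the default, an exception when
committedOffsets() finds no committed offset, and earliest as the fallback
when a reset strategy is given).

```java
import java.util.OptionalLong;

// Toy model only: every name here is hypothetical, none of it is Flink or Kafka API.
final class StartingOffsetModel {

    // The three starting-offset configurations mentioned in this thread.
    enum Mode {
        EARLIEST,                     // stands in for OffsetsInitializer.earliest()
        COMMITTED_NO_FALLBACK,        // stands in for OffsetsInitializer.committedOffsets()
        COMMITTED_FALLBACK_EARLIEST   // stands in for committedOffsets(OffsetResetStrategy.EARLIEST)
    }

    // Returns the offset at which reading of one partition would start.
    // 'committed' is empty when the consumer group has no committed offset yet.
    static long resolve(Mode mode, OptionalLong committed, long earliest) {
        switch (mode) {
            case EARLIEST:
                return earliest;
            case COMMITTED_NO_FALLBACK:
                if (!committed.isPresent()) {
                    throw new IllegalStateException("no committed offset and no reset strategy");
                }
                return committed.getAsLong();
            case COMMITTED_FALLBACK_EARLIEST:
                return committed.orElse(earliest);
            default:
                throw new AssertionError("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        OptionalLong committed = OptionalLong.of(120L); // made-up committed offset
        OptionalLong none = OptionalLong.empty();       // brand-new consumer group

        System.out.println(resolve(Mode.EARLIEST, committed, 0L));
        System.out.println(resolve(Mode.COMMITTED_FALLBACK_EARLIEST, committed, 0L));
        System.out.println(resolve(Mode.COMMITTED_FALLBACK_EARLIEST, none, 0L));
        try {
            resolve(Mode.COMMITTED_NO_FALLBACK, none, 0L);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model a brand-new job only fails in the COMMITTED_NO_FALLBACK case,
which is the situation described above for a default without a fallback.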

Best regards,
Jing



New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread bastien dine
Hello everyone,

Does anyone know why the default starting offset behaviour has changed in the
new KafkaSource?

It now starts from the earliest offset (see the code in KafkaSourceBuilder).
The documentation says:
"If offsets initializer is not specified, OffsetsInitializer.earliest() will
be used by default."
Source:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

Previously, the old FlinkKafkaConsumer started from the committed offsets by
default (i.e. the setStartFromGroupOffsets() method),

which matches this behaviour in the new KafkaSource:
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

This change can cause serious trouble if users overlook this point when
migrating from the old FlinkKafkaConsumer to the new KafkaSource.
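
To make the risk concrete, here is a minimal sketch in plain Java (no Flink or
Kafka dependency) that estimates how many already-processed records a job
would read again if it started from the earliest offsets instead of the
committed ones. ReplayEstimate and recordsReplayed are hypothetical names, and
the offsets in the example are made-up numbers, not measurements.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: hypothetical helper, not part of Flink or Kafka.
final class ReplayEstimate {

    // For each partition with a committed offset, the records in
    // [earliest, committed) were already processed and would be read again.
    static long recordsReplayed(Map<Integer, Long> earliest, Map<Integer, Long> committed) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : committed.entrySet()) {
            Long start = earliest.get(entry.getKey());
            if (start == null) {
                continue; // no earliest offset known for this partition, skip it
            }
            total += Math.max(0L, entry.getValue() - start);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> earliest = new HashMap<>();
        earliest.put(0, 0L);
        earliest.put(1, 100L);

        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 5000L);
        committed.put(1, 4100L);

        // (5000 - 0) + (4100 - 100) = 9000
        System.out.println("Records replayed: " + recordsReplayed(earliest, committed));
    }
}
```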

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io