Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Thanks!



Re: FlinkKafkaConsumer09

2016-07-29 Thread 戴資力
Hi Max,

Sure, I was planning to do so, but wanted to see if it was a reasonable
feature to add before opening a JIRA :)
Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon




-- 
Tzu-Li (Gordon) Tai


Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Hi Tai,

Should definitely be possible. Would you mind opening a JIRA issue
with the description you posted?

Thanks,
Max



Re: FlinkKafkaConsumer09

2016-07-28 Thread Tai Gordon
Hi Kevin,

Just to clarify: for Kafka 0.9 the value is “earliest”, and for the older
Kafka 0.8 it is “smallest”.
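
To make the difference concrete, here is a minimal sketch that picks the "auto.offset.reset" value by consumer version. The object and method names are made up for illustration, and only the two versions discussed in this thread are handled.

```scala
// Hypothetical helper, not part of Flink or Kafka: maps the Kafka consumer
// version to the "auto.offset.reset" value naming that version expects.
object OffsetResetNames {
  // Value that resets to the beginning of the topic.
  def earliest(kafkaVersion: String): String = kafkaVersion match {
    case "0.8" => "smallest" // old consumer naming
    case "0.9" => "earliest" // new consumer API naming
    case other => throw new IllegalArgumentException(s"Unhandled Kafka version: $other")
  }

  // Value that resets to the end of the topic.
  def latest(kafkaVersion: String): String = kafkaVersion match {
    case "0.8" => "largest"
    case "0.9" => "latest"
    case other => throw new IllegalArgumentException(s"Unhandled Kafka version: $other")
  }
}
```

With this, properties.setProperty("auto.offset.reset", OffsetResetNames.earliest("0.9")) would set "earliest".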

I’m wondering whether it would be reasonable to add a Flink-specific way
to set the consumer’s starting position to “earliest” or “latest”, without
respecting the external Kafka offset store. Perhaps we can make the
current behaviour (checking committed offsets in Kafka as the starting point)
a user option, and add new options to read from “earliest” and “latest”
regardless of the groupId and externally committed offsets. I think this
better matches how users usually interpret the functionality of setting
starting positions, while also keeping the “auto.offset.reset” behaviour
that frequent Kafka users are used to. This would also more clearly
define that, in the context of Flink, the external Kafka offset store is
used only to expose the consumer’s progress to the outside world, and not
used to manipulate how topics are read.
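
As a rough illustration of the idea, the sketch below models three start options as a plain Scala type. All names here are hypothetical and are not an existing Flink API; offsets are passed in as plain numbers so the snippet needs no Kafka connection.

```scala
// Hypothetical model of the proposed option; none of these names exist in Flink.
sealed trait StartPosition
case object FromGroupOffsets extends StartPosition // current behaviour
case object FromEarliest extends StartPosition     // ignore committed offsets
case object FromLatest extends StartPosition       // ignore committed offsets

object ProposedStart {
  // committed: offset stored in Kafka for the group, if any.
  // resetToEarliest: stands in for auto.offset.reset being "earliest".
  def startOffset(mode: StartPosition,
                  committed: Option[Long],
                  earliest: Long,
                  latest: Long,
                  resetToEarliest: Boolean): Long = mode match {
    case FromEarliest     => earliest
    case FromLatest       => latest
    case FromGroupOffsets => committed.getOrElse(if (resetToEarliest) earliest else latest)
  }
}
```

Only FromGroupOffsets consults the committed offset; the other two modes ignore the groupId entirely, which is the point of the proposal.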

Just an idea I have in mind, not sure if it would be a reasonable addition.
It’d be great to hear what others think of this.

Regards,
Gordon




Re: FlinkKafkaConsumer09

2016-07-28 Thread Kevin Jacobs

Thank you Gordon and Max,

Thank you Gordon, that explains the behaviour a bit better to me. I am 
now adding the timestamp to the group ID and that is a good workaround 
for now. The "smallest" option is unfortunately not available in this 
version of the FlinkKafkaConsumer class.
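
For reference, a minimal sketch of the timestamp workaround. The object name, method name, and the "-" separator are arbitrary choices for illustration; "earliest" is the Kafka 0.9 value.

```scala
import java.util.Properties

// Hypothetical helper illustrating the workaround: every start gets a fresh
// group.id, so Kafka has no committed offsets for it and auto.offset.reset applies.
object FreshGroup {
  def consumerProperties(bootstrapServers: String, baseGroupId: String): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    // Append the current time in milliseconds to make the group id unique per run.
    properties.setProperty("group.id", s"$baseGroupId-${System.currentTimeMillis()}")
    properties.setProperty("auto.offset.reset", "earliest")
    properties
  }
}
```

The drawback is that each run leaves a new consumer group behind in Kafka, so this is only a stopgap.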


Cheers,
Kevin







Re: FlinkKafkaConsumer09

2016-07-28 Thread Maximilian Michels
Hi Kevin,

You need to use properties.setProperty("auto.offset.reset",
"earliest") for Kafka 0.9 to start from the smallest offset. Note that
in Kafka 0.8 you need to use properties.setProperty("auto.offset.reset",
"smallest") to achieve the same behavior.

Kafka keeps track of the offsets per group id. If you have already
read from a topic with a certain group id and want to restart from the
smallest offset available, you need to generate a unique group id.

Cheers,
Max



Re: FlinkKafkaConsumer09

2016-07-28 Thread Tai Gordon
Hi Kevin,

Was the same “group.id” used before?
What may be happening is that on startup of the consumer (not when
restoring from a failure), any existing committed offset for the groupId in
Kafka’s brokers is used as the starting point. The “auto.offset.reset”
setting is only respected when no committed offsets can be found.
Currently, if Flink’s checkpointing isn’t enabled, FlinkKafkaConsumer09
periodically commits offsets back to the Kafka brokers. So it could be
that those previously committed offsets are being used as your starting points.

Perhaps you can try using a new groupId and see if the behaviour still
exists?
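
The start-up rule described above can be sketched as a small function. This is a simplified model with made-up names, not the actual Flink or Kafka code; it only handles the "earliest" and "latest" values.

```scala
// Hypothetical model of the start-up rule: a committed offset for the group
// always wins, and auto.offset.reset is consulted only when none exists.
object CurrentStart {
  def startOffset(committed: Option[Long],
                  autoOffsetReset: String,
                  earliest: Long,
                  latest: Long): Long = committed match {
    case Some(offset) => offset
    case None => autoOffsetReset match {
      case "earliest" => earliest
      case "latest"   => latest
      case other      => throw new IllegalArgumentException(s"Unhandled auto.offset.reset value: $other")
    }
  }
}
```

For example, with a committed offset of 1000 the call CurrentStart.startOffset(Some(1000L), "earliest", 0L, 1000L) yields 1000, which would look like "starting from the latest item" even though "earliest" is set.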

Regards,
Gordon

On July 28, 2016 at 4:15:12 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09
class. I am using Flink 1.0.3.

These are my properties:

import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", "earliest")

According to the new consumer API of Kafka, this should result in the
following:

auto.offset.reset:
  * smallest: automatically reset the offset to the smallest offset
(source: https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am
I doing something wrong?

Regards,
Kevin