[jira] [Created] (FLINK-6520) FlinkKafkaConsumer09+ does not overwrite props to not auto commit offsets when commit mode is OffsetCommitMode.ON_CHECKPOINTS

2017-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6520:
--

 Summary: FlinkKafkaConsumer09+ does not overwrite props to not 
auto commit offsets when commit mode is OffsetCommitMode.ON_CHECKPOINTS
 Key: FLINK-6520
 URL: https://issues.apache.org/jira/browse/FLINK-6520
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


When the determined offset commit mode is `OffsetCommitMode.ON_CHECKPOINTS`, the 
FlinkKafkaConsumers need to overwrite whatever setting the user provided in the 
config properties, so that automatic offset committing through the Kafka client 
is disabled.

This is properly done in the 0.8 consumer, but not in the 0.9+ consumers.
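
For illustration, a minimal sketch (not the actual Flink code) of the kind of override the 0.9+ consumers need before handing the properties to the Kafka client. The class and method names below are made up; `enable.auto.commit` is the Kafka 0.9+ property key.

{code:title=Sketch (not the actual Flink code): forcing the Kafka client's auto commit off|borderStyle=solid}
import java.util.Properties;

// Illustrative only; class and method names are invented. When the commit mode is
// OffsetCommitMode.ON_CHECKPOINTS, Flink itself commits offsets on completed
// checkpoints, so the Kafka client's own auto committing has to be switched off,
// regardless of what the user put into the supplied properties.
public final class AutoCommitOverride {

    public static void disableAutoCommitForCheckpointing(Properties kafkaProperties) {
        // overwrite the user's setting; "enable.auto.commit" is the Kafka 0.9+ key
        kafkaProperties.setProperty("enable.auto.commit", "false");
    }
}
{code}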





[jira] [Created] (FLINK-6366) KafkaConsumer is not closed in FlinkKafkaConsumer09

2017-04-24 Thread Fang Yong (JIRA)
Fang Yong created FLINK-6366:


 Summary: KafkaConsumer is not closed in FlinkKafkaConsumer09
 Key: FLINK-6366
 URL: https://issues.apache.org/jira/browse/FLINK-6366
 Project: Flink
  Issue Type: Bug
Reporter: Fang Yong


In getKafkaPartitions of FlinkKafkaConsumer09, the KafkaConsumer is created as 
follows and will not be closed.
{code:title=FlinkKafkaConsumer09.java|borderStyle=solid}
protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
    // read the partitions that belong to the listed topics
    final List<KafkaTopicPartition> partitions = new ArrayList<>();

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
        for (final String topic: topics) {
            // get partitions for each topic
            List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
            ...
        }
    }
    ...
}
{code}
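
For reference, a small self-contained sketch (not Flink code) of the same close guarantee written with an explicit close() in a finally block; the supplied properties are assumed to contain the broker address and byte-array deserializers.

{code:title=Sketch (not Flink code): always closing a short-lived KafkaConsumer|borderStyle=solid}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public final class PartitionLookup {

    // Equivalent to the try-with-resources form: the consumer that is created only
    // for partition discovery is closed on every path, including exceptions.
    // 'props' must contain bootstrap.servers and byte-array deserializer settings.
    static List<PartitionInfo> partitionsFor(Properties props, String topic) {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            return consumer.partitionsFor(topic);
        } finally {
            consumer.close();
        }
    }
}
{code}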





[jira] [Created] (FLINK-4722) Consumer group concept not working properly with FlinkKafkaConsumer09

2016-10-01 Thread Sudhanshu Sekhar Lenka (JIRA)
Sudhanshu Sekhar Lenka created FLINK-4722:
-

 Summary: Consumer group concept not working properly with 
FlinkKafkaConsumer09  
 Key: FLINK-4722
 URL: https://issues.apache.org/jira/browse/FLINK-4722
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.2
Reporter: Sudhanshu Sekhar Lenka


When a Kafka topic has 3 partitions and 3 FlinkKafkaConsumer09 instances are 
connected to that same topic with the "group.id" property set to "myGroup", each 
Flink consumer still receives all data pushed to all 3 partitions. With a plain 
Java consumer it works as expected: each consumer receives only the data of the 
partitions assigned to it.
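
For comparison, a minimal, self-contained sketch of the plain Kafka 0.9 consumer behaviour described above (broker address and topic name are placeholders): several instances started with the same group.id subscribe through Kafka's group coordinator, which splits the three partitions among them.

{code:title=Sketch: plain Kafka consumer using group coordination (placeholder names)|borderStyle=solid}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class PlainConsumerGroupExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "myGroup");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Run three copies of this process: subscribe() uses Kafka's group coordination,
        // so each instance is assigned roughly one of the three partitions.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("myTopic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.printf("partition=%d offset=%d%n",
                        r.partition(), r.offset()));
            }
        }
    }
}
{code}

The Flink consumer, by contrast, assigns the topic's partitions to its parallel subtasks itself and does not go through Kafka's group coordination, so several Flink consumers sharing a group.id each read every partition; the group.id is only used for committing offsets back to Kafka.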






Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Thanks!

On Fri, Jul 29, 2016 at 11:43 AM, Gordon Tai (戴資力) <tzuli...@gmail.com> wrote:
> Hi Max,
>
> Sure, I was planning to do so, but wanted to see if it was a reasonable
> feature to add before opening a JIRA :)
> Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280
>
> Regards,
> Gordon
>
> On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Tai,
>>
>> Should definitely be possible. Would you mind opening a JIRA issue
>> with the description you posted?
>>
>> Thanks,
>> Max
>>
>> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
>> > Hi Kevin,
>> >
>> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest”
>> > for the older Kafka 0.8.
>> >
>> > I’m wondering whether or not it is reasonable to add a Flink-specific way
>> > to set the consumer’s starting position to “earliest” and “latest”, without
>> > respecting the external Kafka offset store. Perhaps we can change the
>> > current behaviour (checking committed offsets in Kafka as starting point)
>> > as a user option, and add new options to read from “earliest” and “latest”
>> > regardless of the groupId and externally committed offsets. I think this
>> > better matches how users usually interpret the functionality of setting
>> > starting positions, while also keeping the “auto.offset.reset” behaviour
>> > that frequent Kafka users are used to. Also, this would also more clearly
>> > define that under the context of Flink, the external Kafka offset store is
>> > used only to expose the consumers progress to the outside world, and not
>> > used to manipulate how topics are read.
>> >
>> > Just an idea I have in mind, not sure if it would be a reasonable add. It’d
>> > be great to hear what others think of this.
>> >
>> > Regards,
>> > Gordon
>> >
>> >
>> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
>> >
>> > Thank you Gordon and Max,
>> >
>> > Thank you Gordon, that explains the behaviour a bit better to me. I am
>> > now adding the timestamp to the group ID and that is a good workaround
>> > for now. The "smallest" option is unfortunately not available in this
>> > version of the FlinkKafkaConsumer class.
>> >
>> > Cheers,
>> > Kevin
>> >
>> >
>> > On 28.07.2016 10:39, Maximilian Michels wrote:
>> >> Hi Kevin,
>> >>
>> >> You need to use properties.setProperty("auto.offset.reset",
>> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that
>> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
>> >> "earliest") to achieve the same behavior.
>> >>
>> >> Kafka keeps track of the offsets per group id. If you have already
>> >> read from a topic with a certain group id and want to restart from the
>> >> smallest offset available, you need to generate a unique group id.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
>> >>> Hi,
>> >>>
>> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
>> >>> am using Flink 1.0.3.
>> >>>
>> >>> These are my properties:
>> >>>
>> >>> val properties = new Properties()
>> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
>> >>> properties.setProperty("group.id", COLLECTOR_NAME)
>> >>> properties.setProperty("auto.offset.reset", *"earliest"*)
>> >>>
>> >>> According to the new consumer API of Kafka, this should result in the
>> >>> following:
>> >>>
>> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
>> >>> smallest offset/ (source:
>> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
>> >>>
>> >>> However, it starts from the latest item in my topic. Is this a bug or am I
>> >>> doing something wrong?
>> >>>
>> >>> Regards,
>> >>> Kevin
>> >>>
>>
>
>
>
> --
> Tzu-Li (Gordon) Tai


Re: FlinkKafkaConsumer09

2016-07-29 Thread 戴資力
Hi Max,

Sure, I was planning to do so, but wanted to see if it was a reasonable
feature to add before opening a JIRA :)
Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon

On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Tai,
>
> Should definitely be possible. Would you mind opening a JIRA issue
> with the description you posted?
>
> Thanks,
> Max
>
> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
> > Hi Kevin,
> >
> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest”
> > for the older Kafka 0.8.
> >
> > I’m wondering whether or not it is reasonable to add a Flink-specific way
> > to set the consumer’s starting position to “earliest” and “latest”, without
> > respecting the external Kafka offset store. Perhaps we can change the
> > current behaviour (checking committed offsets in Kafka as starting point)
> > as a user option, and add new options to read from “earliest” and “latest”
> > regardless of the groupId and externally committed offsets. I think this
> > better matches how users usually interpret the functionality of setting
> > starting positions, while also keeping the “auto.offset.reset” behaviour
> > that frequent Kafka users are used to. Also, this would also more clearly
> > define that under the context of Flink, the external Kafka offset store is
> > used only to expose the consumers progress to the outside world, and not
> > used to manipulate how topics are read.
> >
> > Just an idea I have in mind, not sure if it would be a reasonable add. It’d
> > be great to hear what others think of this.
> >
> > Regards,
> > Gordon
> >
> >
> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
> >
> > Thank you Gordon and Max,
> >
> > Thank you Gordon, that explains the behaviour a bit better to me. I am
> > now adding the timestamp to the group ID and that is a good workaround
> > for now. The "smallest" option is unfortunately not available in this
> > version of the FlinkKafkaConsumer class.
> >
> > Cheers,
> > Kevin
> >
> >
> > On 28.07.2016 10:39, Maximilian Michels wrote:
> >> Hi Kevin,
> >>
> >> You need to use properties.setProperty("auto.offset.reset",
> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that
> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
> >> "earliest") to achieve the same behavior.
> >>
> >> Kafka keeps track of the offsets per group id. If you have already
> >> read from a topic with a certain group id and want to restart from the
> >> smallest offset available, you need to generate a unique group id.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
> >>> Hi,
> >>>
> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
> >>> am using Flink 1.0.3.
> >>>
> >>> These are my properties:
> >>>
> >>> val properties = new Properties()
> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
> >>> properties.setProperty("group.id", COLLECTOR_NAME)
> >>> properties.setProperty("auto.offset.reset", *"earliest"*)
> >>>
> >>> According to the new consumer API of Kafka, this should result in the
> >>> following:
> >>>
> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
> >>> smallest offset/ (source:
> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
> >>>
> >>> However, it starts from the latest item in my topic. Is this a bug or am I
> >>> doing something wrong?
> >>>
> >>> Regards,
> >>> Kevin
> >>>
>



-- 
Tzu-Li (Gordon) Tai


Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Hi Tai,

Should definitely be possible. Would you mind opening a JIRA issue
with the description you posted?

Thanks,
Max

On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
> Hi Kevin,
>
> Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest”
> for the older Kafka 0.8.
>
> I’m wondering whether or not it is reasonable to add a Flink-specific way
> to set the consumer’s starting position to “earliest” and “latest”, without
> respecting the external Kafka offset store. Perhaps we can change the
> current behaviour (checking committed offsets in Kafka as starting point)
> as a user option, and add new options to read from “earliest” and “latest”
> regardless of the groupId and externally committed offsets. I think this
> better matches how users usually interpret the functionality of setting
> starting positions, while also keeping the “auto.offset.reset” behaviour
> that frequent Kafka users are used to. Also, this would also more clearly
> define that under the context of Flink, the external Kafka offset store is
> used only to expose the consumers progress to the outside world, and not
> used to manipulate how topics are read.
>
> Just an idea I have in mind, not sure if it would be a reasonable add. It’d
> be great to hear what others think of this.
>
> Regards,
> Gordon
>
>
> On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
>
> Thank you Gordon and Max,
>
> Thank you Gordon, that explains the behaviour a bit better to me. I am
> now adding the timestamp to the group ID and that is a good workaround
> for now. The "smallest" option is unfortunately not available in this
> version of the FlinkKafkaConsumer class.
>
> Cheers,
> Kevin
>
>
> On 28.07.2016 10:39, Maximilian Michels wrote:
>> Hi Kevin,
>>
>> You need to use properties.setProperty("auto.offset.reset",
>> "smallest") for Kafka 9 to start from the smallest offset. Note, that
>> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
>> "earliest") to achieve the same behavior.
>>
>> Kafka keeps track of the offsets per group id. If you have already
>> read from a topic with a certain group id and want to restart from the
>> smallest offset available, you need to generate a unique group id.
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
>>> Hi,
>>>
>>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
>>> am using Flink 1.0.3.
>>>
>>> These are my properties:
>>>
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", config.urlKafka)
>>> properties.setProperty("group.id", COLLECTOR_NAME)
>>> properties.setProperty("auto.offset.reset", *"earliest"*)
>>>
>>> According to the new consumer API of Kafka, this should result in the
>>> following:
>>>
>>> /auto.offset.reset: * smallest : automatically reset the offset to the
>>> smallest offset/ (source:
>>> https://kafka.apache.org/documentation.html#newconsumerapi)
>>>
>>> However, it starts from the latest item in my topic. Is this a bug or am I
>>> doing something wrong?
>>>
>>> Regards,
>>> Kevin
>>>


Re: FlinkKafkaConsumer09

2016-07-28 Thread Tai Gordon
Hi Kevin,

Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest”
for the older Kafka 0.8.

I’m wondering whether or not it is reasonable to add a Flink-specific way
to set the consumer’s starting position to “earliest” and “latest”, without
respecting the external Kafka offset store. Perhaps we can change the
current behaviour (checking committed offsets in Kafka as starting point)
as a user option, and add new options to read from “earliest” and “latest”
regardless of the groupId and externally committed offsets. I think this
better matches how users usually interpret the functionality of setting
starting positions, while also keeping the “auto.offset.reset” behaviour
that frequent Kafka users are used to. Also, this would also more clearly
define that under the context of Flink, the external Kafka offset store is
used only to expose the consumers progress to the outside world, and not
used to manipulate how topics are read.

Just an idea I have in mind, not sure if it would be a reasonable add. It’d
be great to hear what others think of this.

Regards,
Gordon
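
For illustration only: a Java sketch of what such a Flink-specific start-position option could look like. The setStartFromEarliest() method shown here is hypothetical, and the broker address and topic name are placeholders.

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class StartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "myGroup");                 // used only to expose progress

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // hypothetical method: ignore committed offsets,
                                         // always start from the earliest available offset

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("start-position sketch");
    }
}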


On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Thank you Gordon and Max,

Thank you Gordon, that explains the behaviour a bit better to me. I am
now adding the timestamp to the group ID and that is a good workaround
for now. The "smallest" option is unfortunately not available in this
version of the FlinkKafkaConsumer class.

Cheers,
Kevin


On 28.07.2016 10:39, Maximilian Michels wrote:
> Hi Kevin,
>
> You need to use properties.setProperty("auto.offset.reset",
> "smallest") for Kafka 9 to start from the smallest offset. Note, that
> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
> "earliest") to achieve the same behavior.
>
> Kafka keeps track of the offsets per group id. If you have already
> read from a topic with a certain group id and want to restart from the
> smallest offset available, you need to generate a unique group id.
>
> Cheers,
> Max
>
> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
>> Hi,
>>
>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
>> am using Flink 1.0.3.
>>
>> These are my properties:
>>
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", config.urlKafka)
>> properties.setProperty("group.id", COLLECTOR_NAME)
>> properties.setProperty("auto.offset.reset", *"earliest"*)
>>
>> According to the new consumer API of Kafka, this should result in the
>> following:
>>
>> /auto.offset.reset: * smallest : automatically reset the offset to the
>> smallest offset/ (source:
>> https://kafka.apache.org/documentation.html#newconsumerapi)
>>
>> However, it starts from the latest item in my topic. Is this a bug or am I
>> doing something wrong?
>>
>> Regards,
>> Kevin
>>


Re: FlinkKafkaConsumer09

2016-07-28 Thread Kevin Jacobs

Thank you Gordon and Max,

Thank you Gordon, that explains the behaviour a bit better to me. I am 
now adding the timestamp to the group ID and that is a good workaround 
for now. The "smallest" option is unfortunately not available in this 
version of the FlinkKafkaConsumer class.


Cheers,
Kevin


On 28.07.2016 10:39, Maximilian Michels wrote:

Hi Kevin,

You need to use properties.setProperty("auto.offset.reset",
"smallest") for Kafka 9 to start from the smallest offset. Note, that
in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
"earliest") to achieve the same behavior.

Kafka keeps track of the offsets per group id. If you have already
read from a topic with a certain group id and want to restart from the
smallest offset available, you need to generate a unique group id.

Cheers,
Max

On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
am using Flink 1.0.3.

These are my properties:

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", *"earliest"*)

According to the new consumer API of Kafka, this should result in the
following:

/auto.offset.reset: * smallest : automatically reset the offset to the
smallest offset/ (source:
https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am I
doing something wrong?

Regards,
Kevin





Re: FlinkKafkaConsumer09

2016-07-28 Thread Maximilian Michels
Hi Kevin,

You need to use properties.setProperty("auto.offset.reset",
"smallest") for Kafka 9 to start from the smallest offset. Note, that
in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
"earliest") to achieve the same behavior.

Kafka keeps track of the offsets per group id. If you have already
read from a topic with a certain group id and want to restart from the
smallest offset available, you need to generate a unique group id.

Cheers,
Max

On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
> Hi,
>
> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
> am using Flink 1.0.3.
>
> These are my properties:
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", config.urlKafka)
> properties.setProperty("group.id", COLLECTOR_NAME)
> properties.setProperty("auto.offset.reset", *"earliest"*)
>
> According to the new consumer API of Kafka, this should result in the
> following:
>
> /auto.offset.reset: * smallest : automatically reset the offset to the
> smallest offset/ (source:
> https://kafka.apache.org/documentation.html#newconsumerapi)
>
> However, it starts from the latest item in my topic. Is this a bug or am I
> doing something wrong?
>
> Regards,
> Kevin
>


Re: FlinkKafkaConsumer09

2016-07-28 Thread Tai Gordon
Hi Kevin,

Was the same “group.id” used before?
What may be happening is that on startup of the consumer (not from failure
restore), any existing committed offset for the groupId in Kafka’s brokers
will be used as the starting point. The “auto.offset.reset” is only
respected when no committed offsets can be found.
Currently, if Flink’s checkpointing isn’t enabled, FlinkKafkaConsumer09
will periodically commit offsets back to Kafka brokers. So, it could be
that you’re actually using those offsets as the actual starting points.

Perhaps you can try using a new groupId and see if the behaviour still
exists?

Regards,
Gordon
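
For illustration, a minimal Java sketch of the workaround discussed above, with placeholder values: a group.id that has never been used has no committed offsets, so "auto.offset.reset" is actually consulted.

import java.util.Properties;

// Sketch only; names and values are placeholders. A unique group.id guarantees
// there are no previously committed offsets for the group, so the consumer falls
// back to "auto.offset.reset" ("earliest" for the 0.9 consumer, "smallest" for 0.8).
public final class FreshGroupIdProps {

    public static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "collector-" + System.currentTimeMillis());
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }
}

The timestamp-suffixed group ID that Kevin adopted as a workaround is exactly this kind of fresh group.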

On July 28, 2016 at 4:15:12 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09
class. I am using Flink 1.0.3.

These are my properties:

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", *"earliest"*)

According to the new consumer API of Kafka, this should result in the
following:

/auto.offset.reset: * smallest : automatically reset the offset to the
smallest offset/ (source:
https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am
I doing something wrong?

Regards,
Kevin


FlinkKafkaConsumer09

2016-07-28 Thread Kevin Jacobs

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09 
class. I am using Flink 1.0.3.


These are my properties:

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", *"earliest"*)

According to the new consumer API of Kafka, this should result in the 
following:


/auto.offset.reset: * smallest : automatically reset the offset to the 
smallest offset/ (source: 
https://kafka.apache.org/documentation.html#newconsumerapi)


However, it starts from the latest item in my topic. Is this a bug or am 
I doing something wrong?


Regards,
Kevin



[jira] [Created] (FLINK-3542) FlinkKafkaConsumer09 cannot handle changing number of partitions

2016-02-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3542:


 Summary: FlinkKafkaConsumer09 cannot handle changing number of 
partitions
 Key: FLINK-3542
 URL: https://issues.apache.org/jira/browse/FLINK-3542
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.0.0
Reporter: Till Rohrmann
Priority: Minor


The current {{FlinkKafkaConsumer09}} cannot handle increasing the number of 
partitions of a topic while running. The consumer will simply leave the newly 
created partitions out and thus miss all data which is written to the new 
partitions. The reason seems to be a static assignment of partitions to 
consumer tasks when the job is started.

We should either fix this behaviour or clearly document it in the online and 
code docs.
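
A minimal sketch (not the Flink implementation) of the kind of periodic partition re-discovery that would be needed to pick up newly added partitions; broker address, topic name, and the discovery interval are placeholders.

{code:title=Sketch (not the Flink implementation): periodically re-discovering partitions|borderStyle=solid}
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public final class PartitionDiscoverySketch {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Set<Integer> knownPartitions = new HashSet<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            while (true) {
                // re-query the partition metadata instead of assigning partitions once at job start
                List<PartitionInfo> infos = consumer.partitionsFor("myTopic"); // placeholder topic
                if (infos != null) {
                    for (PartitionInfo p : infos) {
                        if (knownPartitions.add(p.partition())) {
                            System.out.println("discovered partition " + p.partition());
                            // a real source would now assign the new partition to one of its subtasks
                        }
                    }
                }
                Thread.sleep(60_000L);
            }
        }
    }
}
{code}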


