[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764785#comment-16764785
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11249:
-

Since it seems like we'll need more time to properly merge a fix for this, I'm 
moving Fix Version 1.7.2 to 1.7.3 for this issue.
Please let me know if you disagree with this.

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
> ---
>
> Key: FLINK-11249
> URL: https://issues.apache.org/jira/browse/FLINK-11249
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Affects Versions: 1.7.0, 1.7.1
> Reporter: Piotr Nowojski
> Assignee: vinoyang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As reported by a user on the mailing list "How to migrate Kafka Producer ?" 
> (on 18th December 2018), {{FlinkKafkaProducer011}} cannot be migrated to 
> {{FlinkKafkaProducer}}, and the same problem can occur in future Kafka 
> producer versions/refactorings.
> The issue is that the {{ListState 
> FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using 
> Java serialization, which causes problems/collisions between 
> {{FlinkKafkaProducer011.NextTransactionalIdHint}} and
> {{FlinkKafkaProducer.NextTransactionalIdHint}}.
> To fix that, we probably need to release new versions of those classes that 
> rewrite/upgrade this state field to a new one that doesn't rely on 
> Java serialization. After this, we could drop support for the old field, 
> which in turn will allow users to upgrade from the 0.11 connector to the 
> universal one.
> One bright side is that, technically speaking, our {{FlinkKafkaProducer011}} 
> has the same compatibility matrix as the universal one (it's also forward & 
> backward compatible with the same Kafka versions), so for the time being 
> users can stick to {{FlinkKafkaProducer011}}.
> FYI [~tzulitai] [~yanghua]
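
To illustrate why Java serialization ties the stored state to one concrete
class, here is a minimal sketch that uses only the JDK. The classes
Producer011 and UniversalProducer are hypothetical stand-ins for the two
connector classes, and the two field names are assumptions made for the
example; none of this is Flink code. The sketch shows that the serialized
bytes embed the fully qualified name of the nested class, so bytes written
for one NextTransactionalIdHint class do not name the other one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class JavaSerializationNameDemo {

    // Hypothetical stand-in for the 0.11 connector class (not Flink code).
    static class Producer011 {
        static class NextTransactionalIdHint implements Serializable {
            private static final long serialVersionUID = 1L;
            int lastParallelism;          // assumed field name
            long nextFreeTransactionalId; // assumed field name
        }
    }

    // Hypothetical stand-in for the universal connector class (not Flink code).
    static class UniversalProducer {
        static class NextTransactionalIdHint implements Serializable {
            private static final long serialVersionUID = 1L;
            int lastParallelism;
            long nextFreeTransactionalId;
        }
    }

    // Serializes an object with plain Java serialization.
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Checks whether the class name appears in the serialized stream.
    static boolean containsClassName(byte[] data, Class<?> clazz) {
        String raw = new String(data, StandardCharsets.ISO_8859_1);
        return raw.contains(clazz.getName());
    }

    public static void main(String[] args) {
        byte[] oldState = javaSerialize(new Producer011.NextTransactionalIdHint());
        // The stream names the class that wrote it ...
        System.out.println(containsClassName(
                oldState, Producer011.NextTransactionalIdHint.class));       // true
        // ... and not the structurally identical class of the other producer.
        System.out.println(containsClassName(
                oldState, UniversalProducer.NextTransactionalIdHint.class)); // false
    }
}
```

Because the class name is part of the stream, a reader that expects the other
class cannot simply reuse these bytes, even though both classes have the same
fields.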



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-23 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750075#comment-16750075
 ] 

Piotr Nowojski commented on FLINK-11249:


Thanks for the response [~elevy]. That would be my guess & hope as well, but 
unfortunately we need to test for that.



[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-23 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749907#comment-16749907
 ] 

Elias Levy commented on FLINK-11249:


I believe that the open transactions are maintained. The transaction data is 
recorded in the transaction log, which is an internal Kafka topic replicated 
three ways. When a broker is restarted, another broker's transaction 
coordinator becomes the leader for the transaction log partitions that were 
managed by the restarted broker. The new transaction coordinator leader will 
read the transaction log partitions, rebuild the in-memory transaction state, 
and service the publishers.



[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747937#comment-16747937
 ] 

Piotr Nowojski commented on FLINK-11249:


I have realised there is one more issue. This fix alone might not fully solve 
the problem. With this fix in place, users will be able to update their job 
from the {{0.11}} connector to the universal one, but what happens if they 
upgrade the Kafka brokers at some point? If a user stops the Kafka brokers, 
upgrades them and then restarts them, does this process preserve the pending 
transactions that Flink has already "pre-committed"? Or are they automatically 
aborted? If they are automatically aborted, we might have data loss from our 
perspective.

If "pre-committed" transactions are aborted during the Kafka broker upgrade, 
we would need a "clean stop with savepoint" feature to handle this user story. 
I guess this needs more experiments and more testing.

CC [~tzulitai] [~aljoscha]



[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-03 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732912#comment-16732912
 ] 

vinoyang commented on FLINK-11249:
--

[~aljoscha] Understood, I will provide a custom serializer for the 
{{NextTransactionalIdHint}} state class.



[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-03 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732813#comment-16732813
 ] 

Aljoscha Krettek commented on FLINK-11249:
--

[~yanghua] The solution that Piotr already mentioned is the correct one: All 
state should be serialized using a custom-written {{TypeSerializer}}, not via 
Java serialization. The class itself already has examples of where we use a 
{{TypeSerializer}} for state.
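
As a rough sketch of the idea, the following JDK-only example writes the hint
as a fixed binary layout instead of using Java serialization. It is not an
actual Flink {{TypeSerializer}} (a real one extends Flink's abstract class and
implements further methods such as copying and snapshotting), and the class
HintSerializerSketch, its helper methods, and the two field names are
assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class HintSerializerSketch {

    // Hypothetical state class; the field names are assumed for the example.
    static final class NextTransactionalIdHint {
        final int lastParallelism;
        final long nextFreeTransactionalId;

        NextTransactionalIdHint(int lastParallelism, long nextFreeTransactionalId) {
            this.lastParallelism = lastParallelism;
            this.nextFreeTransactionalId = nextFreeTransactionalId;
        }
    }

    // Writes only the field values: 8 bytes (long) followed by 4 bytes (int).
    // No class name ends up in the output.
    static byte[] toBytes(NextTransactionalIdHint hint) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(hint.nextFreeTransactionalId);
            out.writeInt(hint.lastParallelism);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static NextTransactionalIdHint fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long nextFreeTransactionalId = in.readLong();
            int lastParallelism = in.readInt();
            return new NextTransactionalIdHint(lastParallelism, nextFreeTransactionalId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = toBytes(new NextTransactionalIdHint(4, 100L));
        NextTransactionalIdHint restored = fromBytes(data);
        System.out.println(data.length); // 12
        System.out.println(restored.lastParallelism == 4
                && restored.nextFreeTransactionalId == 100L); // true
    }
}
```

Since the layout contains only the two values, any class with matching fields
could read it back, which is the property the migration between the two
producers needs.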



[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-02 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732725#comment-16732725
 ] 

vinoyang commented on FLINK-11249:
--

[~pnowojski] What do you think about defining a new class in the 
"flink-connector-kafka-base" module, named 
{{UniversalNextTransactionalIdHint}}, that all versions of the producer depend 
on? Is this feasible? cc [~tzulitai]

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
> ---
>
> Key: FLINK-11249
> URL: https://issues.apache.org/jira/browse/FLINK-11249
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.7.0, 1.7.1
> Reporter: Piotr Nowojski
> Assignee: vinoyang
> Priority: Blocker
> Fix For: 1.7.2, 1.8.0
>
>