[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski reassigned FLINK-11654:
--------------------------------------

    Assignee:     (was: freezhan)

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Priority: Major
>              Labels: stale-assigned
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go
> into a restart cycle.
>
> Example exception from the Kafka log:
>
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
> {code}
>
> The reason for this is the way FlinkKafkaProducer initializes the
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash
> between different Jobs (and Clusters).
>
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>          nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>              NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>          transactionalIdsGenerator = new TransactionalIdsGenerator(
> +            // the prefix probably should include job id and maybe cluster id
>              getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>              getRuntimeContext().getIndexOfThisSubtask(),
>              getRuntimeContext().getNumberOfParallelSubtasks(),{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
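
The clash described in the issue can be illustrated with a small standalone sketch. This is not Flink's actual TransactionalIdsGenerator; it is a simplified model that assumes each subtask derives a fixed-size pool of IDs of the form prefix-N, and that two jobs built from the same pipeline end up with the same task name and operator unique ID. The class name, the helper methods, the sample task name, the sample operator ID, and the job IDs are all hypothetical. The second prefix variant follows the suggestion in the diff comment (include a job ID) and is only one possible shape for a fix.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TransactionalIdClashSketch {

    /** Simplified model: a subtask owns poolSize consecutive IDs, rendered as prefix-N. */
    static Set<String> generateIds(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            long id = (long) subtaskIndex * poolSize + i;
            ids.add(prefix + "-" + id);
        }
        return ids;
    }

    /** Prefix as built in the quoted diff: task name plus operator unique ID. */
    static String currentPrefix(String taskName, String operatorUniqueId) {
        return taskName + "-" + operatorUniqueId;
    }

    /** Hypothetical alternative: additionally scope the prefix by a job ID. */
    static String jobScopedPrefix(String jobId, String taskName, String operatorUniqueId) {
        return jobId + "-" + taskName + "-" + operatorUniqueId;
    }

    /** True if the two ID sets share at least one transactional ID. */
    static boolean overlaps(Set<String> a, Set<String> b) {
        Set<String> common = new LinkedHashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        // Assumption: both jobs use the same sink name and get the same operator ID.
        String taskName = "Sink: kafka-sink";
        String operatorId = "cbc357ccb763df2852fee8c4fc7d55f2";

        // Subtask 0 of two different jobs, using the prefix from the diff.
        Set<String> jobA = generateIds(currentPrefix(taskName, operatorId), 0, 5);
        Set<String> jobB = generateIds(currentPrefix(taskName, operatorId), 0, 5);
        System.out.println("current prefix clashes: " + overlaps(jobA, jobB));

        // Same subtasks, but with a job ID mixed into the prefix.
        Set<String> scopedA = generateIds(jobScopedPrefix("job-a", taskName, operatorId), 0, 5);
        Set<String> scopedB = generateIds(jobScopedPrefix("job-b", taskName, operatorId), 0, 5);
        System.out.println("job-scoped prefix clashes: " + overlaps(scopedA, scopedB));
    }
}
```

In this model, two producers that share a transactional ID fence each other on the broker, which matches the ProducerFencedException in the log above. A prefix that differs per job keeps the ID pools disjoint. A real fix would also need the prefix to stay stable across restarts of the same job so that pending transactions can be recovered.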
[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reassigned FLINK-11654:
----------------------------------------

    Assignee: freezhan  (was: Jiangjie Qin)

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Assignee: freezhan
>            Priority: Major
[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin reassigned FLINK-11654:
------------------------------------

    Assignee: Jiangjie Qin

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Assignee: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.9.0
[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reassigned FLINK-11654:
----------------------------------------

    Assignee:     (was: Aljoscha Krettek)

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Priority: Blocker
>             Fix For: 1.8.0
[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reassigned FLINK-11654:
----------------------------------------

    Assignee: Aljoscha Krettek

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.8.0