[jira] [Created] (FLINK-19460) AWS Kinesis Producer EXACTLY_ONCE semantic

2020-09-29 Thread Chris Slotterback (Jira)
Chris Slotterback created FLINK-19460:
-

 Summary: AWS Kinesis Producer EXACTLY_ONCE semantic 
 Key: FLINK-19460
 URL: https://issues.apache.org/jira/browse/FLINK-19460
 Project: Flink
  Issue Type: Improvement
Reporter: Chris Slotterback


I wanted to create a ticket to discuss adding EXACTLY_ONCE semantics to the AWS 
Kinesis producer, similar to how the Kafka producer functions.

The Kinesis producer would need to be modified to participate in commits. Per 
the Kinesis {{PutRecords}} limits:
{noformat}
Each PutRecords request can support up to 500 records. Each record in the 
request can be as large as 1 MiB, up to a limit of 5 MiB for the entire 
request, including partition keys. Each shard can support writes up to 1,000 
records per second, up to a maximum data write total of 1 MiB per second.
{noformat}

Order is not guaranteed when writing to Kinesis.
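To make the discussion concrete, below is a minimal structural sketch of what a 
commit-participating sink could look like, built on Flink's 
{{TwoPhaseCommitSinkFunction}} (the same base class the exactly-once Kafka 
producer uses). All names here ({{TwoPhaseKinesisSink}}, {{KinesisTxn}}, the 
stream name) are hypothetical, and this is not a real exactly-once design: 
Kinesis exposes no transactional API, so the sketch can only buffer records per 
checkpoint and flush on commit, and a retried {{PutRecords}} call after a 
partial failure can still duplicate records unless consumers deduplicate.
{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not a proposed implementation: buffer records per
// checkpoint "transaction" and only call PutRecords once the checkpoint commits.
public class TwoPhaseKinesisSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseKinesisSink.KinesisTxn, Void> {

    /** Transaction state checkpointed by Flink: just the records buffered so far. */
    public static class KinesisTxn {
        public List<String> buffered = new ArrayList<>();
    }

    private static final int MAX_RECORDS_PER_PUT = 500; // PutRecords hard limit

    private final String streamName;
    private transient AmazonKinesis kinesis;

    public TwoPhaseKinesisSink(String streamName) {
        super(TypeInformation.of(KinesisTxn.class).createSerializer(new ExecutionConfig()),
                VoidSerializer.INSTANCE);
        this.streamName = streamName;
    }

    // Lazy client: recovered transactions are committed during initializeState,
    // which runs before open().
    private AmazonKinesis client() {
        if (kinesis == null) {
            kinesis = AmazonKinesisClientBuilder.defaultClient();
        }
        return kinesis;
    }

    @Override
    protected KinesisTxn beginTransaction() {
        return new KinesisTxn();
    }

    @Override
    protected void invoke(KinesisTxn txn, String value, Context context) {
        txn.buffered.add(value); // nothing reaches Kinesis until commit
    }

    @Override
    protected void preCommit(KinesisTxn txn) {
        // Nothing to flush here; the buffer travels with the checkpointed state.
    }

    @Override
    protected void commit(KinesisTxn txn) {
        for (int i = 0; i < txn.buffered.size(); i += MAX_RECORDS_PER_PUT) {
            int end = Math.min(i + MAX_RECORDS_PER_PUT, txn.buffered.size());
            List<PutRecordsRequestEntry> entries = new ArrayList<>();
            for (String record : txn.buffered.subList(i, end)) {
                entries.add(new PutRecordsRequestEntry()
                        .withPartitionKey(record)
                        .withData(ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8))));
            }
            PutRecordsResult result = client().putRecords(
                    new PutRecordsRequest().withStreamName(streamName).withRecords(entries));
            if (result.getFailedRecordCount() > 0) {
                // A real implementation would retry only the failed subset; note
                // that retries here are exactly where duplicates can sneak in.
                throw new RuntimeException(
                        result.getFailedRecordCount() + " records failed to write");
            }
        }
    }

    @Override
    protected void abort(KinesisTxn txn) {
        txn.buffered.clear();
    }
}
{code}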



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-29 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851427#comment-16851427
 ] 

Chris Slotterback edited comment on FLINK-10455 at 5/30/19 12:30 AM:
-

[~sunjincheng121] I think it should be solved, as it blocks exactly-once 
production in any recovery scenario. I don't have any Flink commit experience 
or time right now, but I would be able to work with whoever does, or I can 
revisit in a couple of weeks and take a stab at fixing it myself.


was (Author: cslotterback):
[~sunjincheng121] I think it should be solved, as it blocks exactly-once 
production. I don't have any Flink commit experience or time right now, but I 
would be able to work with whoever does, or I can revisit in a couple of weeks 
and take a stab at fixing it myself.

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}}, which is not touched 
> during {{close()}}.
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
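For reference, a sketch of the close-on-fencing handling the javadoc asks for 
(the method and its surroundings are illustrative, not the actual 
{{FlinkKafkaProducer011}} code):
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencedCommit {
    // Once fenced, the producer instance is permanently unusable; per the
    // KafkaProducer javadoc it must be closed, or its network threads and
    // sockets linger across job restarts on the same cluster.
    static void commitOrClose(KafkaProducer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close(0, TimeUnit.MILLISECONDS); // release resources immediately
            throw e;
        }
    }
}
{code}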



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-29 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851427#comment-16851427
 ] 

Chris Slotterback commented on FLINK-10455:
---

[~sunjincheng121] I think it should be solved, as it blocks exactly-once 
production. I don't have any Flink commit experience or time right now, but I 
would be able to work with whoever does, or I can revisit in a couple of weeks 
and take a stab at fixing it myself.

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}}, which is not touched 
> during {{close()}}.
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835749#comment-16835749
 ] 

Chris Slotterback commented on FLINK-10455:
---

The way I was able to reproduce this was by forcing the job into a failed state 
through a checkpoint write timing out on the filesystem. We occasionally see 
latency spikes in our environment that result in these job restarts. Most jobs 
recover fine, but when using exactly-once for the Kafka producer the job gets 
stuck in this loop. My assumption is that any job failure will reach the 
client's {{processDisconnection}} method after the class loader is gone.
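For anyone trying to reproduce: a sketch of a job configured so checkpoints 
time out artificially (the values and the trivial pipeline are assumptions; in 
our case the timeout came from real filesystem latency, and the sink would be 
an EXACTLY_ONCE {{FlinkKafkaProducer011}}):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000); // attempt a checkpoint every 5 seconds
        // A timeout far below real checkpoint-write latency makes checkpoints
        // expire, failing the job and exercising the producer recovery path.
        env.getCheckpointConfig().setCheckpointTimeout(100);
        // Placeholder pipeline; the real repro sinks to Kafka with EXACTLY_ONCE.
        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-timeout-repro");
    }
}
{code}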

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}}, which is not touched 
> during {{close()}}.
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12448) FlinkKafkaProducer late closure after class loader

2019-05-08 Thread Chris Slotterback (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Slotterback closed FLINK-12448.
-
Resolution: Duplicate

> FlinkKafkaProducer late closure after class loader
> --
>
> Key: FLINK-12448
> URL: https://issues.apache.org/jira/browse/FLINK-12448
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Chris Slotterback
>Priority: Major
>
> During job failure/restart, FlinkKafkaProducer configured with 
> Semantic.EXACTLY_ONCE fails to disconnect properly due to a 
> NoClassDefFoundError:
> {noformat}
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
> at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
> at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:748){noformat}
>  
> This begins a restart loop where the job never recovers properly. This is 
> reproducible only with the EXACTLY_ONCE semantic; AT_LEAST_ONCE properly 
> disconnects and restarts without error.
> This issue is described in FLINK-10455, which has since been marked as Fixed, 
> but it is still reproducible.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12448) FlinkKafkaProducer late closure after class loader

2019-05-08 Thread Chris Slotterback (JIRA)
Chris Slotterback created FLINK-12448:
-

 Summary: FlinkKafkaProducer late closure after class loader
 Key: FLINK-12448
 URL: https://issues.apache.org/jira/browse/FLINK-12448
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Chris Slotterback


During job failure/restart, FlinkKafkaProducer configured with 
Semantic.EXACTLY_ONCE fails to disconnect properly due to a NoClassDefFoundError:
{noformat}
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748){noformat}

This begins a restart loop where the job never recovers properly. This is 
reproducible only with the EXACTLY_ONCE semantic; AT_LEAST_ONCE properly 
disconnects and restarts without error.

This issue is described in FLINK-10455, which has since been marked as Fixed, 
but it is still reproducible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788247#comment-16788247
 ] 

Chris Slotterback edited comment on FLINK-11654 at 3/8/19 7:59 PM:
---

Setting the {{UID}} does work in my testing to avoid transactionId collisions, 
as long as it remains unique across *every* producer writing to that topic. In 
a multi-datacenter configuration, I had collisions occurring across multiple 
Flink clusters.

I do think that if setting the operator {{UID}} (or a seed) is the accepted 
workaround, it could certainly be made more obvious, or even a required setting 
for users of {{Semantics.EXACTLY_ONCE}} producers.
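A sketch of the workaround in code (the broker address, topic, and uid string 
are placeholder examples; the point is only that the uid feeding the 
transactional id prefix must differ across every job, cluster, and datacenter 
writing to the topic):
{code:java}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class UniqueUidSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");

        env.fromElements("a", "b")
                .addSink(new FlinkKafkaProducer<>(
                        "my-topic",
                        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                        props,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
                // The transactional id prefix derives from task name + operator id,
                // so this uid must be globally unique across jobs/clusters/DCs.
                .uid("dc1-clusterA-jobX-kafka-sink");

        env.execute("unique-uid-example");
    }
}
{code}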


was (Author: cslotterback):
I do think that if setting the operator {{UID}} (or a seed) is the accepted 
workaround, it could certainly be made more obvious, or even a required setting 
for users of {{Semantics.EXACTLY_ONCE}} producers.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>  nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>  transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>  getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>  getRuntimeContext().getIndexOfThisSubtask(),
>  getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788247#comment-16788247
 ] 

Chris Slotterback commented on FLINK-11654:
---

I do think that if setting the operator {{UID}} (or a seed) is the accepted 
workaround, it could certainly be made more obvious, or even a required setting 
for users of {{Semantics.EXACTLY_ONCE}} producers.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>  nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>  transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>  getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>  getRuntimeContext().getIndexOfThisSubtask(),
>  getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)