[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
[ https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher Ng updated FLINK-9295:
----------------------------------
    Description:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a task, and thus into each of its sub-tasks.

The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.

> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.2
> Reporter: Christopher Ng
> Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a task, and thus into each of its sub-tasks.
>
> The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
[ https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher Ng updated FLINK-9295:
----------------------------------
    Description:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when chaining results in two different sinks in the same topology being chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.

> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.2
> Reporter: Christopher Ng
> Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a single sub-task.
>
> The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.
[jira] [Created] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
Christopher Ng created FLINK-9295:
-----------------------------------

Summary: FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
Key: FLINK-9295
URL: https://issues.apache.org/jira/browse/FLINK-9295
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Christopher Ng

{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when chaining results in two different sinks in the same topology being chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing.
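The collision described in this report can be illustrated with a minimal sketch. This is not Flink's generator: the class, the `idsFor` and `idsForSink` methods, the id format, the `poolSize` parameter, and the per-sink `sinkId` discriminator are all hypothetical, chosen only to show why ids derived from task-level attributes alone come out identical for two sinks chained into one sub-task.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the collision; not Flink's actual id generator.
class TransactionalIdCollisionSketch {

    // Derives ids only from attributes shared by every sink in a sub-task:
    // task name, number of subtasks, subtask index. The id format and the
    // poolSize parameter are invented for illustration.
    static Set<String> idsFor(String taskName, int subtaskIndex, int numberOfSubtasks, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(taskName + "-" + numberOfSubtasks + "-" + subtaskIndex + "-" + i);
        }
        return ids;
    }

    // Same derivation plus a per-sink discriminator (an assumed remedy, not the project's fix).
    static Set<String> idsForSink(String taskName, String sinkId, int subtaskIndex,
                                  int numberOfSubtasks, int poolSize) {
        return idsFor(taskName + "/" + sinkId, subtaskIndex, numberOfSubtasks, poolSize);
    }

    public static void main(String[] args) {
        String chainedTask = "Map -> (Sink: a, Sink: b)";

        // Two sinks in the same sub-task see identical inputs, so the ids are identical.
        Set<String> sinkA = idsFor(chainedTask, 0, 4, 2);
        Set<String> sinkB = idsFor(chainedTask, 0, 4, 2);
        System.out.println("ids collide: " + sinkA.equals(sinkB));

        // With a per-sink discriminator the two id sets no longer overlap.
        Set<String> a = idsForSink(chainedTask, "sink-a", 0, 4, 2);
        Set<String> b = idsForSink(chainedTask, "sink-b", 0, 4, 2);
        System.out.println("ids disjoint: " + Collections.disjoint(a, b));
    }
}
```

When two producers register the same transactional id, Kafka fences the older one, which matches the failure reported here. Any input that differs per sink would separate the ids; the `sinkId` string is only one possible choice.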
[jira] [Created] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode
Christopher Ng created FLINK-9287:
-----------------------------------

Summary: KafkaProducer011 seems to leak threads when not in exactly-once mode
Key: FLINK-9287
URL: https://issues.apache.org/jira/browse/FLINK-9287
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Christopher Ng

{{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} threads. As far as I can tell, this happens when it is not in EXACTLY_ONCE mode: it seems to create a {{FlinkKafkaProducer}} but never closes it, even when the {{FlinkKafkaProducer011}} itself is closed.

I observed this when running a local cluster and submitting and then cancelling a job: a lot of Kafka threads were left alive afterwards.
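One way to check for leftover threads like those described in this report is to count live JVM threads by name prefix. The sketch below uses only the JDK; the class and method names are hypothetical, and it would have to run inside the same JVM as the producers (for example in a local test) to see their threads.

```java
// Hypothetical helper for spotting leaked threads by name; uses only the JDK.
class ProducerThreadCheck {

    // Counts live threads in this JVM whose name starts with the given prefix.
    static long countThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    public static void main(String[] args) {
        // Thread name prefix taken from the report above.
        long leftover = countThreadsWithPrefix("kafka-producer-network-thread");
        System.out.println("live producer network threads: " + leftover);
    }
}
```

Comparing the count before submitting a job and after cancelling it should show whether producer threads outlive the sink.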
[jira] [Closed] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
[ https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher Ng closed FLINK-9151.
---------------------------------
    Resolution: Won't Fix

This can be handled by `env.ssh.opts` provided your target servers allow you to send the `FLINK_CONF_DIR` env var in an ssh session.

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.4.1
> Reporter: Christopher Ng
> Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task manager when they are started over SSH. This means that if the user has a locally set FLINK_CONF_DIR that is not configured by their login shell, it is not used by the launched job manager and task manager, which can then silently fail to launch if errors arise from Flink not using the correct config dir.
>
> One particular inconsistency is that TaskManagers may be launched locally (without ssh) on localhost, but JobManagers are always launched over ssh. In my particular case this meant that the TaskManager launched but the JobManager silently failed to launch.
[jira] [Updated] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
[ https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher Ng updated FLINK-9151:
----------------------------------
    Component/s: Startup Shell Scripts

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.4.1
> Reporter: Christopher Ng
> Assignee: vinoyang
> Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task manager when they are started over SSH. This means that if the user has a locally set FLINK_CONF_DIR that is not configured by their login shell, it is not used by the launched job manager and task manager, which can then silently fail to launch if errors arise from Flink not using the correct config dir.
>
> One particular inconsistency is that TaskManagers may be launched locally (without ssh) on localhost, but JobManagers are always launched over ssh. In my particular case this meant that the TaskManager launched but the JobManager silently failed to launch.
[jira] [Updated] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
[ https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher Ng updated FLINK-9151:
----------------------------------
    Affects Version/s: 1.4.1

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.4.1
> Reporter: Christopher Ng
> Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task manager when they are started over SSH. This means that if the user has a locally set FLINK_CONF_DIR that is not configured by their login shell, it is not used by the launched job manager and task manager, which can then silently fail to launch if errors arise from Flink not using the correct config dir.
>
> One particular inconsistency is that TaskManagers may be launched locally (without ssh) on localhost, but JobManagers are always launched over ssh. In my particular case this meant that the TaskManager launched but the JobManager silently failed to launch.
[jira] [Created] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
Christopher Ng created FLINK-9151:
-----------------------------------

Summary: standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers
Key: FLINK-9151
URL: https://issues.apache.org/jira/browse/FLINK-9151
Project: Flink
Issue Type: Improvement
Reporter: Christopher Ng

At the moment FLINK_CONF_DIR is not passed to the job manager and task manager when they are started over SSH. This means that if the user has a locally set FLINK_CONF_DIR that is not configured by their login shell, it is not used by the launched job manager and task manager, which can then silently fail to launch if errors arise from Flink not using the correct config dir.

One particular inconsistency is that TaskManagers may be launched locally (without ssh) on localhost, but JobManagers are always launched over ssh. In my particular case this meant that the TaskManager launched but the JobManager silently failed to launch.
[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415826#comment-16415826 ]

Christopher Ng edited comment on FLINK-8836 at 3/27/18 3:51 PM:
----------------------------------------------------------------

[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not authoritative) was that given that {{Kryo}} itself is not intended to be thread-safe, we probably shouldn't assume that implementations of {{com.esotericsoftware.kryo.Serializer}} are thread-safe. I'd suggest we should actually assume the opposite.

was (Author: facboy):
[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not authoritative) was that given that Kryo itself is not intended to be thread-safe, we probably shouldn't assume that implementations of com.esotericsoftware.kryo.Serializer are thread-safe. I'd suggest we should actually assume the opposite.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> -------------------------------------------------------------------------------
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
>
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different {{KryoSerializer}} instances, so the result is not a correct duplicate.
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415826#comment-16415826 ]

Christopher Ng commented on FLINK-8836:
---------------------------------------

[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not authoritative) was that given that Kryo itself is not intended to be thread-safe, we probably shouldn't assume that implementations of com.esotericsoftware.kryo.Serializer are thread-safe. I'd suggest we should actually assume the opposite.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> -------------------------------------------------------------------------------
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
>
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different {{KryoSerializer}} instances, so the result is not a correct duplicate.
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413781#comment-16413781 ]

Christopher Ng commented on FLINK-8836:
---------------------------------------

I don't think they are thread-safe: https://github.com/EsotericSoftware/kryo#threading

> Duplicating a KryoSerializer does not duplicate registered default serializers
> -------------------------------------------------------------------------------
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
>
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different {{KryoSerializer}} instances, so the result is not a correct duplicate.
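The sharing problem in the quoted constructor can be shown with a small stand-alone sketch. It does not use Kryo or Flink: `StatefulSerializer` and its `freshInstance()` method are hypothetical stand-ins for a serializer that is not thread-safe, and how a real fix would obtain fresh serializer instances is left open here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of shallow vs. per-instance duplication; no Kryo or Flink types involved.
class SerializerDuplicationSketch {

    // Stand-in for a serializer with mutable internal state (so not thread-safe).
    static class StatefulSerializer {
        final StringBuilder scratch = new StringBuilder();

        // Assumed way to obtain an independent instance; real serializers may not offer this.
        StatefulSerializer freshInstance() {
            return new StatefulSerializer();
        }
    }

    // Mirrors the quoted constructor: the duplicate receives the very same map and instances.
    static Map<Class<?>, StatefulSerializer> shallowDuplicate(Map<Class<?>, StatefulSerializer> source) {
        return source;
    }

    // Builds a new map holding an independent serializer instance per registered type.
    static Map<Class<?>, StatefulSerializer> deepDuplicate(Map<Class<?>, StatefulSerializer> source) {
        Map<Class<?>, StatefulSerializer> copy = new LinkedHashMap<>();
        source.forEach((type, serializer) -> copy.put(type, serializer.freshInstance()));
        return copy;
    }

    public static void main(String[] args) {
        Map<Class<?>, StatefulSerializer> original = new LinkedHashMap<>();
        original.put(String.class, new StatefulSerializer());

        boolean sharedAfterShallow =
                shallowDuplicate(original).get(String.class) == original.get(String.class);
        boolean sharedAfterDeep =
                deepDuplicate(original).get(String.class) == original.get(String.class);

        System.out.println("shallow duplicate shares instance: " + sharedAfterShallow);
        System.out.println("deep duplicate shares instance: " + sharedAfterDeep);
    }
}
```

If serializer implementations cannot be assumed thread-safe, as suggested in the comments on this issue, only the second variant lets two duplicates be used from different threads without sharing mutable state.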