[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9295:
--
Description: 
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a task, and thus into each of its sub-tasks.

The problem is that {{TransactionalIdsGenerator}} only takes into account the
task name, the subtask index, the number of subtasks, and a couple of other
things.  All of these attributes are identical for different
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
transactional ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a single sub-task.

The problem is that {{TransactionalIdsGenerator}} only takes into account the
task name, the subtask index, the number of subtasks, and a couple of other
things.  All of these attributes are identical for different
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
transactional ids and one of them ends up failing.


> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in 
> EXACTLY_ONCE mode
> -
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
> sinks are used within the same sub-task.  This can happen when 
> operator-chaining results in two different sinks in the same topology being 
> chained into a task, and thus into each of its sub-tasks.
> The problem is that {{TransactionalIdsGenerator}} only takes into account the
> task name, the subtask index, the number of subtasks, and a couple of other
> things.  All of these attributes are identical for different
> {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
> transactional ids and one of them ends up failing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9295:
--
Description: 
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a single sub-task.

The problem is that {{TransactionalIdsGenerator}} only takes into account the
task name, the subtask index, the number of subtasks, and a couple of other
things.  All of these attributes are identical for different
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
transactional ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when chaining results 
in two different sinks in the same topology being chained into a single 
sub-task.

The problem is that {{TransactionalIdsGenerator}} only takes into account the
task name, the subtask index, the number of subtasks, and a couple of other
things.  All of these attributes are identical for different
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
transactional ids and one of them ends up failing.


> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in 
> EXACTLY_ONCE mode
> -
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
> sinks are used within the same sub-task.  This can happen when 
> operator-chaining results in two different sinks in the same topology being 
> chained into a single sub-task.
> The problem is that {{TransactionalIdsGenerator}} only takes into account the
> task name, the subtask index, the number of subtasks, and a couple of other
> things.  All of these attributes are identical for different
> {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
> transactional ids and one of them ends up failing.





[jira] [Created] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)
Christopher Ng created FLINK-9295:
-

 Summary: FlinkKafkaProducer011 sometimes throws 
ProducerFencedExceptions when in EXACTLY_ONCE mode
 Key: FLINK-9295
 URL: https://issues.apache.org/jira/browse/FLINK-9295
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Christopher Ng


{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when chaining results 
in two different sinks in the same topology being chained into a single 
sub-task.

The problem is that {{TransactionalIdsGenerator}} only takes into account the
task name, the subtask index, the number of subtasks, and a couple of other
things.  All of these attributes are identical for different
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same
transactional ids and one of them ends up failing.





[jira] [Created] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-02 Thread Christopher Ng (JIRA)
Christopher Ng created FLINK-9287:
-

 Summary: KafkaProducer011 seems to leak threads when not in 
exactly-once mode
 Key: FLINK-9287
 URL: https://issues.apache.org/jira/browse/FLINK-9287
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Christopher Ng


{{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}}
threads.  As far as I can tell this happens when it is not in EXACTLY_ONCE
mode: it seems to create a {{FlinkKafkaProducer}} but never close it,
even when the {{FlinkKafkaProducer011}} itself is closed.

I observed this when running a local cluster and submitting and then cancelling
a job: a lot of Kafka threads were left alive afterwards.
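
One way to check for this kind of leak is to count live threads by name before and after a job is cancelled. The sketch below assumes, as in the report, that the producer's network threads have names starting with {{kafka-producer-network-thread}}; it uses only {{Thread.getAllStackTraces()}} and simulates a producer thread with a hypothetical stand-in rather than talking to Kafka.

```java
import java.util.concurrent.CountDownLatch;

// Diagnostic sketch: counts live threads by name prefix. The fake thread below is a
// hypothetical stand-in for a producer network thread; no Kafka classes are used.
final class ThreadLeakCheck {

    static final String KAFKA_PRODUCER_THREAD_PREFIX = "kafka-producer-network-thread";

    // Thread.getAllStackTraces() returns a snapshot map keyed by every live thread.
    static long countLiveThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().startsWith(namePrefix))
                .count();
    }

    // Starts a fake network thread, then "closes" it; returns {whileOpen, afterClose}.
    static long[] simulateOpenAndClose(String namePrefix) {
        long before = countLiveThreads(namePrefix);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stop = new CountDownLatch(1);
        Thread fake = new Thread(() -> {
            started.countDown();
            try {
                stop.await(); // idles until "closed", like an open producer's I/O thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, namePrefix + " | fake-producer");
        fake.start();
        try {
            started.await();
            long whileOpen = countLiveThreads(namePrefix) - before;
            stop.countDown(); // closing the producer should let its thread exit
            fake.join();
            long afterClose = countLiveThreads(namePrefix) - before;
            return new long[] {whileOpen, afterClose};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] counts = simulateOpenAndClose(KAFKA_PRODUCER_THREAD_PREFIX);
        System.out.println("while open: " + counts[0] + ", after close: " + counts[1]);
    }
}
```

In a real check the two counts would be taken around submitting and cancelling a job on the local cluster; a non-zero difference after cancellation would match the symptom described above.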





[jira] [Closed] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers

2018-04-12 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng closed FLINK-9151.
-
Resolution: Won't Fix

This can be handled via `env.ssh.opts`, provided your target servers allow the
`FLINK_CONF_DIR` env var to be sent in an ssh session.
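
For reference, here is a rough Java sketch of the ssh-based route. It assumes the launch command has the shape {{ssh -n <opts> <host> -- <command>}} with the contents of {{env.ssh.opts}} spliced in; the class {{SshLaunchSketch}}, the host {{worker-1}} and the remote command are hypothetical, and nothing is executed. {{-o SendEnv=FLINK_CONF_DIR}} is a standard OpenSSH client option: it only forwards the variable if it is set in the local ssh process's environment, and the remote {{sshd}} must also list it under {{AcceptEnv}}, which is the server-side permission the comment refers to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: assemble an ssh launch command with extra options standing in
// for env.ssh.opts. Nothing is executed; the remote command is illustrative only.
final class SshLaunchSketch {

    // Builds: ssh -n <opts...> <host> -- <remoteCommand>
    static List<String> sshCommand(String host, String sshOpts, String remoteCommand) {
        List<String> cmd = new ArrayList<>();
        cmd.add("ssh");
        cmd.add("-n"); // do not read stdin for the remote command
        String opts = sshOpts.trim();
        if (!opts.isEmpty()) {
            // Naive whitespace split, similar to unquoted word splitting in a shell script.
            cmd.addAll(Arrays.asList(opts.split("\\s+")));
        }
        cmd.add(host);
        cmd.add("--");
        cmd.add(remoteCommand);
        return cmd;
    }

    public static void main(String[] args) {
        // SendEnv forwards FLINK_CONF_DIR from the ssh client's own environment; the remote
        // sshd must list the variable under AcceptEnv, otherwise it is not accepted.
        List<String> cmd = sshCommand("worker-1", "-o SendEnv=FLINK_CONF_DIR",
                "bin/taskmanager.sh start");
        System.out.println(String.join(" ", cmd));
        // To actually run it, FLINK_CONF_DIR must be set for the child ssh process, e.g.
        // new ProcessBuilder(cmd) with environment().put("FLINK_CONF_DIR", dir).
    }
}
```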

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task 
> managers
> --
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.1
>Reporter: Christopher Ng
>Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task
> manager when they are started over SSH.  This means that if the user has a
> locally set FLINK_CONF_DIR that is not configured by their login shell, it is
> not used by the launched job manager and task manager.  If Flink then picks up
> the wrong config dir and runs into errors, the processes can silently fail to
> launch.
> One particular inconsistency is that TaskManagers may be launched locally
> (without ssh) on localhost, but JobManagers are always launched over ssh.  In
> my particular case this meant that the TaskManager launched but the
> JobManager silently failed to launch.





[jira] [Updated] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers

2018-04-10 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9151:
--
Component/s: Startup Shell Scripts

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task 
> managers
> --
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.1
>Reporter: Christopher Ng
>Assignee: vinoyang
>Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task
> manager when they are started over SSH.  This means that if the user has a
> locally set FLINK_CONF_DIR that is not configured by their login shell, it is
> not used by the launched job manager and task manager.  If Flink then picks up
> the wrong config dir and runs into errors, the processes can silently fail to
> launch.
> One particular inconsistency is that TaskManagers may be launched locally
> (without ssh) on localhost, but JobManagers are always launched over ssh.  In
> my particular case this meant that the TaskManager launched but the
> JobManager silently failed to launch.





[jira] [Updated] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers

2018-04-09 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9151:
--
Affects Version/s: 1.4.1

> standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task 
> managers
> --
>
> Key: FLINK-9151
> URL: https://issues.apache.org/jira/browse/FLINK-9151
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: Christopher Ng
>Priority: Minor
>
> At the moment FLINK_CONF_DIR is not passed to the job manager and task
> manager when they are started over SSH.  This means that if the user has a
> locally set FLINK_CONF_DIR that is not configured by their login shell, it is
> not used by the launched job manager and task manager.  If Flink then picks up
> the wrong config dir and runs into errors, the processes can silently fail to
> launch.
> One particular inconsistency is that TaskManagers may be launched locally
> (without ssh) on localhost, but JobManagers are always launched over ssh.  In
> my particular case this meant that the TaskManager launched but the
> JobManager silently failed to launch.





[jira] [Created] (FLINK-9151) standalone cluster scripts should pass FLINK_CONF_DIR to job manager and task managers

2018-04-09 Thread Christopher Ng (JIRA)
Christopher Ng created FLINK-9151:
-

 Summary: standalone cluster scripts should pass FLINK_CONF_DIR to 
job manager and task managers
 Key: FLINK-9151
 URL: https://issues.apache.org/jira/browse/FLINK-9151
 Project: Flink
  Issue Type: Improvement
Reporter: Christopher Ng


At the moment FLINK_CONF_DIR is not passed to the job manager and task manager
when they are started over SSH.  This means that if the user has a locally set
FLINK_CONF_DIR that is not configured by their login shell, it is not used by
the launched job manager and task manager.  If Flink then picks up the wrong
config dir and runs into errors, the processes can silently fail to launch.

One particular inconsistency is that TaskManagers may be launched locally
(without ssh) on localhost, but JobManagers are always launched over ssh.  In
my particular case this meant that the TaskManager launched but the JobManager
silently failed to launch.
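
The proposal in this issue, passing the caller's FLINK_CONF_DIR explicitly, can be sketched as prefixing the remote command with a shell variable assignment. This is a hypothetical helper, not code from Flink's scripts; it assumes the remote side runs the command through a POSIX shell, and it single-quotes the value so paths containing spaces or quotes survive the trip.

```java
// Hypothetical sketch of the proposal in the issue: prefix the remote command with the
// caller's FLINK_CONF_DIR so it no longer depends on the remote login shell.
final class ConfDirForwardingSketch {

    // POSIX shell single-quoting: wrap in '...' and rewrite each ' as '\''.
    static String shellQuote(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }

    // e.g. FLINK_CONF_DIR='/opt/flink/conf' bin/jobmanager.sh start
    static String withConfDir(String confDir, String remoteCommand) {
        return "FLINK_CONF_DIR=" + shellQuote(confDir) + " " + remoteCommand;
    }

    public static void main(String[] args) {
        // The fallback path is a made-up example, used only when FLINK_CONF_DIR is unset.
        String confDir = System.getenv().getOrDefault("FLINK_CONF_DIR", "/opt/flink/conf");
        System.out.println(withConfDir(confDir, "bin/jobmanager.sh start"));
    }
}
```

Unlike the SendEnv route, this does not need any sshd configuration on the target hosts, at the cost of the launch scripts having to build and quote the command string themselves.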





[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-27 Thread Christopher Ng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415826#comment-16415826
 ] 

Christopher Ng edited comment on FLINK-8836 at 3/27/18 3:51 PM:


[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not
authoritative) was that given that {{Kryo}} itself is not intended to be 
thread-safe, we probably shouldn't assume that implementations of 
{{com.esotericsoftware.kryo.Serializer}} are thread-safe.

I'd suggest we should actually assume the opposite.


was (Author: facboy):
[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not
authoritative) was that given that Kryo itself is not intended to be 
thread-safe, we probably shouldn't assume that implementations of 
com.esotericsoftware.kryo.Serializer are thread-safe.

I'd suggest we should actually assume the opposite.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the
> {{defaultSerializers}} serializer instances are handed directly to the new
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared between two
> different {{KryoSerializer}} instances, so the copy is not a correct duplicate.
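
A minimal, self-contained sketch of the difference between sharing and duplicating the serializer map. {{StatefulSerializer}} and {{DuplicatingHolder}} are hypothetical stand-ins with no Kryo dependency; the deep copy uses a plain Java serialization round trip, which is one possible approach and not necessarily what Flink ended up doing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a stateful (and therefore non-thread-safe) serializer.
final class StatefulSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
    int scratch; // mutable per-instance state
}

// Hypothetical holder with the same shape as the snippet: a map of default serializers.
final class DuplicatingHolder {
    final Map<Class<?>, StatefulSerializer> defaultSerializers;

    DuplicatingHolder(Map<Class<?>, StatefulSerializer> defaultSerializers) {
        this.defaultSerializers = defaultSerializers;
    }

    // Mirrors the problem: the copy shares the same map and serializer instances.
    DuplicatingHolder shallowDuplicate() {
        return new DuplicatingHolder(defaultSerializers);
    }

    // Deep copy via a Java serialization round trip: new map, new serializer instances.
    @SuppressWarnings("unchecked")
    DuplicatingHolder deepDuplicate() {
        LinkedHashMap<Class<?>, StatefulSerializer> snapshot = new LinkedHashMap<>(defaultSerializers);
        return new DuplicatingHolder((Map<Class<?>, StatefulSerializer>) roundTrip(snapshot));
    }

    private static Object roundTrip(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After {{deepDuplicate()}}, mutating a serializer in the original leaves the copy untouched, whereas {{shallowDuplicate()}} reproduces the sharing described above.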





[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-27 Thread Christopher Ng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415826#comment-16415826
 ] 

Christopher Ng commented on FLINK-8836:
---

[~StephanEwen] I asked on the Kryo mailing list and the answer (possibly not
authoritative) was that given that Kryo itself is not intended to be 
thread-safe, we probably shouldn't assume that implementations of 
com.esotericsoftware.kryo.Serializer are thread-safe.

I'd suggest we should actually assume the opposite.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the
> {{defaultSerializers}} serializer instances are handed directly to the new
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared between two
> different {{KryoSerializer}} instances, so the copy is not a correct duplicate.





[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-26 Thread Christopher Ng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413781#comment-16413781
 ] 

Christopher Ng commented on FLINK-8836:
---

I don't think they are thread-safe: 
https://github.com/EsotericSoftware/kryo#threading
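
The Kryo README section linked above advises that each thread use its own Kryo instances. A common way to do that in Java is {{ThreadLocal.withInitial}}; the sketch below wraps a hypothetical buffer-reusing serializer ({{BufferingSerializer}}, not a Kryo class) so that each thread lazily gets its own copy. {{PerThreadSerializer}} and its demo helper are likewise illustrative names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical non-thread-safe serializer: it reuses one internal buffer across calls.
final class BufferingSerializer {
    private final StringBuilder buffer = new StringBuilder();

    String serialize(Object value) {
        buffer.setLength(0); // reusing mutable state is what makes sharing unsafe
        buffer.append(value.getClass().getSimpleName()).append(':').append(value);
        return buffer.toString();
    }
}

// Gives every thread its own serializer instance, created lazily on first use.
final class PerThreadSerializer {
    private final ThreadLocal<BufferingSerializer> local;

    PerThreadSerializer(Supplier<BufferingSerializer> factory) {
        this.local = ThreadLocal.withInitial(factory);
    }

    BufferingSerializer get() {
        return local.get();
    }

    // Demo helper: how many distinct instances do `threads` worker threads observe?
    static int distinctInstances(PerThreadSerializer perThread, int threads) {
        // Identity-based set, since BufferingSerializer does not override equals/hashCode.
        Set<BufferingSerializer> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> seen.add(perThread.get()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }
}
```

Repeated {{get()}} calls on one thread return the same object, while {{distinctInstances(p, 4)}} returns 4 because each worker thread receives its own instance.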

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the
> {{defaultSerializers}} serializer instances are handed directly to the new
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared between two
> different {{KryoSerializer}} instances, so the copy is not a correct duplicate.


