[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019140#comment-16019140 ]

Tzu-Li (Gordon) Tai commented on FLINK-5898:
--------------------------------------------

Let me think a bit about how to proceed with this. I will keep you updated.

> Race-Condition with Amazon Kinesis KPL
> --------------------------------------
>
>              Key: FLINK-5898
>              URL: https://issues.apache.org/jira/browse/FLINK-5898
>          Project: Flink
>       Issue Type: Bug
>       Components: Kinesis Connector
> Affects Versions: 1.2.0
>         Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in `/tmp` will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory /tmp/amazon-kinesis-producer-native-binaries
>   at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 is not what it's expected to be.
>   at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019139#comment-16019139 ]

Tzu-Li (Gordon) Tai commented on FLINK-5898:
--------------------------------------------

[~skidder] thanks a lot for testing that. The broken checkpoint makes sense.

In general, I think we should avoid directly serializing these third-party 
classes in our checkpoints, to ease compatibility paths for savepoints.
I propose to wait for Flink 1.4 with State Migration to allow a smoother path 
for this, instead of adding yet another hardcoded migration path to the 
codebase (those have started to pile up and, IMO, are polluting the main code 
a bit).
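
As a rough illustration of keeping third-party classes out of checkpointed state, the sketch below uses a connector-owned type that copies only the fields it needs. The class name `ShardHandle` and its fields are hypothetical and are not Flink's actual state classes; the point is only that the serialized form is then controlled by the connector rather than by the AWS SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical connector-owned state type; the field names are illustrative only.
final class ShardHandle implements Serializable {
    // Declared explicitly, so the value is owned by the connector and not by the SDK.
    private static final long serialVersionUID = 1L;

    final String streamName;
    final String shardId;

    ShardHandle(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }

    // Serializes the handle with plain Java serialization.
    static byte[] toBytes(ShardHandle handle) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        return bytes.toByteArray();
    }

    // Restores a handle; no SDK class needs to be on the classpath for this.
    static ShardHandle fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (ShardHandle) in.readObject();
        }
    }
}
```

With a type like this, an SDK upgrade that changes its own model classes would not by itself invalidate previously written state.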



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017653#comment-16017653 ]

Scott Kidder commented on FLINK-5898:
-------------------------------------

I created a new build of Flink that uses KPL {{0.12.4}} and AWS SDK 
{{1.11.128}}. I had a job that was unable to restore from an earlier checkpoint 
made with my patched KPL {{0.12.3}} and AWS SDK {{1.11.86}}:

{noformat}
java.io.InvalidClassException: com.amazonaws.services.kinesis.model.Shard; local class incompatible: stream classdesc serialVersionUID = 206186249602915, local class serialVersionUID = 5010840014163691006
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at java.util.HashMap.readObject(HashMap.java:1402)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:307)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:166)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreStreamCheckpointed(AbstractStreamOperator.java:240)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:247)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
{noformat}

So, there are incompatible changes in the Kinesis {{Shard}} class included in 
the AWS SDK release referenced directly by KPL {{0.12.4}}. Just something to be 
aware of when upgrading the KPL.
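
Java serialization compares the `serialVersionUID` recorded in the stream with the one of the class on the local classpath, which is the mismatch reported in the trace above. One way to check this before an upgrade might be to print the UID of the class under each SDK version and compare the values. The sketch below shows the lookup with a hypothetical stand-in class, `PinnedShard`; to inspect the real class, one would pass `com.amazonaws.services.kinesis.model.Shard.class` with the SDK on the classpath.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialUidCheck {
    // Hypothetical stand-in for a third-party model class.
    static final class PinnedShard implements Serializable {
        // An explicit value stays the same when the class is recompiled or gains fields.
        private static final long serialVersionUID = 1L;
        String shardId;
    }

    // Returns the serialVersionUID that Java serialization would use for the given class.
    static long uidOf(Class<?> type) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(type);
        if (desc == null) {
            throw new IllegalArgumentException(type + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(uidOf(PinnedShard.class)); // prints 1
    }
}
```

If the two SDK versions report different values for the same class, state serialized with one cannot be read back with the other through plain Java serialization.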


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017473#comment-16017473 ]

Scott Kidder commented on FLINK-5898:
-------------------------------------

Hi Gordon! I think this issue and FLINK-5946 warrant upgrading the default KPL 
dependency to 0.12.4.

KPL 0.12.4 also changes the AWS SDK core library dependency. In previous 
versions of the KPL, this dependency was expressed as a range, but now it's 
pinned to a specific version: `1.11.128`.

I've been using `1.11.86` with my patched KPL, but would like to test 
`1.11.128` before suggesting we upgrade the KPL dependency to 0.12.4.

What do you think?



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017452#comment-16017452 ]

Tzu-Li (Gordon) Tai commented on FLINK-5898:
--------------------------------------------

Thanks for the update, Scott! Would it make sense to bump the AWS KPL version 
we're using by default because of this?



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-19 Thread Scott Kidder (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017444#comment-16017444 ]

Scott Kidder commented on FLINK-5898:
-------------------------------------

The fix for this issue is included in release `0.12.4` of the AWS KPL, released 
2 days ago (May 17, 2017). Anyone affected by this issue can use version 
`0.12.4` of the KPL. Marking this issue as closed.



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-15 Thread Scott Kidder (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011272#comment-16011272 ]

Scott Kidder commented on FLINK-5898:
-------------------------------------

FYI: the pull-request against the Kinesis Producer Library (KPL) was accepted 
and merged into the master branch of the KPL project repo. I imagine it'll be 
present in the next release of the KPL.



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-24 Thread Scott Kidder (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15883238#comment-15883238 ]

Scott Kidder commented on FLINK-5898:
-------------------------------------

Thanks Gordon & Rob, I've opened a pull-request against the KPL:
https://github.com/awslabs/amazon-kinesis-producer/pull/92

Testing with a 4-core Flink cluster (2 task-managers with 2 cores each) looks 
good; the separate lock-file prevents the race-condition that can occur when 
reading/writing the native binary. I'll update this issue if the situation 
changes or the KPL pull-request is accepted.
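
For readers unfamiliar with the technique, the lock-file idea can be sketched roughly as follows. This is a minimal illustration and not the code from the KPL pull request: the class `GuardedExtractor`, the method `extractOnce`, and the lock-file name `extract.lock` are hypothetical. It assumes that an exclusive `FileLock` on a separate file serializes extraction across JVM processes, while a static monitor serializes threads inside one JVM (file locks are held on behalf of the whole JVM, and `FileChannel.lock()` throws `OverlappingFileLockException` if the same JVM already holds an overlapping lock).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

// Hypothetical helper; not the KPL's actual implementation.
final class GuardedExtractor {
    // Guards threads within this JVM; the file lock guards against other processes.
    private static final Object IN_PROCESS_LOCK = new Object();

    static Path extractOnce(Path dir, String name, byte[] expected) throws IOException {
        Files.createDirectories(dir);
        Path lockFile = dir.resolve("extract.lock"); // assumed lock-file name
        Path target = dir.resolve(name);
        synchronized (IN_PROCESS_LOCK) {
            try (FileChannel channel = FileChannel.open(lockFile,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                 FileLock lock = channel.lock()) { // blocks while another process holds the lock
                // Re-check under the lock: another process may already have extracted the file.
                if (Files.exists(target)
                        && Arrays.equals(Files.readAllBytes(target), expected)) {
                    return target;
                }
                // Write to a temporary file first, then move it into place.
                Path tmp = Files.createTempFile(dir, name, ".tmp");
                Files.write(tmp, expected);
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
                return target;
            }
        }
    }
}
```

The lock is taken on a file other than the binary itself, so verifying or rewriting the binary never interferes with the lock.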



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-24 Thread Robert Metzger (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882319#comment-15882319 ]

Robert Metzger commented on FLINK-5898:
---------------------------------------

Thank you, Scott, for looking into this!
Fixing it in the KPL is probably the easiest option.

If that doesn't work, we could consider temporarily changing the 
"java.io.tmpdir" system property to include a random UUID.
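
A rough sketch of that fallback is shown below. The helper `ScopedTmpDir.withUniqueTmpDir` is hypothetical and not part of Flink. It assumes the library reads "java.io.tmpdir" while the supplied action runs, which would need to be verified for the KPL. Because system properties are JVM-wide, the swap is serialized with a lock and the original value is restored afterwards.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical helper illustrating the workaround; not part of Flink.
final class ScopedTmpDir {
    // System properties are global to the JVM, so swaps must not interleave between threads.
    private static final Object LOCK = new Object();

    static <T> T withUniqueTmpDir(Supplier<T> action) throws IOException {
        synchronized (LOCK) {
            String original = System.getProperty("java.io.tmpdir");
            Path unique = Paths.get(original, UUID.randomUUID().toString());
            Files.createDirectories(unique);
            System.setProperty("java.io.tmpdir", unique.toString());
            try {
                return action.get(); // e.g. construct the producer here
            } finally {
                // Always restore the original value, even if the action throws.
                System.setProperty("java.io.tmpdir", original);
            }
        }
    }
}
```

Each call leaves behind its own sub-directory under the original temporary directory, so some cleanup would be needed in practice.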



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881976#comment-15881976
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


That's great! Thanks a lot for the efforts and please keep us posted :-)



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881967#comment-15881967
 ] 

Scott Kidder commented on FLINK-5898:
-

Hi [~tzulitai],

I'll look into fixing this in the KPL. I noticed that the method that installs 
the KPL binary uses a shared lock, which would allow multiple processes to 
obtain overlapping locks and write to the same file simultaneously:
https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java#L815

I'll try patching the KPL to obtain an exclusive lock. I'll also file a GitHub 
issue against the KPL to see what the KPL authors think.
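
To illustrate the difference, here is a minimal sketch (not the KPL's actual 
code) of an extraction step guarded by an exclusive lock. The class name 
ExclusiveExtract, the method writeIfDifferent, and the use of a separate lock 
file are all hypothetical and chosen only for illustration. Passing 
shared = false to FileChannel.lock requests an exclusive lock, so only one 
cooperating process at a time can enter the guarded section. Because file locks 
are held on behalf of the whole JVM, the method is also synchronized to 
serialize threads that share this class within one JVM.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

// Hypothetical helper, for illustration only; not part of the KPL or Flink.
final class ExclusiveExtract {

    /**
     * Writes payload to target unless target already holds exactly these bytes.
     * Returns true if this call wrote the file, false if it was already in place.
     */
    static synchronized boolean writeIfDifferent(Path lockFile, Path target, byte[] payload)
            throws IOException {
        try (FileChannel channel = FileChannel.open(
                     lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             // shared = false requests an exclusive lock; other processes that
             // take the same lock block here until it is released.
             FileLock lock = channel.lock(0L, Long.MAX_VALUE, false)) {
            if (Files.exists(target) && Arrays.equals(Files.readAllBytes(target), payload)) {
                return false; // another process already extracted an identical file
            }
            Files.write(target, payload); // creates or truncates, then writes
            return true;
        }
    }
}
```

With this shape, a second caller that finds matching contents returns false 
without rewriting the file, so concurrent starters cannot interleave their 
writes as long as they all take the same lock.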



[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks, and there's no means to coordinate multiple subtasks to 
synchronize this access either.

I'm not sure how we should deal with this one ...
It does, however, again raise the question of whether we should use the 
low-level Java SDK instead of the KPL to implement 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?
