[jira] [Created] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes

2017-12-15 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8270:
--

 Summary: TaskManagers do not use correct local path for shipped 
Keytab files in Yarn deployment modes
 Key: FLINK-8270
 URL: https://issues.apache.org/jira/browse/FLINK-8270
 Project: Flink
  Issue Type: Bug
  Components: Security
Affects Versions: 1.4.0
Reporter: Tzu-Li (Gordon) Tai
Priority: Blocker
 Fix For: 1.5.0, 1.4.1


Reported on ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html

This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are 
again not using the correct local paths for shipped Keytab files.

The breakage was introduced accidentally by this change: 
https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e.

Things to consider:
1) The above accidental breaking change was actually part of a minor refactoring 
of the "integration test scenario" code block in {{YarnTaskManagerRunner}}. It 
would be best if we could remove that test case code block from the main code.
2) Unit test coverage is apparently not sufficient. As this incident shows, even 
slight changes can easily cause this issue to resurface; see the test sketch below.
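
To make the second point concrete, a regression test could pin down the resolved 
local keytab path. A minimal sketch, assuming a hypothetical helper 
{{resolveLocalKeytabPath}} were extracted from {{YarnTaskManagerRunner}} for 
testability (the shipped resource name "krb5.keytab" is also an assumption):

{code:java}
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class YarnKeytabPathTest {

	@Test
	public void shippedKeytabMustUseContainerLocalPath() {
		// The TM must read the keytab from the container-local resource,
		// not from the path that was valid on the client machine.
		String localPath = resolveLocalKeytabPath(
			"/home/user/client.keytab", // client-side path from the config
			"krb5.keytab");             // name of the shipped local resource
		assertEquals("krb5.keytab", localPath);
	}

	// Hypothetical helper mirroring the intended behavior: prefer the
	// container-local name whenever a keytab was shipped.
	private static String resolveLocalKeytabPath(String clientPath, String shippedName) {
		return shippedName != null ? shippedName : clientPath;
	}
}
{code}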





[jira] [Commented] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Dyana Rose (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293231#comment-16293231
 ] 

Dyana Rose commented on FLINK-8267:
---

A push towards the native keys in the docs would simplify things greatly and 
make it much easier for people who already use the KCL and KPL in other 
applications.

I'm not too proud to say I spent way too much time today navigating through the 
layers of config abstraction to try to understand how the KCL and KPL were 
attempting to share settings.
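
For illustration, a producer configured purely with KPL-native keys could look 
like the sketch below. It assumes the connector keeps forwarding the properties 
to KinesisProducerConfiguration.fromProperties(...) as it does today; "RateLimit" 
is just an example of another native key, and credential settings are omitted:

{code:java}
Properties producerConfig = new Properties();
// KPL-native keys, forwarded verbatim to the Kinesis Producer Library
producerConfig.put("Region", "us-east-1");
producerConfig.put("RateLimit", "100");

FlinkKinesisProducer<String> kinesis =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
{code}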



> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like this:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL then also assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the very least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states:
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}





[jira] [Commented] (FLINK-7736) Fix some of the alerts raised by lgtm.com

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293108#comment-16293108
 ] 

ASF GitHub Bot commented on FLINK-7736:
---

Github user 1m2c3t4 commented on the issue:

https://github.com/apache/flink/pull/4784
  
Rebased again and fixed another obvious bug


> Fix some of the alerts raised by lgtm.com
> -
>
> Key: FLINK-7736
> URL: https://issues.apache.org/jira/browse/FLINK-7736
> Project: Flink
>  Issue Type: Improvement
>Reporter: Malcolm Taylor
>Assignee: Malcolm Taylor
>
> lgtm.com has identified a number of issues giving scope for improvement in 
> the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list]
> This issue is to address some of the simpler ones. Some of these are quite 
> clear bugs such as off-by-one errors. Others are areas where the code might 
> be made clearer, such as use of a variable name which shadows another 
> variable.





[GitHub] flink issue #4784: FLINK-7736: fix some lgtm.com alerts

2017-12-15 Thread 1m2c3t4
Github user 1m2c3t4 commented on the issue:

https://github.com/apache/flink/pull/4784
  
Rebased again and fixed another obvious bug


---


[jira] [Assigned] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-8267:
---

Assignee: Bowen Li

> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like this:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL then also assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the very least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states:
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}





[jira] [Commented] (FLINK-8249) Kinesis Producer didn't configure region

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293087#comment-16293087
 ] 

ASF GitHub Bot commented on FLINK-8249:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5160
  
@eskabetxe Thanks for fixing this

A further effort is https://issues.apache.org/jira/browse/FLINK-8267   


> Kinesis Producer didn't configure region
> ---
>
> Key: FLINK-8249
> URL: https://issues.apache.org/jira/browse/FLINK-8249
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Joao Boto
> Fix For: 1.5.0, 1.4.1
>
>
> Hi,
> setting these configurations on the FlinkKinesisProducer:
> {code}
> properties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
> properties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
> properties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
> {code}
> is throwing this error:
> {code}
> 17/12/13 10:50:11 ERROR LogInputStreamReader: [2017-12-13 10:50:11.290786] 
> [0x57ba][0x7f31cbce5780] [error] [main.cc:266] Could not configure 
> the region. It was not given in the config and we were unable to retrieve it 
> from EC2 metadata.
> 17/12/13 10:50:12 ERROR KinesisProducer: Error in child process
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.IrrecoverableError:
>  Child process exited with code 1
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:525)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:497)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:475)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700441] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: ERROR](CurlHttpClient)Curl 
> returned error code 28
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700521] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: 
> ERROR](EC2MetadataClient)Http request to Ec2MetadataService failed.
> {code}
> After some investigation: the region is never set, and I think this is the 
> reason:
> in this commit: 
> https://github.com/apache/flink/commit/9ed5d9a180dcd871e33bf8982434e3afd90ed295#diff-f3c6c35f3b045df8408b310f8f8a6bc7
> {code}
> - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
> - producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> + // check and pass the configuration properties
> + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
>   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
> {code}
> this line was removed
> {code}
> producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> {code}
> cc [~tzulitai], [~phoenixjiangnan]





[jira] [Comment Edited] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293017#comment-16293017
 ] 

Bowen Li edited comment on FLINK-8267 at 12/15/17 7:39 PM:
---

This is a great point. According to a previous discussion between [~tzulitai] 
and me, we want to remove AWSConfigConstants in 2.0 because it basically 
copies/translates config keys of the KPL/KCL, which doesn't add much value.

I believe we should update the Flink examples to guide users to start using 
KPL/KCL native keys. [~tzulitai] What do you think?


was (Author: phoenixjiangnan):
This is a great point. According to a previous discussion between [~tzulitai] 
and me, we want to remove AWSConfigConstants in 2.0 because it basically 
copies/translates config keys of the KPL/KCL, which doesn't add much value.

I believe we should update the Flink examples to guide users to use KPL/KCL 
native keys. [~tzulitai] What do you think?

> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like this:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL then also assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the very least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states:
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}





[GitHub] flink issue #5160: [FLINK-8249] [KinesisConnector] [hotfix] aws region is ne...

2017-12-15 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5160
  
@eskabetxe Thanks for fixing this

A further effort is https://issues.apache.org/jira/browse/FLINK-8267   


---


[GitHub] flink issue #5162: [FLINK-8250][runtime] Remove unused RecordSerializer#inst...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5162
  
Seems reasonable...


---


[jira] [Commented] (FLINK-8249) Kinesis Producer didn't configure region

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293073#comment-16293073
 ] 

ASF GitHub Bot commented on FLINK-8249:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5160
  
Minor comment: The commit should not have the [hotfix] tag


> Kinesis Producer didn't configure region
> ---
>
> Key: FLINK-8249
> URL: https://issues.apache.org/jira/browse/FLINK-8249
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Joao Boto
> Fix For: 1.5.0, 1.4.1
>
>
> Hi,
> setting these configurations on the FlinkKinesisProducer:
> {code}
> properties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
> properties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
> properties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
> {code}
> is throwing this error:
> {code}
> 17/12/13 10:50:11 ERROR LogInputStreamReader: [2017-12-13 10:50:11.290786] 
> [0x57ba][0x7f31cbce5780] [error] [main.cc:266] Could not configure 
> the region. It was not given in the config and we were unable to retrieve it 
> from EC2 metadata.
> 17/12/13 10:50:12 ERROR KinesisProducer: Error in child process
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.IrrecoverableError:
>  Child process exited with code 1
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:525)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:497)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:475)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700441] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: ERROR](CurlHttpClient)Curl 
> returned error code 28
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700521] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: 
> ERROR](EC2MetadataClient)Http request to Ec2MetadataService failed.
> {code}
> After some investigation: the region is never set, and I think this is the 
> reason:
> in this commit: 
> https://github.com/apache/flink/commit/9ed5d9a180dcd871e33bf8982434e3afd90ed295#diff-f3c6c35f3b045df8408b310f8f8a6bc7
> {code}
> - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
> - producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> + // check and pass the configuration properties
> + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
>   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
> {code}
> this line was removed
> {code}
> producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> {code}
> cc [~tzulitai], [~phoenixjiangnan]





[GitHub] flink issue #5164: [hotfix][javadoc] fix typo in StreamExecutionEnvironment ...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5164
  
Good catch, +1 to merge


---


[jira] [Commented] (FLINK-8250) Remove RecordSerializer#instantiateMetrics

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293074#comment-16293074
 ] 

ASF GitHub Bot commented on FLINK-8250:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5162
  
Seems reasonable...


> Remove RecordSerializer#instantiateMetrics
> --
>
> Key: FLINK-8250
> URL: https://issues.apache.org/jira/browse/FLINK-8250
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Type Serialization System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> {{RecordSerializer#instantiateMetrics}} is unused and has an empty 
> implementation. It wasn't cleaned up when we reworked how we measure IO 
> metrics.





[GitHub] flink issue #5160: [FLINK-8249] [KinesisConnector] [hotfix] aws region is ne...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5160
  
Minor comment: The commit should not have the [hotfix] tag


---


[jira] [Commented] (FLINK-8249) Kinesis Producer didn't configure region

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293072#comment-16293072
 ] 

ASF GitHub Bot commented on FLINK-8249:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5160#discussion_r157281403
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -191,6 +191,7 @@ public static KinesisProducerConfiguration getValidatedProducerConfiguration(Pro
	}

	KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
+	kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
--- End diff --

Does this work if the region property is `null`?
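
If not, a defensive variant could guard the call; a sketch (whether the KPL 
actually rejects a null region argument here is an assumption to verify):

{code:java}
String region = config.getProperty(AWSConfigConstants.AWS_REGION);
if (region != null) {
	// only override the region if it was explicitly configured
	kpc.setRegion(region);
}
{code}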


> Kinesis Producer didn't configure region
> ---
>
> Key: FLINK-8249
> URL: https://issues.apache.org/jira/browse/FLINK-8249
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Joao Boto
> Fix For: 1.5.0, 1.4.1
>
>
> Hi,
> setting these configurations on the FlinkKinesisProducer:
> {code}
> properties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
> properties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
> properties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
> {code}
> is throwing this error:
> {code}
> 17/12/13 10:50:11 ERROR LogInputStreamReader: [2017-12-13 10:50:11.290786] 
> [0x57ba][0x7f31cbce5780] [error] [main.cc:266] Could not configure 
> the region. It was not given in the config and we were unable to retrieve it 
> from EC2 metadata.
> 17/12/13 10:50:12 ERROR KinesisProducer: Error in child process
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.IrrecoverableError:
>  Child process exited with code 1
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:525)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:497)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:475)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700441] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: ERROR](CurlHttpClient)Curl 
> returned error code 28
> 17/12/13 10:50:15 ERROR LogInputStreamReader: [2017-12-13 10:50:15.700521] 
> [0x57c4][0x7ffb152b5780] [error] [AWS Log: 
> ERROR](EC2MetadataClient)Http request to Ec2MetadataService failed.
> {code}
> After some investigation: the region is never set, and I think this is the 
> reason:
> in this commit: 
> https://github.com/apache/flink/commit/9ed5d9a180dcd871e33bf8982434e3afd90ed295#diff-f3c6c35f3b045df8408b310f8f8a6bc7
> {code}
> - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
> - producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> + // check and pass the configuration properties
> + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
>   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
> {code}
> this line was removed
> {code}
> producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
> {code}
> cc [~tzulitai], [~phoenixjiangnan]





[GitHub] flink pull request #5160: [FLINK-8249] [KinesisConnector] [hotfix] aws regio...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5160#discussion_r157281403
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -191,6 +191,7 @@ public static KinesisProducerConfiguration getValidatedProducerConfiguration(Pro
	}

	KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
+	kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
--- End diff --

Does this work if the region property is `null`?


---


[jira] [Commented] (FLINK-8079) Skip remaining E2E tests if one failed

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293069#comment-16293069
 ] 

ASF GitHub Bot commented on FLINK-8079:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5156
  
I like the change, +1

Minor: Is there a way to write `if code != 0 return` in bash? Might be 
easier to read...


> Skip remaining E2E tests if one failed
> --
>
> Key: FLINK-8079
> URL: https://issues.apache.org/jira/browse/FLINK-8079
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0, 1.4.1
>
>
> I propose that if one end-to-end test fails, the remaining tests are skipped.
> [~aljoscha] What do you think?





[GitHub] flink issue #5156: [FLINK-8079][tests] Stop end-to-end test execution after ...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5156
  
I like the change, +1

Minor: Is there a way to write `if code != 0 return` in bash? Might be 
easier to read...


---


[jira] [Commented] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293066#comment-16293066
 ] 

ASF GitHub Bot commented on FLINK-8227:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5142
  
@kl0u or @dawidwys I think this could use your opinion.

General questions:
  - Is there any reason to use a boxed integer over a primitive one? 
Primitives should be the default choice.
  - Is it correct that the id is transient?


> Optimize the performance of SharedBufferSerializer
> --
>
> Key: FLINK-8227
> URL: https://issues.apache.org/jira/browse/FLINK-8227
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and 
> put all the {{SharedBufferEntry}} into it. Usually this is not a problem. But 
> we observe that in some cases the calculation of hashCode may become the 
> bottleneck. The performance will decrease as the number of 
> {{SharedBufferEdge}} increases. For looping pattern {{A*}}, if the number of 
> {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about 
> {{N * N}}.
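
If identity of the entries is sufficient during serialization (an assumption), 
the quadratic hashing cost could be avoided by assigning primitive int ids in a 
single pass, e.g. via an {{IdentityHashMap}}; a sketch:

{code:java}
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

class EntryIdAssigner {
	// Assigns each entry a primitive int id in one pass. Identity-based
	// lookup sidesteps the deep hashCode computation, whose cost grows with
	// the number of edges an entry references.
	static <E> Map<E, Integer> assignIds(List<E> entries) {
		Map<E, Integer> ids = new IdentityHashMap<>();
		for (E entry : entries) {
			ids.put(entry, ids.size());
		}
		return ids;
	}
}
{code}

Edges could then be serialized as the int ids of their source and target 
entries instead of the entries themselves.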





[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5142
  
@kl0u or @dawidwys I think this could use your opinion.

General questions:
  - Is there any reason to use a boxed integer over a primitive one? 
Primitives should be the default choice.
  - Is it correct that the id is transient?


---


[jira] [Commented] (FLINK-8217) Properly annotate APIs of flink-connector-kinesis

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293062#comment-16293062
 ] 

ASF GitHub Bot commented on FLINK-8217:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5138
  
I agree that we should be careful with `@Public` and if in doubt resort to 
`@PublicEvolving` (and use `@Internal` sort of as the default).

Given that @tzulitai did a good amount of the Kinesis work, his buy-in for 
`@Public` classes would be good.
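
For reference, this is roughly how the markers are applied; the class names 
below are made up for illustration:

{code:java}
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

// User-facing, but the API may still change between releases.
@PublicEvolving
public class ExampleConnectorSink { /* ... */ }

// Implementation detail: no stability guarantees at all.
@Internal
class ExampleConnectorBridge { /* ... */ }
{code}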


> Properly annotate APIs of flink-connector-kinesis
> -
>
> Key: FLINK-8217
> URL: https://issues.apache.org/jira/browse/FLINK-8217
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5138: [FLINK-8217] [Kinesis connector] Properly annotate APIs o...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5138
  
I agree that we should be careful with `@Public` and if in doubt resort to 
`@PublicEvolving` (and use `@Internal` sort of as the default).

Given that @tzulitai did a good amount of the Kinesis work, his buy-in for 
`@Public` classes would be good.


---


[jira] [Commented] (FLINK-8223) Update Hadoop versions

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293057#comment-16293057
 ] 

ASF GitHub Bot commented on FLINK-8223:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5137
  
Looks good.

Related: Hadoop 3.0 just got released, wondering when we need to add 
support for that...


> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
> http://hadoop.apache.org/releases.html





[GitHub] flink issue #5137: [FLINK-8223] [build] Update Hadoop versions

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5137
  
Looks good.

Related: Hadoop 3.0 just got released, wondering when we need to add 
support for that...


---


[jira] [Commented] (FLINK-8222) Update Scala version

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293056#comment-16293056
 ] 

ASF GitHub Bot commented on FLINK-8222:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5136
  
Should be good.

Now and then I have seen warnings from the Scala compiler about Scala 
dependencies requiring different minor versions. It's only a warning, but I am 
wondering if there could be compatibility issues?

I would go ahead and merge this for 1.5 soon to give it exposure.
Not sure if I would merge it into 1.4.1 because of the above. What do you 
think?


> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html





[GitHub] flink issue #5136: [FLINK-8222] [build] Update Scala version

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5136
  
Should be good.

Now and then I have seen warnings from the Scala compiler about Scala 
dependencies requiring different minor versions. It's only a warning, but I am 
wondering if there could be compatibility issues?

I would go ahead and merge this for 1.5 soon to give it exposure.
Not sure if I would merge it into 1.4.1 because of the above. What do you 
think?


---


[GitHub] flink issue #5135: [hotfix] [doc] Fix typo in TaskManager and EnvironmentInf...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5135
  
Thanks @greghogan for the review, looks good to merge...


---


[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5133
  
I think this is good, except for the removal of 
`StandaloneHaServices#RESOURCE_MANAGER_RPC_ENDPOINT_NAME`, where I am unsure 
whether some of the pending FLIP-6 work has plans for that field. I would 
suggest keeping it.


---


[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293052#comment-16293052
 ] 

ASF GitHub Bot commented on FLINK-5506:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5126
  
Just a comment on the use of `TypeHint`:

In the `DataSet` and `DataStream` API, we typically pass `TypeInformation` 
when offering an explicit way to describe the type. The reason is that 
`TypeInformation` is the core class to describe types, and when you pick up the 
type from somewhere to pass it on, you get a `TypeInformation`.

The `TypeHint` class is used as a trick to create a `TypeInformation` by 
capturing the generic parameters through an anonymous subclass. In principle, 
one could always use `new TypeHint<...>(){}.getTypeInfo()`, but for 
convenience some methods accept a `TypeHint` directly, mainly the 
`returns(...)` method.

If you take a `TypeHint` directly as a parameter, it becomes hard to pass the 
type if you, for example, obtain it from another `DataSet` via the 
`getType()` method.
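
Concretely, an API that accepts `TypeInformation` stays composable while a 
`TypeHint` still keeps call sites convenient. A sketch (`runAlgorithm` is a 
made-up method, not Flink API):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeInfoParameterExample {

	// Made-up method for illustration: accepts TypeInformation, not TypeHint.
	static <T> void runAlgorithm(TypeInformation<T> typeInfo) { /* ... */ }

	public static void main(String[] args) {
		// Convenient call site via a TypeHint...
		runAlgorithm(TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
		// ...and a type picked up elsewhere can be forwarded as-is,
		// e.g. runAlgorithm(otherDataSet.getType());
	}
}
{code}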


> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1	id2	score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (disconnected between 
> themselves).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // fields are separated by tabs
>     .ignoreComments("#")  // comments start with "#"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new
>     org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount,
>     hopAttenuationDelta)).getVertices();
> vs.print();
> {code}
> Running this code throws the following exception (note the NullPointerException in the cause):
> {code}
> ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at 
> 

[GitHub] flink issue #5126: [FLINK-5506] [gelly] Fix CommunityDetection NullPointerEx...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5126
  
Just a comment on the use of `TypeHint`:

In the `DataSet` and `DataStream` API, we typically pass `TypeInformation` 
when offering an explicit way to describe the type. The reason is that 
`TypeInformation` is the core class to describe types, and when you pick up the 
type from somewhere to pass it on, you get a `TypeInformation`.

The `TypeHint` class is used as a trick to create a `TypeInformation` by 
capturing the generic parameters through an anonymous subclass. In principle, 
one could always use `new TypeHint<...>(){}.getTypeInfo()`, but for 
convenience some methods accept a `TypeHint` directly, mainly the 
`returns(...)` method.

If you take a `TypeHint` directly as a parameter, it becomes hard to pass the 
type if you, for example, obtain it from another `DataSet` via the 
`getType()` method.


---


[jira] [Commented] (FLINK-8199) Annotation for Elasticsearch connector

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293046#comment-16293046
 ] 

ASF GitHub Bot commented on FLINK-8199:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5124
  
@tzulitai you are one of the main contributors here, I think your input 
would be valuable.

A result could also be that this is not yet ready to be divided into 
"stable public", "evolving public", and "internal".


> Annotation for Elasticsearch connector
> --
>
> Key: FLINK-8199
> URL: https://issues.apache.org/jira/browse/FLINK-8199
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5124: [FLINK-8199] [elasticsearch connector] Properly annotate ...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5124
  
@tzulitai you are one of the main contributors here, I think your input 
would be valuable.

A result could also be that this is not yet ready to be divided into 
"stable public", "evolving public", and "internal".


---


[jira] [Commented] (FLINK-8199) Annotation for Elasticsearch connector

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293045#comment-16293045
 ] 

ASF GitHub Bot commented on FLINK-8199:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5124
  
I think the intent here is good, but every time some class is annotated 
with `@Public`, that is a commitment to keep the API stable.

Before doing that, the community members that mainly authored or maintain 
that code should chime in and agree on which classes should actually be 
`@Public` and which ones `@PublicEvolving`.


> Annotation for Elasticsearch connector
> --
>
> Key: FLINK-8199
> URL: https://issues.apache.org/jira/browse/FLINK-8199
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5124: [FLINK-8199] [elasticsearch connector] Properly annotate ...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5124
  
I think the intent here is good, but every time some class is annotated 
with `@Public`, that is a commitment to keep the API stable.

Before doing that, the community members that mainly authored or maintain 
that code should chime in and agree on which classes should actually be 
`@Public` and which ones `@PublicEvolving`.


---


[GitHub] flink issue #5113: [FLINK-8156][build] Bump commons-beanutils version to 1.9...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5113
  
Good change, thank you!

Given that beanutils is not used directly by Flink, but only by Hadoop 
(which we try to depend on less and less), could you move the dependency 
management entry for that to the `flink-shaded-hadoop` parent project?


---


[jira] [Commented] (FLINK-8156) Bump commons-beanutils version to 1.9.3

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293040#comment-16293040
 ] 

ASF GitHub Bot commented on FLINK-8156:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5113
  
Good change, thank you!

Given that beanutils is not used directly by Flink, but only by Hadoop 
(which we try to depend on less and less), could you move the dependency 
management entry for that to the `flink-shaded-hadoop` parent project?


> Bump commons-beanutils version to 1.9.3
> ---
>
> Key: FLINK-8156
> URL: https://issues.apache.org/jira/browse/FLINK-8156
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Commons-beanutils v1.8.0 dependency is not security compliant. See 
> [CVE-2014-0114|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2014-0114]:
> {code:java}
> Apache Commons BeanUtils, as distributed in lib/commons-beanutils-1.8.0.jar 
> in Apache Struts 1.x through 1.3.10 and in other products requiring 
> commons-beanutils through 1.9.2, does not suppress the class property, which 
> allows remote attackers to "manipulate" the ClassLoader and execute arbitrary 
> code via the class parameter, as demonstrated by the passing of this 
> parameter to the getClass method of the ActionForm object in Struts 1.
> {code}
> Note that the current version, commons-beanutils 1.9.2, in turn has a CVE in 
> its dependency commons-collections (CVE-2015-6420, see BEANUTILS-488), which 
> is fixed in 1.9.3.
> We should upgrade {{commons-beanutils}} from 1.8.3 to 1.9.3.





[GitHub] flink issue #5112: [FLINK-8175] [DataStream API java/scala] remove flink-str...

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5112
  
I would actually be in favor of dismantling the `flink-streaming-contrib` 
project.

We could have a dedicated package `experimental` in `flink-streaming-java` 
for such code. The package should have a `package-info.java` clearly stating 
that this code is not battle-hardened and may change in future versions. None 
of the classes should be `@Public`; at most they could be `@PublicEvolving`.
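
A minimal sketch of such a `package-info.java` (the package name is an 
assumption):

{code:java}
/**
 * Experimental code. Classes in this package are not battle-hardened and may
 * change or be removed in future versions without notice.
 */
package org.apache.flink.streaming.experimental;
{code}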


---


[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293032#comment-16293032
 ] 

ASF GitHub Bot commented on FLINK-8175:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5112
  
I would actually be in favor of dismantling the `flink-streaming-contrib` 
project.

We could have a dedicated package `experimental` in `flink-streaming-java` 
for such code. The package should have a `package-info.java` clearly stating 
that this code is not battle-hardened and may change in future versions. None 
of the classes should be `@Public`; at most they could be `@PublicEvolving`.


> remove flink-streaming-contrib and migrate its classes to 
> flink-streaming-java/scala
> 
>
> Key: FLINK-8175
> URL: https://issues.apache.org/jira/browse/FLINK-8175
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> I propose removing flink-streaming-contrib from flink-contrib, and migrating 
> its classes to flink-streaming-java/scala for the following reasons:
> - flink-streaming-contrib is so small that it only has 4 classes (3 java and 
> 1 scala), and they don't need a dedicated jar for Flink to distribute and 
> maintain it and for users to deal with the overhead of dependency management
> - the 4 classes in flink-streaming-contrib are logically more tied to 
> flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy. It contains lots of sub 
> modules with different purposes which confuse developers and users, and they 
> lack a proper project hierarchy
> To take a step even forward, I would argue that even flink-contrib should be 
> removed and all its submodules should be migrated to other top-level modules 
> for the following reasons: 1) Apache Flink the whole project itself is a 
> result of contributions from many developers, there's no reason to highlight 
> some contributions in a dedicated module named 'contrib' 2) flink-contrib 
> inherently doesn't have a good hierarchy to hold submodules





[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293025#comment-16293025
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5076
  
Is this merged already?

I think the idea is nice, but I would say let's merge this only if we have a 
commitment to follow up, meaning fixing the other modules.

We already have partially done dependency convergence work in the pom files, 
so I would vote to add another piece of partially done dependency analysis 
work only if there is a commitment to a timely follow-up and conclusion of 
this work (for example before the 1.5 release; feature freeze is targeted for 
end of January).

What do you think?


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]    org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]    com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]    org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]    org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]    org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]    com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]    log4j:log4j:jar:1.2.17:test
> [WARNING]    org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]    org.slf4j:slf4j-log4j12:jar:1.7.7:test





[GitHub] flink issue #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5076
  
Is this merged already?

I think the idea is nice, but I would say let's merge this only if we have a 
commitment to follow up, meaning fixing the other modules.

We already have partially done dependency convergence work in the pom files, 
so I would vote to add another piece of partially done dependency analysis 
work only if there is a commitment to a timely follow-up and conclusion of 
this work (for example before the 1.5 release; feature freeze is targeted for 
end of January).

What do you think?


---


[jira] [Commented] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293017#comment-16293017
 ] 

Bowen Li commented on FLINK-8267:
-

This is a great point. According to a previous discussion between [~tzulitai] 
and me, we want to remove AWSConfigConstants in 2.0 because it basically 
copies/translates config keys of the KPL/KCL, which doesn't add much value.

I believe we should update the Flink examples to guide users to use KPL/KCL 
native keys. [~tzulitai] What do you think?

> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like this:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL then also assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the very least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states:
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}





[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293018#comment-16293018
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r157271859
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+	<plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-dependency-plugin</artifactId>
+		<version>3.0.2</version>
+		<executions>
+			<execution>
+				<id>analyze</id>
+				<goals>
+					<goal>analyze-only</goal>
+				</goals>
+			</execution>
+		</executions>
+		<configuration>
+			<!-- three options set to true here; the element names were lost
+			     in the archive (plausibly failOnWarning and related analyze
+			     flags, but that is an assumption) -->
+		</configuration>
+	</plugin>

--- End diff --

Can we keep the root dependencies? I find the current way actually 
convenient, and it avoids pom clutter.

As Chesnay mentioned, the compile-scope dependencies should be okay (they are 
used everywhere).

Could the test dependencies be skipped separately?


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]    org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]    com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]    org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]    org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]    org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]    com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]    log4j:log4j:jar:1.2.17:test
> [WARNING]    org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]    org.slf4j:slf4j-log4j12:jar:1.7.7:test





[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r157271859
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+	<plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-dependency-plugin</artifactId>
+		<version>3.0.2</version>
+		<executions>
+			<execution>
+				<id>analyze</id>
+				<goals>
+					<goal>analyze-only</goal>
+				</goals>
+			</execution>
+		</executions>
+		<configuration>
+			<!-- three options set to true here; the element names were lost
+			     in the archive (plausibly failOnWarning and related analyze
+			     flags, but that is an assumption) -->
+		</configuration>
+	</plugin>

--- End diff --

Can we keep the root dependencies? I find the current way actually 
convenient, and it avoids pom clutter.

As Chesnay mentioned, the compile-scope dependencies should be okay (they are 
used everywhere).

Could the test dependencies be skipped separately?


---


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293010#comment-16293010
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Sorry, actually, taking a step back: can you check which other dependencies also include snappy?
Especially the big packaged dependencies like `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. If they contain snappy, we should manage the dependency to the same version there; if two different versions get pulled in, the update may not have an effect.
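
One way to check that is a plain dependency-tree query (a sketch using stock Maven; the coordinates are snappy-java's):

{code}
mvn dependency:tree -Dincludes=org.xerial.snappy:snappy-java
{code}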


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older version has some issues, such as a memory leak (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Sorry, actually, taking a step back: Can you check in which dependencies 
snappy is included in addition?
Especially big packaged dependencies like `flink-s3-fs-presto` and 
`flink-s3-fs-hadoop`? In case they contain snappy, we should manage the 
dependency to the same version there. If two different versions get pulled in, 
the update may not have an effect.


---


[jira] [Created] (FLINK-8269) Set netRuntime in JobExecutionResult

2017-12-15 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8269:
---

 Summary: Set netRuntime in JobExecutionResult
 Key: FLINK-8269
 URL: https://issues.apache.org/jira/browse/FLINK-8269
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission
Affects Versions: 1.5.0
 Environment: 917fbcbee4599c1d198a4c63942fe1d2762aa64a
Reporter: Gary Yao
Priority: Blocker


In FLIP-6 mode, the {{JobMaster}} does not correctly set the field 
{{netRuntime}} on the {{JobExecutionResult}} when the job status transitions to 
{{_FINISHED_}}.

Find the code in question below:
{code}
case FINISHED:
	try {
		// TODO get correct job duration
		// job done, let's get the accumulators
		Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
		JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);

		executor.execute(() -> jobCompletionActions.jobFinished(result));
	}
{code}
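
One possible shape of the fix, as a sketch (it assumes the {{ExecutionGraph}} exposes per-status timestamps via {{getStatusTimestamp(JobStatus)}}; this is an illustration, not the actual patch):

{code:java}
// Sketch: derive netRuntime from the execution graph's status timestamps
// instead of hard-coding 0L.
final long created = executionGraph.getStatusTimestamp(JobStatus.CREATED);
final long finished = executionGraph.getStatusTimestamp(JobStatus.FINISHED);
final long netRuntime = Math.max(0L, finished - created);

JobExecutionResult result = new JobExecutionResult(jobID, netRuntime, accumulatorResults);
{code}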



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8233) Retrieve ExecutionResult by REST polling

2017-12-15 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8233:
---

Assignee: Gary Yao

> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292632#comment-16292632
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Merging this...


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older version has some issues, such as a memory leak (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Merging this...


---


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292628#comment-16292628
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Bumping the version should be fine. We only need to make sure that, wherever it is pulled in, we promote it to a direct dependency so that dependency convergence works.
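
A sketch of what that pinning could look like in the root pom (standard Maven `dependencyManagement`; the placement is illustrative):

{code:xml}
<dependencyManagement>
	<dependencies>
		<!-- pin snappy to one version wherever it is pulled in transitively -->
		<dependency>
			<groupId>org.xerial.snappy</groupId>
			<artifactId>snappy-java</artifactId>
			<version>1.1.4</version>
		</dependency>
	</dependencies>
</dependencyManagement>
{code}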


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older version has some issues, such as a memory leak (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2017-12-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5072
  
Bumping the version should be fine. We only need to make sure that, wherever it is pulled in, we promote it to a direct dependency so that dependency convergence works.


---


[jira] [Created] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2017-12-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8268:
---

 Summary: Test instability for 'TwoPhaseCommitSinkFunctionTest'
 Key: FLINK-8268
 URL: https://issues.apache.org/jira/browse/FLINK-8268
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Stephan Ewen
Priority: Critical


The following exception / failure message occurs.
{code}
Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< FAILURE! - in org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)  Time elapsed: 0.068 sec  <<< ERROR!
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
at java.io.BufferedWriter.flush(BufferedWriter.java:254)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292615#comment-16292615
 ] 

ASF GitHub Bot commented on FLINK-8264:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5167


> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> Users hit a confusing experience when they accidentally package the Scala library into their jar file: the reversed (child-first) class loading duplicates Scala's classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}
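
Until a release with the fix is available, the same idea can be applied through configuration; a sketch (assuming the 1.4 key {{classloader.parent-first-patterns}}; the value shown is the default pattern list with {{scala.}} added and is illustrative):

{code}
# flink-conf.yaml
classloader.parent-first-patterns: java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback
{code}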



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5167: [FLINK-8264] [core] Add 'scala.' to the 'parent-fi...

2017-12-15 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5167


---


[jira] [Closed] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8264.
---

> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> Users hit a confusing experience when they accidentally package the Scala library into their jar file: the reversed (child-first) class loading duplicates Scala's classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8264.
-
Resolution: Fixed

Fixed in
  - 1.4.1 via 57de25d8950272fc6c6ac9ed400c79b0680f40e4
  - 1.5.0 via 28d9b20da6e0b79306ef3e406371da15392f

> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> Users hit a confusing experience when they accidentally package the Scala library into their jar file: the reversed (child-first) class loading duplicates Scala's classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292596#comment-16292596
 ] 

ASF GitHub Bot commented on FLINK-8264:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5166


> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> Users hit a confusing experience when they accidentally package the Scala library into their jar file: the reversed (child-first) class loading duplicates Scala's classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5166: [FLINK-8264] [core] Add 'scala.' to the 'parent-fi...

2017-12-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5166


---


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292515#comment-16292515
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157174669
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -188,10 +189,16 @@ public Buffer requestBuffer() throws IOException {
 
@Override
public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-   return requestBuffer(true);
+   BufferBuilder bufferBuilder = requestBufferBuilder(true);
+   return bufferBuilder != null ? bufferBuilder.build() : null;
--- End diff --

ditto, this path would require changing `BufferOrEvent` to 
`BufferBuilderOrEvent`.


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292508#comment-16292508
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156922694
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 ---
@@ -48,8 +47,7 @@
 
private static final IOManager IO_MANAGER = new IOManagerAsync();
 
-   private static final TestInfiniteBufferProvider writerBufferPool =
-   new TestInfiniteBufferProvider();
+   private static final TestPooledBufferProvider writerBufferPool = new TestPooledBufferProvider(Integer.MAX_VALUE);
--- End diff --

Good point. I have cleaned up/simplified this test a little bit more and dropped `TestPooledBufferProvider` altogether to avoid this issue.


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292509#comment-16292509
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156931533
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects records or events on the List.
+ */
+public class RecordOrEventCollectingResultPartitionWriter implements ResultPartitionWriter {
--- End diff --

Maybe; I was considering it, but I really didn't want to extend the scope of my "simple" refactor into infinity... Remember that all of those commits/changes are here not because I planned to fix all of those tests, but because they exploded in my face while doing some relatively simple change :(


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292517#comment-16292517
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157154019
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+   private final MemorySegment memorySegment;
+
+   private final BufferRecycler recycler;
+
+   private int position = 0;
+
+   private boolean built = false;
--- End diff --

We could, but what's the benefit of that? You would end up with ugly, cryptic checkstates: `checkState(position >= 0)` and the cryptic setting of `position = -1` after building the buffer. You could wrap them into helper functions like `checkState(!isBuilt())` and `markPositionBuilt()`, but I think that's uglier and harder to understand than the explicit check variable.
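
For comparison, a sketch of what the explicit flag looks like in `build()` (the method body is not part of this hunk, so this is an assumed shape, using the pre-1.5 `Buffer(MemorySegment, BufferRecycler)` constructor):

{code:java}
public Buffer build() {
	// the explicit flag makes the single-use contract obvious
	checkState(!built, "buffer was already built");
	built = true;

	Buffer buffer = new Buffer(memorySegment, recycler);
	buffer.setSize(position);
	return buffer;
}
{code}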


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292516#comment-16292516
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157178704
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ---
@@ -118,7 +122,7 @@ public Buffer answer(InvocationOnMock invocation) throws Throwable {
};
 
BufferProvider bufferProvider = mock(BufferProvider.class);
-   when(bufferProvider.requestBufferBlocking()).thenAnswer(request);
+   when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
--- End diff --

This would require a custom `BufferProvider` that would implement the `final CountDownLatch sync = new CountDownLatch(2);` logic (or expanding `TestPooledBufferProvider`) - this test relies on waiting until one more call to the `BufferProvider` happens after the pool is exhausted.


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292511#comment-16292511
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156929782
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.types.Record;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
--- End diff --

it's a specific, user-supplied list (`the` vs. `a`), and "on" seems better to me as well:

http://learnersdictionary.com/qa/is-it-correct-to-say-on-the-list-or-in-the-list
https://www.englishforums.com/English/OnTheListOrInTheList/wzblc/post.htm




> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292512#comment-16292512
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157172734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -179,7 +179,8 @@ public void setBufferPoolOwner(BufferPoolOwner owner) {
@Override
public Buffer requestBuffer() throws IOException {
try {
-   return requestBuffer(false);
+   BufferBuilder bufferBuilder = requestBufferBuilder(false);
+   return bufferBuilder != null ? bufferBuilder.build() : null;
--- End diff --

Nope, the size is always set manually in the remaining calls.

Yes, I know, but it has already taken me almost a week to adapt the tests for this `BufferBuilder` (and `requestBuffer` is mocked ~60 times...). Can we postpone finishing this refactor until later?


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292518#comment-16292518
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157154679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+   private final MemorySegment memorySegment;
+
+   private final BufferRecycler recycler;
+
+   private int position = 0;
+
+   private boolean built = false;
+
+   public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
+   this.memorySegment = checkNotNull(memorySegment);
+   this.recycler = checkNotNull(recycler);
+   }
+
+   public void append(ByteBuffer source) {
+   checkState(!built);
+
+   int needed = source.remaining();
+   int available = limit() - position;
+   int toCopy = Math.min(needed, available);
+
+   memorySegment.put(position, source, toCopy);
+   position += toCopy;
+   }
--- End diff --

I will add the test, but I would prefer to leave the `checkState`, since its purpose is to find bugs and provide an early warning about them.
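
For reference, a hypothetical shape of such a test (names and helpers assumed; it only asserts that `append()` copies no more than the segment can hold):

{code:java}
@Test
public void testAppendStopsAtSegmentCapacity() {
	BufferBuilder builder = new BufferBuilder(
		MemorySegmentFactory.allocateUnpooledSegment(8),
		FreeingBufferRecycler.INSTANCE);

	ByteBuffer source = ByteBuffer.wrap(new byte[16]);
	builder.append(source);

	// only the first 8 of the 16 bytes fit into the 8-byte segment
	assertEquals(8, source.position());
}
{code}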


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292513#comment-16292513
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156928873
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
 ---
@@ -104,11 +104,10 @@ public void testDestroyWhileBlockingRequest() throws Exception {
 * @return Flag indicating whether the Thread is in a blocking buffer
 * request or not
 */
-   private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
+   public static boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
if (stackTrace.length >= 3) {
return stackTrace[0].getMethodName().equals("wait") &&
-   stackTrace[1].getMethodName().equals("requestBuffer") &&
-   stackTrace[2].getMethodName().equals("requestBufferBlocking");
+   stackTrace[1].getClassName().equals(LocalBufferPool.class.getName());
--- End diff --

I do not like this test in the first place, but I couldn't come up with a better solution. This change at least makes it less implementation dependent. The same logic as in my "crusade" against mockito: if you put such a specific condition as before (blocking on `requestBuffer`) into one more test, you have to manually fix it in one more place when refactoring or adding features, which drastically increases the cost of maintaining the project and just doesn't scale with the project size :(

On the other hand, you can argue that if we add more waiting conditions after a refactor (if `LocalBufferPool` can block in two different places depending on some internal condition), a broader check like this might also be the better choice/condition.


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292519#comment-16292519
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156931262
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects records or events on the List.
--- End diff --

it's a specific, user-supplied list (`the` vs. `a`), and "on" seems better to me as well:

http://learnersdictionary.com/qa/is-it-correct-to-say-on-the-list-or-in-the-list
https://www.englishforums.com/English/OnTheListOrInTheList/wzblc/post.htm




> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292510#comment-16292510
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156923552
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
 ---
@@ -40,37 +39,31 @@
 
private final BufferRecycler bufferRecycler;
 
-   private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+   private final int poolSize;
 
-   public TestBufferFactory() {
-   this(BUFFER_SIZE, RECYCLER);
-   }
-
-   public TestBufferFactory(int bufferSize) {
-   this(bufferSize, RECYCLER);
-   }
+   private int numberOfCreatedBuffers = 0;
 
-   public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
+   public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler bufferRecycler) {
checkArgument(bufferSize > 0);
+   this.poolSize = poolSize;
this.bufferSize = bufferSize;
this.bufferRecycler = checkNotNull(bufferRecycler);
}
 
-   public Buffer create() {
-   numberOfCreatedBuffers.incrementAndGet();
+   public synchronized Buffer create() {
+   if (numberOfCreatedBuffers >= poolSize) {
+   return null;
+   }
 
+   numberOfCreatedBuffers++;
return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
}
 
-   public Buffer createFrom(MemorySegment segment) {
-   return new Buffer(segment, bufferRecycler);
-   }
-
-   public int getNumberOfCreatedBuffers() {
-   return numberOfCreatedBuffers.get();
+   public synchronized int getNumberOfCreatedBuffers() {
+   return numberOfCreatedBuffers;
}
 
-   public int getBufferSize() {
+   public synchronized int getBufferSize() {
--- End diff --

I would leave it, just for the sake of having all methods `synchronized`, so that you don't have to think about which ones are and which ones should be `synchronized` (when adding features or refactoring this class in the future).


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8178) Introduce not threadsafe write only BufferBuilder

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292514#comment-16292514
 ] 

ASF GitHub Bot commented on FLINK-8178:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156934340
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ---
@@ -169,33 +151,29 @@ public Buffer getCurrentBuffer() {
if (targetBuffer == null) {
return null;
}
-
-   targetBuffer.setSize(position);
-   return targetBuffer;
+   Buffer result = targetBuffer.build();
+   targetBuffer = null;
+   return result;
--- End diff --

It kind of depends on which direction we go with further changes.


> Introduce not threadsafe write only BufferBuilder
> -
>
> Key: FLINK-8178
> URL: https://issues.apache.org/jira/browse/FLINK-8178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> While the Buffer class is used in a multithreaded context, it requires synchronisation. Currently this is misleading/unclear and suggests that the RecordSerializer should take into account synchronisation of the Buffer that it's holding. With a NotThreadSafe BufferBuilder there would be a clear separation between single-threaded writing/creating of a BufferBuilder and multithreaded Buffer handling/retaining/recycling.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156931533
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects records or events on the List.
+ */
+public class RecordOrEventCollectingResultPartitionWriter implements ResultPartitionWriter {
--- End diff --

Maybe; I was considering it, but I really didn't want to extend the scope of my "simple" refactor into infinity... Remember that all of those commits/changes are here not because I planned to fix all of those tests, but because they exploded in my face while doing some relatively simple change :(


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157154019
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+   private final MemorySegment memorySegment;
+
+   private final BufferRecycler recycler;
+
+   private int position = 0;
+
+   private boolean built = false;
--- End diff --

We could, but what's the benefit of that? You would end up with ugly, cryptic checkstates: `checkState(position >= 0)` and the cryptic setting of `position = -1` after building the buffer. You could wrap them into helper functions like `checkState(!isBuilt())` and `markPositionBuilt()`, but I think that's uglier and harder to understand than the explicit check variable.


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156929782
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.types.Record;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
--- End diff --

it's a specific user-supplied list (`the` vs `a`), and "on" seems better to me 
as well:

http://learnersdictionary.com/qa/is-it-correct-to-say-on-the-list-or-in-the-list
https://www.englishforums.com/English/OnTheListOrInTheList/wzblc/post.htm




---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157178704
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ---
@@ -118,7 +122,7 @@ public Buffer answer(InvocationOnMock invocation) 
throws Throwable {
};
 
BufferProvider bufferProvider = 
mock(BufferProvider.class);
-   
when(bufferProvider.requestBufferBlocking()).thenAnswer(request);
+   
when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
--- End diff --

This would require a custom `BufferProvider` that implements the `final 
CountDownLatch sync = new CountDownLatch(2);` logic (or expanding 
`TestPooledBufferProvider`) - this test relies on waiting until one more call 
to the `BufferProvider` happens after exhausting the pool.
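
Roughly this shape with the current mock-based setup (a sketch only -- the real 
`Answer` also hands out buffers and blocks once the pool is exhausted; assumes 
`org.mockito.Mockito.mock`/`when`, `org.mockito.stubbing.Answer` and 
`java.util.concurrent.CountDownLatch`):

```java
// Inside the test method: the latch is counted down on every request,
// so the test thread can await the extra request after pool exhaustion.
final CountDownLatch sync = new CountDownLatch(2);
Answer<BufferBuilder> request = new Answer<BufferBuilder>() {
	@Override
	public BufferBuilder answer(InvocationOnMock invocation) throws Throwable {
		sync.countDown();
		return null; // placeholder for the real buffer handout
	}
};
BufferProvider bufferProvider = mock(BufferProvider.class);
when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
```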


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156928873
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
 ---
@@ -104,11 +104,10 @@ public void testDestroyWhileBlockingRequest() throws 
Exception {
 * @return Flag indicating whether the Thread is in a blocking buffer
 * request or not
 */
-   private boolean isInBlockingBufferRequest(StackTraceElement[] 
stackTrace) {
+   public static boolean isInBlockingBufferRequest(StackTraceElement[] 
stackTrace) {
if (stackTrace.length >= 3) {
return stackTrace[0].getMethodName().equals("wait") &&
-   
stackTrace[1].getMethodName().equals("requestBuffer") &&
-   
stackTrace[2].getMethodName().equals("requestBufferBlocking");
+   
stackTrace[1].getClassName().equals(LocalBufferPool.class.getName());
--- End diff --

I do not like this test in the first place, but I couldn't come up with a 
better solution. This change makes it at least less implementation-dependent. 
It is the same logic as in my "crusade" against mockito. If you put such a 
specific condition as there was before (blocking on `requestBuffer`) in one 
more test, you have to manually fix it in one more place during 
refactoring/adding features etc, which drastically increases the cost of 
maintaining the project and just doesn't scale with the project size :( 

On the other hand, you can argue that if after a refactor we add more waiting 
conditions (if `LocalBufferPool` can block in two different places depending 
on some internal condition), a broader check like this might also be the 
better choice/condition.


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156934340
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ---
@@ -169,33 +151,29 @@ public Buffer getCurrentBuffer() {
if (targetBuffer == null) {
return null;
}
-
-   targetBuffer.setSize(position);
-   return targetBuffer;
+   Buffer result = targetBuffer.build();
+   targetBuffer = null;
+   return result;
--- End diff --

It kind of depends on which direction we will go with further changes.


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156923552
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
 ---
@@ -40,37 +39,31 @@
 
private final BufferRecycler bufferRecycler;
 
-   private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+   private final int poolSize;
 
-   public TestBufferFactory() {
-   this(BUFFER_SIZE, RECYCLER);
-   }
-
-   public TestBufferFactory(int bufferSize) {
-   this(bufferSize, RECYCLER);
-   }
+   private int numberOfCreatedBuffers = 0;
 
-   public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) 
{
+   public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler 
bufferRecycler) {
checkArgument(bufferSize > 0);
+   this.poolSize = poolSize;
this.bufferSize = bufferSize;
this.bufferRecycler = checkNotNull(bufferRecycler);
}
 
-   public Buffer create() {
-   numberOfCreatedBuffers.incrementAndGet();
+   public synchronized Buffer create() {
+   if (numberOfCreatedBuffers >= poolSize) {
+   return null;
+   }
 
+   numberOfCreatedBuffers++;
return new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), 
bufferRecycler);
}
 
-   public Buffer createFrom(MemorySegment segment) {
-   return new Buffer(segment, bufferRecycler);
-   }
-
-   public int getNumberOfCreatedBuffers() {
-   return numberOfCreatedBuffers.get();
+   public synchronized int getNumberOfCreatedBuffers() {
+   return numberOfCreatedBuffers;
}
 
-   public int getBufferSize() {
+   public synchronized int getBufferSize() {
--- End diff --

I would leave it, just for the sake of having all methods `synchronized`, so 
that you don't have to think about which ones are and which ones should be 
`synchronized` (when adding features or refactoring this class in the future).


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156931262
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.AbstractEvent;
+import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects records or events on the 
List.
--- End diff --

it's a specific user-supplied list (`the` vs `a`), and "on" seems better to me 
as well:

http://learnersdictionary.com/qa/is-it-correct-to-say-on-the-list-or-in-the-list
https://www.englishforums.com/English/OnTheListOrInTheList/wzblc/post.htm




---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157172734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -179,7 +179,8 @@ public void setBufferPoolOwner(BufferPoolOwner owner) {
@Override
public Buffer requestBuffer() throws IOException {
try {
-   return requestBuffer(false);
+   BufferBuilder bufferBuilder = 
requestBufferBuilder(false);
+   return bufferBuilder != null ? bufferBuilder.build() : 
null;
--- End diff --

Nope, the size is always set manually in the remaining calls.

Yes, I know, but it took me almost a week to adapt the tests so far for this 
`BufferBuilder` (and `requestBuffer` is mocked ~60 times...). Can we postpone 
finishing this refactor to a later time? 


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157174669
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -188,10 +189,16 @@ public Buffer requestBuffer() throws IOException {
 
@Override
public Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
-   return requestBuffer(true);
+   BufferBuilder bufferBuilder = requestBufferBuilder(true);
+   return bufferBuilder != null ? bufferBuilder.build() : null;
--- End diff --

ditto, this path would require changing `BufferOrEvent` to 
`BufferBuilderOrEvent`.


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r157154679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link 
Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across 
multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+   private final MemorySegment memorySegment;
+
+   private final BufferRecycler recycler;
+
+   private int position = 0;
+
+   private boolean built = false;
+
+   public BufferBuilder(MemorySegment memorySegment, BufferRecycler 
recycler) {
+   this.memorySegment = checkNotNull(memorySegment);
+   this.recycler = checkNotNull(recycler);
+   }
+
+   public void append(ByteBuffer source) {
+   checkState(!built);
+
+   int needed = source.remaining();
+   int available = limit() - position;
+   int toCopy = Math.min(needed, available);
+
+   memorySegment.put(position, source, toCopy);
+   position += toCopy;
+   }
--- End diff --

I will add the test, but I would prefer to leave the `checkState`, since its 
purpose is to find bugs and provide an early warning about them. 
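
Something along these lines (a hedged sketch -- the recycler and the exact 
test shape are assumptions, not necessarily the test that will be added):

```java
@Test(expected = IllegalStateException.class)
public void testAppendAfterBuildFails() {
	// checkState(!built) in append() should reject writes to a finished builder
	BufferBuilder builder = new BufferBuilder(
		MemorySegmentFactory.allocateUnpooledSegment(128),
		FreeingBufferRecycler.INSTANCE);
	builder.append(ByteBuffer.wrap(new byte[] {1, 2, 3}));
	builder.build();
	builder.append(ByteBuffer.wrap(new byte[] {4})); // expected to throw
}
```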


---


[GitHub] flink pull request #5105: [FLINK-8178][network] Introduce not threadsafe wri...

2017-12-15 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5105#discussion_r156922694
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 ---
@@ -48,8 +47,7 @@
 
private static final IOManager IO_MANAGER = new IOManagerAsync();
 
-   private static final TestInfiniteBufferProvider writerBufferPool =
-   new TestInfiniteBufferProvider();
+   private static final TestPooledBufferProvider writerBufferPool = new 
TestPooledBufferProvider(Integer.MAX_VALUE);
--- End diff --

Good point. I have cleaned up/simplified this test a little bit more and 
dropped `TestPooledBufferProvider` altogether to avoid this issue.


---


[jira] [Closed] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

2017-12-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8248.
---
Resolution: Not A Problem

> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> ---
>
> Key: FLINK-8248
> URL: https://issues.apache.org/jira/browse/FLINK-8248
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
> Environment: linux: 3.10.0-514.el7.x86_64
> flink: 
> *  version: 1.4
> *  rocksdb backend state
> *  checkpoint interval 5s
> *  keyed cep
> language: Java8
>Reporter: jia liu
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>   ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: 
> SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', 
> detector='SlidingWindowAnomalyDetector', measure='count', field='activity', 
> dimension='Logoff', description='null', icons=null, 
> startTimestamp=146529720, endTimestamp=1465297203600, count=11.0, 
> anomalyScore=100, adHashCode=-1866791453, timeMap={146529720=11.0}, 
> user='LMR0049', logQuery=null, group='null'}, 146530079, 0), 
> [SharedBufferEdge(null, 199)], 1)
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>   ... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new 
> RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .
> DataStream behaviorStream = anomalyStream
> .assignTimestampsAndWatermarks(new 
> AnomalyTimestampExtractor(Time.seconds(0)))
> .keyBy((KeySelector) value -> 
> value.entity)
> 
> .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
> Time.seconds(getLong("flink.window.slice-size"
> .apply(new BehaviorBuilderFunction())
> .filter(new WhitelistFilterFunction())
> // non-keyed stream will result in pattern operator 
> parallelism equal to 1.
>   

[jira] [Reopened] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

2017-12-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8248:
-

Reopen to change resolution


> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> ---
>
> Key: FLINK-8248
> URL: https://issues.apache.org/jira/browse/FLINK-8248
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
> Environment: linux: 3.10.0-514.el7.x86_64
> flink: 
> *  version: 1.4
> *  rocksdb backend state
> *  checkpoint interval 5s
> *  keyed cep
> language: Java8
>Reporter: jia liu
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>   ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: 
> SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', 
> detector='SlidingWindowAnomalyDetector', measure='count', field='activity', 
> dimension='Logoff', description='null', icons=null, 
> startTimestamp=146529720, endTimestamp=1465297203600, count=11.0, 
> anomalyScore=100, adHashCode=-1866791453, timeMap={146529720=11.0}, 
> user='LMR0049', logQuery=null, group='null'}, 146530079, 0), 
> [SharedBufferEdge(null, 199)], 1)
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>   ... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new 
> RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .
> DataStream behaviorStream = anomalyStream
> .assignTimestampsAndWatermarks(new 
> AnomalyTimestampExtractor(Time.seconds(0)))
> .keyBy((KeySelector) value -> 
> value.entity)
> 
> .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
> Time.seconds(getLong("flink.window.slice-size"
> .apply(new BehaviorBuilderFunction())
> .filter(new WhitelistFilterFunction())
> // non-keyed stream will result in pattern operator 
> parallelism equal to 1.
>   

[jira] [Updated] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8267:

Component/s: Kinesis Connector

> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the kinesis producer the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the kinesis stream? The documentation for the example states 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292376#comment-16292376
 ] 

ASF GitHub Bot commented on FLINK-8264:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5166
  
I think this looks good!


> Add Scala to the parent-first loading patterns
> --
>
> Key: FLINK-8264
> URL: https://issues.apache.org/jira/browse/FLINK-8264
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> A confusing experience happens when users accidentally package the Scala 
> Library into their jar file. The reversed class loading duplicates Scala's 
> classes, leading to exceptions like the one below.
> By adding {{scala.}} to the default 'parent-first-patterns' we can improve 
> the user experience in such situations.
> Exception Stack Trace:
> {code}
> java.lang.ClassCastException: cannot assign instance of 
> org.peopleinmotion.TestFunction$$anonfun$1 to field 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type 
> scala.Function1 in instance of 
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5166: [FLINK-8264] [core] Add 'scala.' to the 'parent-first' cl...

2017-12-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5166
  
I think this looks good!


---


[jira] [Commented] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292337#comment-16292337
 ] 

ASF GitHub Bot commented on FLINK-8266:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5170

[FLINK-8266] [runtime] add network memory to ResourceProfile for the input 
and output memory of a task


## What is the purpose of the change

This pull request adds a network memory field to ResourceProfile, so the job 
master can set the network memory of a task according to the number of input 
channels and output sub partitions.
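
Roughly, the intended shape is something like this sketch (field and accessor 
names here are assumptions and may differ from the actual patch):

```java
// Sketch: ResourceProfile extended with a network memory dimension.
public class ResourceProfile implements java.io.Serializable {
	private final double cpuCores;
	private final int heapMemoryInMB;
	private final int networkMemoryInMB; // new: buffers for input channels / sub partitions

	public ResourceProfile(double cpuCores, int heapMemoryInMB, int networkMemoryInMB) {
		this.cpuCores = cpuCores;
		this.heapMemoryInMB = heapMemoryInMB;
		this.networkMemoryInMB = networkMemoryInMB;
	}

	public int getNetworkMemoryInMB() {
		return networkMemoryInMB;
	}
}
```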

## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

This change can be verified by running ResourceProfileTest:

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8266

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5170


commit 93822bfac6a0794b2b2047e046dcef93c5313185
Author: shuai.xus 
Date:   2017-12-15T10:43:27Z

[FLINK-8266] add network memory to ResourceProfile for the input and output 
memory of a task




> Add network memory to ResourceProfile
> -
>
> Key: FLINK-8266
> URL: https://issues.apache.org/jira/browse/FLINK-8266
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> On the task manager side, the network buffer pool should be allocated 
> according to the number of input channels and output sub partitions, but 
> when allocating a worker, the resource profile doesn't contain any 
> information about this memory. 
> So I suggest adding a network memory field to ResourceProfile; the job 
> master should calculate it when scheduling a task, and then the resource 
> manager can allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5170: [FLINK-8266] [runtime] add network memory to Resou...

2017-12-15 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5170

[FLINK-8266] [runtime] add network memory to ResourceProfile for the input 
and output memory of a task


## What is the purpose of the change

This pull request adds a network memory field to ResourceProfile, so the job 
master can set the network memory of a task according to the number of input 
channels and output sub partitions.

## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

This change can be verified by running ResourceProfileTest:

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8266

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5170


commit 93822bfac6a0794b2b2047e046dcef93c5313185
Author: shuai.xus 
Date:   2017-12-15T10:43:27Z

[FLINK-8266] add network memory to ResourceProfile for the input and output 
memory of a task




---


[jira] [Updated] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Dyana Rose (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dyana Rose updated FLINK-8267:
--
Description: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
In the example code for the kinesis producer the region key is set like:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
{code}

However, the AWS Kinesis Producer Library requires that the region key be 
Region 
(https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
 so the setting at this point should be:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put("Region", "us-east-1");
{code}

When you run the Kinesis Producer you can see the effect of not setting the 
Region key by a log line

{noformat}
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
Kinesis producer instance for region ''
{noformat}


The KPL also then assumes it's running on EC2 and attempts to determine its 
own region, which fails.

{noformat}
(EC2MetadataClient)Http request to Ec2MetadataService failed.
[error] [main.cc:266] Could not configure the region. It was not given in the 
config and we were unable to retrieve it from EC2 metadata
{noformat}


At the least I'd say the documentation should mention the difference between 
these two keys and when you are required to also set the Region key.

On the other hand, is this even the intended behaviour of this connector? Was 
it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
the kinesis stream? The documentation for the example states 

{noformat}
The example demonstrates producing a single Kinesis stream in the AWS region 
“us-east-1”.
{noformat}


  was:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html#kinesis-producer
In the example code for the kinesis producer the region key is set like:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
{code}

However, the AWS Kinesis Producer Library requires that the region key be 
Region 
(https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
 so the setting at this point should be:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put("Region", "us-east-1");
{code}

When you run the Kinesis Producer you can see the effect of not setting the 
Region key by a log line

{noformat}
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
Kinesis producer instance for region ''
{noformat}


The KPL also then assumes it's running on EC2 and attempts to determine its 
own region, which fails.

{noformat}
(EC2MetadataClient)Http request to Ec2MetadataService failed.
[error] [main.cc:266] Could not configure the region. It was not given in the 
config and we were unable to retrieve it from EC2 metadata
{noformat}


At the least I'd say the documentation should mention the difference between 
these two keys and when you are required to also set the Region key.

On the other hand, is this even the intended behaviour of this connector? Was 
it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
the kinesis stream? The documentation for the example states 

{noformat}
The example demonstrates producing a single Kinesis stream in the AWS region 
“us-east-1”.
{noformat}



> Kinesis Producer example setting Region key
> ---
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>Reporter: Dyana Rose
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the kinesis producer the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL 

[jira] [Created] (FLINK-8267) Kinesis Producer example setting Region key

2017-12-15 Thread Dyana Rose (JIRA)
Dyana Rose created FLINK-8267:
-

 Summary: Kinesis Producer example setting Region key
 Key: FLINK-8267
 URL: https://issues.apache.org/jira/browse/FLINK-8267
 Project: Flink
  Issue Type: Bug
Reporter: Dyana Rose
Priority: Minor


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html#kinesis-producer
In the example code for the kinesis producer the region key is set like:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
{code}

However, the AWS Kinesis Producer Library requires that the region key be 
Region 
(https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
 so the setting at this point should be:

{code:java}
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put("Region", "us-east-1");
{code}

When you run the Kinesis Producer you can see the effect of not setting the 
Region key by a log line

{noformat}
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
Kinesis producer instance for region ''
{noformat}


The KPL also then assumes it's running on EC2 and attempts to determine its 
own region, which fails.

{noformat}
(EC2MetadataClient)Http request to Ec2MetadataService failed.
[error] [main.cc:266] Could not configure the region. It was not given in the 
config and we were unable to retrieve it from EC2 metadata
{noformat}


At the least I'd say the documentation should mention the difference between 
these two keys and when you are required to also set the Region key.

On the other hand, is this even the intended behaviour of this connector? Was 
it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
the kinesis stream? The documentation for the example states 

{noformat}
The example demonstrates producing a single Kinesis stream in the AWS region 
“us-east-1”.
{noformat}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8266:

Labels: flip-6  (was: )

> Add network memory to ResourceProfile
> -
>
> Key: FLINK-8266
> URL: https://issues.apache.org/jira/browse/FLINK-8266
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> On the task manager side, the network buffer pool should be allocated 
> according to the number of input channels and output sub partitions, but 
> when allocating a worker, the resource profile doesn't contain any 
> information about this memory. 
> So I suggest adding a network memory field to ResourceProfile; the job 
> master should calculate it when scheduling a task, and then the resource 
> manager can allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8266:
---

 Summary: Add network memory to ResourceProfile
 Key: FLINK-8266
 URL: https://issues.apache.org/jira/browse/FLINK-8266
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


On the task manager side, the network buffer pool should be allocated 
according to the number of input channels and output sub partitions, but when 
allocating a worker, the resource profile doesn't contain any information 
about this memory. 
So I suggest adding a network memory field to ResourceProfile; the job master 
should calculate it when scheduling a task, and then the resource manager can 
allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292235#comment-16292235
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
Hi, is there anything to edit or change? 


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5050: [FLINK-4822] Ensure that the Kafka 0.8 connector is compa...

2017-12-15 Thread taizilongxu
Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
Hi, is there anything to edit or change? 


---