[jira] [Updated] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-08 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Description: 
h3. Background

Bloom filters are useful in many situations, for example:
 * 1. Approximate calculation: deduplication (e.g. UV calculation)
 * 2. Performance optimization: e.g. [runtime filter 
join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
   By using a BF, we can greatly reduce the number of queries for state data 
in a stream join; the filtered queries would eventually find no results anyway, 
which is especially expensive for RocksDB-based state because it means 
traversing {{sst}} files on disk.

However, with what Flink currently provides, it is hard to use a bloom filter, 
for the following reasons:
 * 1. Serialization problem: Bloom filter state can be large (for example, 
100 MB). If it were implemented on top of RocksDB state, the state data would 
need to be serialized on every query and update, and performance would be 
very poor.
 * 2. Data skew: Data in different key groups can be skewed, and the skew 
cannot be accurately predicted before the program runs. Therefore, it is 
impossible to determine up front how many resources the bloom filter should 
allocate. One option is to allocate the space needed for the most skewed case, 
but this can lead to a very serious waste of resources.

h3. Requirement

Therefore, I introduce the PartitionedBloomFilter for Flink, which needs to 
meet at least the following features:
 * 1. Support for changing parallelism.
 * 2. Only serialize when necessary: when performing a checkpoint.
 * 3. Can deal with the data skew problem: users only need to specify a 
PartitionedBloomFilter with the desired input size and fpp; the system will 
allocate resources dynamically.
 * 4. Does not conflict with other state: users can still use KeyedState and 
OperatorState while using this bloom filter.
 * 5. Supports a relaxed TTL (i.e. the data survives at least the specified 
time).

Design doc: [design 
doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]

  was:
h3. Background

Bloom filters are useful in many situations, for example:
 * 1. Approximate calculation: deduplication (e.g. UV calculation)
 * 2. Performance optimization: e.g. [runtime filter 
join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
   By using a BF, we can greatly reduce the number of queries for state data 
in a stream join; the filtered queries would eventually find no results anyway, 
which is especially expensive for RocksDB-based state because it means 
traversing {{sst}} files on disk.

However, with what Flink currently provides, it is hard to use a bloom filter, 
for the following reasons:
 * 1. Serialization problem: Bloom filter state can be large (for example, 
100 MB). If it were implemented on top of RocksDB state, the state data would 
need to be serialized on every query and update, and performance would be 
very poor.
 * 2. Data skew: Data in different key groups can be skewed, and the skew 
cannot be accurately predicted before the program runs. Therefore, it is 
impossible to determine up front how many resources the bloom filter should 
allocate. One option is to allocate the space needed for the most skewed case, 
but this can lead to a very serious waste of resources.

h3. Requirement

Therefore, I introduce the LinkedBloomFilter for Flink, which needs to meet at 
least the following features:
 * 1. Support for changing parallelism.
 * 2. Only serialize when necessary: when performing a checkpoint.
 * 3. Can deal with the data skew problem: users only need to specify a 
LinkedBloomFilterState with the desired input size and fpp; the system will 
allocate resources dynamically.
 * 4. Does not conflict with other state: users can still use KeyedState and 
OperatorState while using the bloom filter state.
 * 5. Supports a relaxed TTL (i.e. the data survives at least the specified 
time).

Design doc: [design 
doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]
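For intuition, here is a minimal, self-contained sketch of the bloom-filter mechanics the proposal builds on: sizing the bit array from the expected number of insertions and the desired false-positive probability (fpp). This is an illustration only, not the proposed Flink API; the class and all names in it are hypothetical, and the double-hashing scheme is one common choice among several.

```java
import java.util.BitSet;

/** Minimal bloom filter sketch: no false negatives, tunable false positives. */
public class BloomFilterSketch {
    private final BitSet bits;
    private final int size;
    private final int numHashes;

    public BloomFilterSketch(int expectedInsertions, double fpp) {
        // standard sizing: m = -n * ln(p) / (ln 2)^2, k = (m / n) * ln 2
        this.size = Math.max(1,
            (int) (-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2))));
        this.numHashes = Math.max(1,
            (int) Math.round((double) size / expectedInsertions * Math.log(2)));
        this.bits = new BitSet(size);
    }

    public void add(String value) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | (h1 << 16); // cheap second hash for double hashing
        for (int i = 0; i < numHashes; i++) {
            bits.set(Math.floorMod(h1 + i * h2, size));
        }
    }

    public boolean mightContain(String value) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | (h1 << 16);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(Math.floorMod(h1 + i * h2, size))) {
                return false; // definitely never added
            }
        }
        return true; // probably added (false positives possible)
    }

    public static void main(String[] args) {
        BloomFilterSketch bf = new BloomFilterSketch(1000, 0.01);
        bf.add("user-1");
        bf.add("user-2");
        System.out.println(bf.mightContain("user-1")); // true: no false negatives
        System.out.println(bf.mightContain("user-3")); // usually false, may be a false positive
    }
}
```

In a runtime-filter join, the cheap {{mightContain}} check lets most non-matching keys skip the RocksDB lookup entirely, which is exactly the saving the description above argues for.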


> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
[jira] [Updated] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-08 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Summary: Introduce PartitionedBloomFilter for Approximate calculation and 
other situations of performance optimization  (was: Introduce LinkedBloomFilter 
for Approximate calculation and other situations of performance optimization)




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-08 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358051#comment-16358051
 ] 

Sihua Zhou commented on FLINK-8601:
---

[~fhueske] I have updated the document, could you please have a look at it?






[jira] [Updated] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-08 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Component/s: (was: Core)
 State Backends, Checkpointing






[jira] [Updated] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-08 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8601:
--
Description: 
h3. Background

Bloom filters are useful in many situations, for example:
 * 1. Approximate calculation: deduplication (e.g. UV calculation)
 * 2. Performance optimization: e.g. [runtime filter 
join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
   By using a BF, we can greatly reduce the number of queries for state data 
in a stream join; the filtered queries would eventually find no results anyway, 
which is especially expensive for RocksDB-based state because it means 
traversing {{sst}} files on disk.

However, with what Flink currently provides, it is hard to use a bloom filter, 
for the following reasons:
 * 1. Serialization problem: Bloom filter state can be large (for example, 
100 MB). If it were implemented on top of RocksDB state, the state data would 
need to be serialized on every query and update, and performance would be 
very poor.
 * 2. Data skew: Data in different key groups can be skewed, and the skew 
cannot be accurately predicted before the program runs. Therefore, it is 
impossible to determine up front how many resources the bloom filter should 
allocate. One option is to allocate the space needed for the most skewed case, 
but this can lead to a very serious waste of resources.

h3. Requirement

Therefore, I introduce the LinkedBloomFilter for Flink, which needs to meet at 
least the following features:
 * 1. Support for changing parallelism.
 * 2. Only serialize when necessary: when performing a checkpoint.
 * 3. Can deal with the data skew problem: users only need to specify a 
LinkedBloomFilterState with the desired input size and fpp; the system will 
allocate resources dynamically.
 * 4. Does not conflict with other state: users can still use KeyedState and 
OperatorState while using the bloom filter state.
 * 5. Supports a relaxed TTL (i.e. the data survives at least the specified 
time).

Design doc: [design 
doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]

  was:
h3. Background

Bloom filters are useful in many situations, for example:
 * 1. Approximate calculation: deduplication (e.g. UV calculation)
 * 2. Performance optimization: e.g. [runtime filter 
join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
   By using a BF, we can greatly reduce the number of queries for state data 
in a stream join; the filtered queries would eventually find no results anyway, 
which is especially expensive for RocksDB-based state because it means 
traversing {{sst}} files on disk.

However, with what Flink currently provides, it is hard to use a bloom filter, 
for the following reasons:
 * 1. Serialization problem: Bloom filter state can be large (for example, 
100 MB). If it were implemented on top of RocksDB state, the state data would 
need to be serialized on every query and update, and performance would be 
very poor.
 * 2. Data skew: Data in different key groups can be skewed, and the skew 
cannot be accurately predicted before the program runs. Therefore, it is 
impossible to determine up front how many resources the bloom filter should 
allocate. One option is to allocate the space needed for the most skewed case, 
but this can lead to a very serious waste of resources.

h3. Requirement

Therefore, I introduce the LinkedBloomFilter for Flink, which needs to meet at 
least the following features:
 * 1. Support for changing parallelism.
 * 2. Only serialize when necessary: when performing a checkpoint.
 * 3. Can deal with the data skew problem: users only need to specify a 
LinkedBloomFilterState with the desired input size and fpp; the system will 
allocate resources dynamically.
 * 4. Does not conflict with other state: users can still use KeyedState and 
OperatorState while using the bloom filter state.
 * 5. Supports a relaxed TTL (i.e. the data survives at least the specified 
time).

Design doc: [design 
doc|https://docs.google.com/document/d/1yMCT2ogE0CtSjzRvldgi0ZPPxC791PpkVGkVeqaUUI8/edit?usp=sharing]



[jira] [Closed] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException

2018-02-08 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8616.
---
Resolution: Duplicate

Duplicate of FLINK-8423.

> Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator 
> masks ClassCastException
> -
>
> Key: FLINK-8616
> URL: https://issues.apache.org/jira/browse/FLINK-8616
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Cliff Resnick
>Priority: Major
>
> There is an attempt to enrich the exception with outputTag#getId, but 
> outputTag is null, and a NullPointerException is thrown. Looking at the 
> attempted message enrichment the code seems to assume a ClassCastException 
> can only stem from SideOutput type mismatches. This may have been the norm 
> before, but changes to classloader delegation in 1.4 have given rise to 
> multiple ClassLoader conflicts (at least for us), and they all seem to end up 
> here.





[jira] [Updated] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-08 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8423:

Priority: Critical  (was: Minor)

> OperatorChain#pushToOperator catch block may fail with NPE
> --
>
> Key: FLINK-8423
> URL: https://issues.apache.org/jira/browse/FLINK-8423
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>
> {code}
> @Override
> protected <X> void pushToOperator(StreamRecord<X> record) {
> 	try {
> 		// we know that the given outputTag matches our OutputTag so the record
> 		// must be of the type that our operator (and Serializer) expects.
> 		@SuppressWarnings("unchecked")
> 		StreamRecord<T> castRecord = (StreamRecord<T>) record;
> 
> 		numRecordsIn.inc();
> 		StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
> 
> 		operator.setKeyContextElement1(copy);
> 		operator.processElement(copy);
> 	} catch (ClassCastException e) {
> 		// Enrich error message
> 		ClassCastException replace = new ClassCastException(
> 			String.format(
> 				"%s. Failed to push OutputTag with id '%s' to operator. " +
> 				"This can occur when multiple OutputTags with different types " +
> 				"but identical names are being used.",
> 				e.getMessage(),
> 				outputTag.getId()));
> 
> 		throw new ExceptionInChainedOperatorException(replace);
> 	} catch (Exception e) {
> 		throw new ExceptionInChainedOperatorException(e);
> 	}
> }
> {code}
> If outputTag is null (as is the case when no sideOutput was defined) the 
> catch block will crash with a NullPointerException. This may happen if 
> {{operator.processElement}} throws a ClassCastException.
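The fix pattern is to guard the enrichment with a null check on {{outputTag}}. The sketch below is a minimal, runnable illustration of that pattern only, not the actual Flink patch; the class and method names are hypothetical.

```java
public class NullGuardDemo {

    // Sketch of the fix: only enrich the ClassCastException message when a
    // side-output tag is actually present; otherwise keep the original message.
    static String enrich(String message, String outputTagId) {
        if (outputTagId == null) {
            // no side output involved: enriching here would dereference null (the bug)
            return message;
        }
        return String.format(
            "%s. Failed to push OutputTag with id '%s' to operator.",
            message, outputTagId);
    }

    public static void main(String[] args) {
        // outputTag == null: the unguarded original code throws a NullPointerException here
        System.out.println(enrich("java.lang.ClassCastException: X cannot be cast to Y", null));
        System.out.println(enrich("java.lang.ClassCastException: X cannot be cast to Y", "side-1"));
    }
}
```

With the guard in place, the original ClassCastException surfaces unmasked when no side output is involved, which is the behavior the report asks for.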





[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-08 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai any update?


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357972#comment-16357972
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai any update?


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.
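The skew described above comes from picking a subtask by the shard id's hash code. A small sketch of that assignment scheme (simplified for illustration, not the connector's exact code; shard ids follow the Kinesis {{shardId-<12 digits>}} format):

```java
import java.util.ArrayList;
import java.util.List;

public class ShardAssignmentSketch {

    // A discovered shard goes to the subtask selected by its hash code
    // modulo the parallelism: effectively round robin for sequential ids.
    static int assign(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int[] counts = new int[parallelism];

        List<String> shards = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            shards.add(String.format("shardId-%012d", i)); // sequential ids
        }
        for (String shard : shards) {
            counts[assign(shard, parallelism)]++;
        }

        // every shard lands on exactly one subtask, but nothing forces the
        // per-subtask counts to stay balanced once ids are no longer
        // sequential (e.g. after a Kinesis reshard)
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        System.out.println(total);
    }
}
```

After a reshard produces non-consecutive shard ids, the hash codes no longer step through the subtask indices evenly, so some subtasks can end up with most of the shards.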





[jira] [Closed] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Ken Krugler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ken Krugler closed FLINK-8615.
--
Resolution: Not A Bug

> Configuring Apache Flink Local Set up with Pseudo distributed Yarn
> --
>
> Key: FLINK-8615
> URL: https://issues.apache.org/jira/browse/FLINK-8615
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Mac Os, Hadoop 2.8.3 and Flink 1.4.0
>Reporter: Karrtik Iyer
>Priority: Major
>
> I have set up Hadoop 2.8.3 and YARN on my Mac machine in pseudo-distributed 
> mode, following this blog: [Hadoop In Pseudo Distributed 
> Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html]
>  I have been able to successfully start HDFS and YARN, and am also able to 
> submit MapReduce jobs.
> After that I downloaded Apache Flink 1.4 from 
> [here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz].
>  Now I am trying to set up Flink on the above YARN cluster by following the 
> steps 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html].
>  When I try to run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I get the 
> error below, which I am unable to resolve. Can someone please advise and help 
> me with the same? 
>  
> {{Error while deploying YARN cluster: Couldn't deploy Yarn session cluster 
> java.lang.RuntimeException: Couldn't deploy Yarn session cluster at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
>  at java.security.AccessController.doPrivileged(Native Method) at 
> javax.security.auth.Subject.doAs(Subject.java:422) at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
>  Caused by: org.apache.flink.configuration.IllegalConfigurationException: The 
> number of virtual cores per node were configured with 1 but Yarn only has -1 
> virtual cores available. Please note that the number of virtual cores is set 
> to the number of task slots by default unless configured in the Flink config 
> with 'yarn.containers.vcores.' at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)}}
>  
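The root cause line says YARN reported {{-1}} available virtual cores. One plausible thing to check (an assumption on my part, not a confirmed fix) is whether the NodeManager actually advertises CPU capacity; in Hadoop that is controlled by the {{yarn.nodemanager.resource.cpu-vcores}} property in yarn-site.xml:

```xml
<!-- yarn-site.xml fragment (illustrative value): make the NodeManager
     advertise CPU capacity so YARN does not report -1 available vcores -->
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
</property>
```

The Flink-side knob mentioned in the error, {{yarn.containers.vcores}}, only controls how many vcores Flink requests per container; it cannot help if YARN itself reports no capacity.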





[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Ken Krugler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357916#comment-16357916
 ] 

Ken Krugler commented on FLINK-8615:


https://flink.apache.org/community.html






[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Karrtik Iyer (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357903#comment-16357903
 ] 

Karrtik Iyer commented on FLINK-8615:
-

Hi [~kkrugler], apologies, I was not aware of the Flink user list. Can you 
please share the link to that forum so I can post my question there, and then 
close this issue?




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357843#comment-16357843
 ] 

ASF GitHub Bot commented on FLINK-8617:
---

GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/5438

[FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing …


## What is the purpose of the change
fix code generation error in MapGet


## Brief change log
handle nullTerm of MapGet GeneratedExpression correctly.


## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

```
run org.apache.flink.table.runtime.batch.sql.CalcITCase testMapGet()
run org.apache.flink.table.runtime.batch.table.CalcITCase testMapget()
run org.apache.flink.table.runtime.stream.table.CalcITCase testMapGet()
run org.apache.flink.table.runtime.stream.sql.SqlITCase testMapGet()
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): 
 no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: 
 no
  - The serializers: 
 no
  - The runtime per-record code paths (performance sensitive): 
 no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper:
 no
  - The S3 file system connector:  
 no

## Documentation

  - Does this pull request introduce a new feature? 
 no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-8617

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5438.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5438


commit d618aac64156a9fd86c8367d6594e593521a8d8b
Author: Xpray 
Date:   2018-02-09T02:17:16Z

[FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing Map 
type




> Fix code generation bug while accessing Map type
> 
>
> Key: FLINK-8617
> URL: https://issues.apache.org/jira/browse/FLINK-8617
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
> There are two more bugs in {code}ScalarOperators.generateIsNull{code} 
> and {code}ScalarOperators.generateIsNotNull{code}.
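A minimal sketch of the null handling that generated map-access code needs. This is illustrative only: the class, field, and default value below are hypothetical, not Flink's actual generated code. The point is that a null map and an absent key must both set the null flag before any unboxing happens.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of null-safe map access in generated code.
// "resultIsNull" plays the role of the nullTerm mentioned in the PR.
public class MapGetSketch {
    static boolean resultIsNull;

    static int mapGet(Map<String, Integer> map, String key) {
        Integer boxed = (map == null) ? null : map.get(key);
        resultIsNull = (boxed == null);       // set the null flag first
        return resultIsNull ? -1 : boxed;     // unbox only when present
    }

    public static void main(String[] args) {
        Map<String, Integer> m = new HashMap<>();
        m.put("a", 1);
        System.out.println(mapGet(m, "a"));    // present key
        System.out.println(mapGet(m, "b"));    // missing key: flag set, no NPE
        System.out.println(mapGet(null, "a")); // null map: flag set, no NPE
    }
}
```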



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5438: [FLINK-8617][TableAPI & SQL] Fix code generation b...

2018-02-08 Thread Xpray
GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/5438

[FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing …


## What is the purpose of the change
fix code generation error in MapGet


## Brief change log
handle nullTerm of MapGet GeneratedExpression correctly.


## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

```
run org.apache.flink.table.runtime.batch.sql.CalcITCase testMapGet()
run org.apache.flink.table.runtime.batch.table.CalcITCase testMapget()
run org.apache.flink.table.runtime.stream.table.CalcITCase testMapGet()
run org.apache.flink.table.runtime.stream.sql.SqlITCase testMapGet()
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): 
 no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: 
 no
  - The serializers: 
 no
  - The runtime per-record code paths (performance sensitive): 
 no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper:
 no
  - The S3 file system connector:  
 no

## Documentation

  - Does this pull request introduce a new feature? 
 no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-8617

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5438.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5438


commit d618aac64156a9fd86c8367d6594e593521a8d8b
Author: Xpray 
Date:   2018-02-09T02:17:16Z

[FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing Map 
type




---


[jira] [Created] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-08 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8617:
------------------------------

 Summary: Fix code generation bug while accessing Map type
 Key: FLINK-8617
 URL: https://issues.apache.org/jira/browse/FLINK-8617
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
There are two more bugs in {code}ScalarOperators.generateIsNull{code} 
and {code}ScalarOperators.generateIsNotNull{code}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cliff Resnick updated FLINK-8616:
---------------------------------
Summary: Missing null check in 
OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException  
(was: Missing null check in OperatorChain.copyingChainOutput#pushToOperator 
masks ClassCastException)

> Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator 
> masks ClassCastException
> -
>
> Key: FLINK-8616
> URL: https://issues.apache.org/jira/browse/FLINK-8616
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Cliff Resnick
>Priority: Major
>
> There is an attempt to enrich the exception with outputTag#getId, but 
> outputTag is null, and a NullPointerException is thrown. Looking at the 
> attempted message enrichment the code seems to assume a ClassCastException 
> can only stem from SideOutput type mismatches. This may have been the norm 
> before, but changes to classloader delegation in 1.4 have given rise to 
> multiple ClassLoader conflicts (at least for us), and they all seem to end up 
> here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cliff Resnick updated FLINK-8616:
-
Summary: Missing null check in 
OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException  (was: 
Missing null check in OperatorChain#pushToOperator masks ClassCastException)

> Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks 
> ClassCastException
> --
>
> Key: FLINK-8616
> URL: https://issues.apache.org/jira/browse/FLINK-8616
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Cliff Resnick
>Priority: Major
>
> There is an attempt to enrich the exception with outputTag#getId, but 
> outputTag is null, and a NullPointerException is thrown. Looking at the 
> attempted message enrichment the code seems to assume a ClassCastException 
> can only stem from SideOutput type mismatches. This may have been the norm 
> before, but changes to classloader delegation in 1.4 have given rise to 
> multiple ClassLoader conflicts (at least for us), and they all seem to end up 
> here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8616) Missing null check in OperatorChain#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-8616:


 Summary: Missing null check in OperatorChain#pushToOperator masks 
ClassCastException
 Key: FLINK-8616
 URL: https://issues.apache.org/jira/browse/FLINK-8616
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Cliff Resnick


There is an attempt to enrich the exception with outputTag#getId, but outputTag 
is null, and a NullPointerException is thrown. Looking at the attempted message 
enrichment the code seems to assume a ClassCastException can only stem from 
SideOutput type mismatches. This may have been the norm before, but changes to 
classloader delegation in 1.4 have given rise to multiple ClassLoader conflicts 
(at least for us), and they all seem to end up here.
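A hedged sketch of the guard being described: enrich the ClassCastException with the OutputTag id only when an output tag is actually present, and otherwise keep the original exception visible. The helper and names below are hypothetical, not the actual OperatorChain code.

```java
// Illustrative only: a null check so that a missing outputTag does not
// turn a real ClassCastException into a NullPointerException.
public class PushGuard {
    static RuntimeException enrich(ClassCastException e, Object outputTag) {
        if (outputTag != null) {
            // Side-output mismatch: enrichment is meaningful here.
            return new RuntimeException("wrong output tag: " + outputTag, e);
        }
        // No output tag: preserve the original cause (e.g. a ClassLoader
        // conflict) instead of masking it.
        return e;
    }

    public static void main(String[] args) {
        ClassCastException cce = new ClassCastException("A cannot be cast to B");
        System.out.println(enrich(cce, null).getMessage());
    }
}
```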



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-02-08 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357640#comment-16357640
 ] 

Thomas Weise commented on FLINK-5697:
-------------------------------------

Since shards are immutable wrt their hash key range and records cannot move 
between shards, we should be able to use the parent shard IDs and the last read 
sequence to find when a newly discovered shard can be read from. Child shards 
don't need to be assigned to the same subtask, in which case we would need a 
way to know the last read offset from the parent shard from a different subtask 
for comparison with EndingSequenceNumber. Is it possible to retrieve the last 
checkpointed offsets from other subtasks outside of restore to perform such a 
check? (It would still imply that consumption from a new child shard cannot 
start until the parent was checkpointed and therefore add latency, but would 
provide the ordering guarantee we are looking for?)

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.
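A minimal sketch of the per-shard watermark idea from the issue above, mirroring the Kafka consumer's per-partition scheme: each shard tracks its own event-time high mark, and the consumer-level watermark is the minimum across shards. The tracker below is hypothetical; a real connector would integrate with Flink's watermark machinery.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-shard watermark tracker: the slowest shard gates the
// watermark emitted downstream, so late shards cannot cause data loss.
public class PerShardWatermarks {
    private final Map<String, Long> shardWatermarks = new ConcurrentHashMap<>();

    void onRecord(String shardId, long eventTimestamp) {
        // Track the highest timestamp seen per shard.
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
    }

    long currentWatermark() {
        // Overall watermark = minimum across all shards.
        return shardWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PerShardWatermarks wm = new PerShardWatermarks();
        wm.onRecord("shard-0", 100L);
        wm.onRecord("shard-1", 50L);
        System.out.println(wm.currentWatermark()); // slowest shard wins
    }
}
```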



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357562#comment-16357562
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user mdaxini commented on a diff in the pull request:

https://github.com/apache/flink/pull/5424#discussion_r167064838
  
--- Diff: docs/dev/api_concepts.md ---
@@ -896,3 +896,54 @@ result type ```R``` for the final result. E.g. for a 
histogram, ```V``` is a num
 
 {% top %}
 
+## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed 
stream
--- End diff --

+1


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help in making the job 
> graph embarrassingly parallel while leveraging the RocksDB state backend and 
> also the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5424: [FLINK-8571] [DataStream] Introduce utility functio...

2018-02-08 Thread mdaxini
Github user mdaxini commented on a diff in the pull request:

https://github.com/apache/flink/pull/5424#discussion_r167064838
  
--- Diff: docs/dev/api_concepts.md ---
@@ -896,3 +896,54 @@ result type ```R``` for the final result. E.g. for a 
histogram, ```V``` is a num
 
 {% top %}
 
+## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed 
stream
--- End diff --

+1


---


[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Ken Krugler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357418#comment-16357418
 ] 

Ken Krugler commented on FLINK-8615:


Hi Karrtik - for a support/configuration question like this, please post to the 
Flink user list first, versus opening a bug in Jira. It would be great if you 
could close this issue, thanks.

> Configuring Apache Flink Local Set up with Pseudo distributed Yarn
> --
>
> Key: FLINK-8615
> URL: https://issues.apache.org/jira/browse/FLINK-8615
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Mac Os, Hadoop 2.8.3 and Flink 1.4.0
>Reporter: Karrtik Iyer
>Priority: Major
>
> I have set up HADOOP 2.8.3 and YARN on my Mac machine in Pseudo distributed 
> mode, following this blog: [Hadoop In Pseudo Distributed 
> Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html]
>  I have been able to successfully start HDFS and YARN, and am also able to 
> submit MapReduce jobs.
> After that I have downloaded Apache Flink 1.4 from 
> [here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz].
>  Now I am trying to set up Flink on the above Yarn cluster by following the 
> steps 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html].
>  When I try to run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I am getting 
> below error which I am unable to resolve. Can someone please advise and help 
> me with the same? 
>  
> {{Error while deploying YARN cluster: Couldn't deploy Yarn session cluster 
> java.lang.RuntimeException: Couldn't deploy Yarn session cluster at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
>  at java.security.AccessController.doPrivileged(Native Method) at 
> javax.security.auth.Subject.doAs(Subject.java:422) at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
>  Caused by: org.apache.flink.configuration.IllegalConfigurationException: The 
> number of virtual cores per node were configured with 1 but Yarn only has -1 
> virtual cores available. Please note that the number of virtual cores is set 
> to the number of task slots by default unless configured in the Flink config 
> with 'yarn.containers.vcores.' at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8410) Kafka consumer's commitedOffsets gauge metric is prematurely set

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8410:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Kafka consumer's commitedOffsets gauge metric is prematurely set
> 
>
> Key: FLINK-8410
> URL: https://issues.apache.org/jira/browse/FLINK-8410
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Metrics
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.3.3, 1.5.0, 1.4.2
>
>
> The {{committedOffset}} metric gauge value is set too early. It is set here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L236
> This sets the committed offset before the actual commit happens, which varies 
> depending on whether the commit mode is periodic auto-commit or commit on 
> checkpoints. Moreover, in the 0.9+ consumers, the {{KafkaConsumerThread}} may 
> choose to supersede some commit attempts if the commit takes longer than the 
> commit interval.
> While the committed offset back to Kafka is not a critical value used by the 
> consumer, it would be best to have more accurate values as a Flink-shipped 
> metric.
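A hedged sketch of the direction implied above: update the committed-offset gauge from a commit-completion callback rather than when the commit is merely scheduled. The callback interface below is a simplified stand-in for Kafka's OffsetCommitCallback, not Flink's actual fetcher code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: the gauge is set when the commit completes
// successfully, so it reflects what Kafka actually acknowledged.
public class CommitGauge {
    interface CommitCallback { void onComplete(long offset, Exception error); }

    static final AtomicLong committedOffsetGauge = new AtomicLong(-1);

    static void commitAsync(long offset, CommitCallback cb) {
        // In a real consumer this would hand the offsets to the consumer
        // thread and invoke cb when Kafka acks; simulated as immediate success.
        cb.onComplete(offset, null);
    }

    public static void main(String[] args) {
        commitAsync(42L, (offset, error) -> {
            if (error == null) {
                committedOffsetGauge.set(offset); // only after a real commit
            }
        });
        System.out.println(committedOffsetGauge.get());
    }
}
```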



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8416) Kinesis consumer doc examples should demonstrate preferred default credentials provider

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8416:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Kinesis consumer doc examples should demonstrate preferred default 
> credentials provider
> ---
>
> Key: FLINK-8416
> URL: https://issues.apache.org/jira/browse/FLINK-8416
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.3.3, 1.5.0, 1.4.2
>
>
> The Kinesis consumer docs 
> [here](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html#kinesis-consumer)
>  demonstrate providing credentials by explicitly supplying the AWS Access ID 
> and Key.
> Unless running locally, the preferred approach for AWS is to automatically 
> fetch the credentials provided by the AWS environment.
> That is actually the default behaviour of the Kinesis consumer, so the docs 
> should demonstrate that more clearly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8247) Support Hadoop-free variant of Flink on Mesos

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357305#comment-16357305
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8247:


Moving this to 1.4.2, since on the mailing lists the community agreed to move 
forward with what we already have for 1.4.1.
Please reopen and let me know if you disagree.

> Support Hadoop-free variant of Flink on Mesos
> -
>
> Key: FLINK-8247
> URL: https://issues.apache.org/jira/browse/FLINK-8247
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
> Fix For: 1.5.0, 1.4.2
>
>
> In Hadoop-free mode, Hadoop isn't on the classpath.  The Mesos job manager 
> normally uses the Hadoop UserGroupInformation class to overlay a user context 
> (`HADOOP_USER_NAME`) for the task managers.  
> Detect the absence of Hadoop and skip over the `HadoopUserOverlay`, similar 
> to the logic in `HadoopModuleFactory`. This may require the introduction 
> of an overlay factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8247) Support Hadoop-free variant of Flink on Mesos

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8247:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Support Hadoop-free variant of Flink on Mesos
> -
>
> Key: FLINK-8247
> URL: https://issues.apache.org/jira/browse/FLINK-8247
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
> Fix For: 1.5.0, 1.4.2
>
>
> In Hadoop-free mode, Hadoop isn't on the classpath.  The Mesos job manager 
> normally uses the Hadoop UserGroupInformation class to overlay a user context 
> (`HADOOP_USER_NAME`) for the task managers.  
> Detect the absence of Hadoop and skip over the `HadoopUserOverlay`, similar 
> to the logic in `HadoopModuleFactory`. This may require the introduction 
> of an overlay factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8418:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
> --
>
> Key: FLINK-8418
> URL: https://issues.apache.org/jira/browse/FLINK-8418
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> Instance: https://travis-ci.org/kl0u/flink/builds/327733085



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4

2018-02-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357300#comment-16357300
 ] 

Timo Walther commented on FLINK-8451:
-

[~tzulitai] yes, 1.4.2 is fine for me.

> CaseClassSerializer is not backwards compatible in 1.4
> --
>
> Key: FLINK-8451
> URL: https://issues.apache.org/jira/browse/FLINK-8451
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> There seems to be problems with the updated Scala version and the 
> CaseClassSerializer that make it impossible to restore from a Flink 1.3 
> savepoint.
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357299#comment-16357299
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8308:


[~uce] what is the status of this?
Has this been merged for 1.4.1 yet?

> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a high-severity security vulnerability.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8020:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Critical
> Fix For: 1.5.0, 1.4.2
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble recently after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack output, we believe we found a DEADLOCK in Flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788. 
> This DEADLOCK made the job fail to proceed.
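As an illustration of the general remedy for the cycle described above (not the actual Flink fix), a sketch of consistent lock ordering, under the assumption that both threads need both locks: if every thread acquires the locks in the same global order, neither can hold one lock while waiting on the other.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration: both threads take checkpointLock before
// timerLock, so the circular wait from the report cannot occur.
public class LockOrdering {
    static final ReentrantLock checkpointLock = new ReentrantLock();
    static final ReentrantLock timerLock = new ReentrantLock();
    static int counter = 0;

    static void work() {
        checkpointLock.lock();      // always acquired first
        try {
            timerLock.lock();       // always acquired second
            try {
                counter++;          // protected by both locks
            } finally {
                timerLock.unlock();
            }
        } finally {
            checkpointLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(LockOrdering::work);
        Thread b = new Thread(LockOrdering::work);
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(counter); // both threads finish: no deadlock
    }
}
```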



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357296#comment-16357296
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8020:


Moving this to 1.4.2, since on the mailing lists the community agreed to move 
forward with what we already have for 1.4.1.
Please reopen and let me know if you disagree.

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Critical
> Fix For: 1.5.0, 1.4.2
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble recently after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack output, we believe we found a DEADLOCK in Flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788. 
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8500:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357295#comment-16357295
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


Moving this to 1.4.2, since on the mailing lists the community agreed to move 
forward with what we already have for 1.4.1.
Please reopen and let me know if you disagree.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357294#comment-16357294
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8487:


[~aljoscha] can you confirm?

> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> It was evident that operator state was lost. Either the Kafka 
> consumer kept advancing its offset between a restart and the next 
> checkpoint failure (a minute's worth), or the operator that held partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?





[jira] [Commented] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357291#comment-16357291
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8451:


[~shashank734] the stack trace you posted seems to direct to FLINK-7760.


As per discussion on the mailing lists, we will not continue to block 1.4.1 on 
issues that are missing a pull request.
Moving this to 1.4.2. [~twalthr] let me know if you disagree, thanks.

> CaseClassSerializer is not backwards compatible in 1.4
> --
>
> Key: FLINK-8451
> URL: https://issues.apache.org/jira/browse/FLINK-8451
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> There seem to be problems with the updated Scala version and the 
> CaseClassSerializer that make it impossible to restore from a Flink 1.3 
> savepoint.
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E





[jira] [Updated] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8451:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> CaseClassSerializer is not backwards compatible in 1.4
> --
>
> Key: FLINK-8451
> URL: https://issues.apache.org/jira/browse/FLINK-8451
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> There seem to be problems with the updated Scala version and the 
> CaseClassSerializer that make it impossible to restore from a Flink 1.3 
> savepoint.
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E





[jira] [Created] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn

2018-02-08 Thread Karrtik Iyer (JIRA)
Karrtik Iyer created FLINK-8615:
---

 Summary: Configuring Apache Flink Local Set up with Pseudo 
distributed Yarn
 Key: FLINK-8615
 URL: https://issues.apache.org/jira/browse/FLINK-8615
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
 Environment: Mac Os, Hadoop 2.8.3 and Flink 1.4.0
Reporter: Karrtik Iyer


I have set up Hadoop 2.8.3 and YARN on my Mac machine in pseudo-distributed 
mode, following this blog: [Hadoop In Pseudo Distributed 
Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html]
 I have been able to successfully start HDFS and YARN, and also to submit 
MapReduce jobs.

After that I downloaded Apache Flink 1.4 from 
[here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz].
 Now I am trying to set up Flink on the above YARN cluster by following the 
steps 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html].
 When I run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I get the error 
below, which I am unable to resolve. Can someone please advise? 

 

{code}
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of virtual cores per node were configured with 1 but Yarn only has -1 virtual cores available. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
{code}
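The "Yarn only has -1 virtual cores available" message suggests the Flink client could not read a positive vcore capacity from the YARN configuration. A common cause in single-node pseudo-distributed setups is a missing vcore setting in yarn-site.xml. The snippet below is a plausible fix under that assumption, not a confirmed diagnosis; the value 8 is an example:

```xml
<!-- yarn-site.xml: advertise CPU capacity for the single NodeManager.
     Replace 8 with your machine's core count. -->
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>8</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>8</value>
</property>
```

Restart YARN after editing the file, then retry the yarn-session.sh command.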

 





[jira] [Updated] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8439:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
> 
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dyana Rose
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> This came up when using the s3 for the file system backend and running under 
> ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance 
> level credentials when accessing S3. However when running under ECS, you will 
> generally want to default to the task definition's IAM role.
> In this case you need to set the hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to one or more fully qualified class names. See the [hadoop-aws 
> docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]
> This works as expected when you add this setting to flink-conf.yaml, but there 
> is a further gotcha: because the AWS SDK is shaded, the actual fully qualified 
> class name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>  
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: 
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name, you will see a very 
> confusing error stating that the ContainerCredentialsProvider doesn't 
> implement AWSCredentialsProvider (which it most certainly does).
> Adding this information (how to specify alternate credential providers, and 
> the namespace gotcha) to the [AWS deployment 
> docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
>  would be useful to anyone else using S3.





[jira] [Resolved] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-8270.

Resolution: Fixed

> TaskManagers do not use correct local path for shipped Keytab files in Yarn 
> deployment modes
> 
>
> Key: FLINK-8270
> URL: https://issues.apache.org/jira/browse/FLINK-8270
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Reported on ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html
> This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are 
> again not using the correct local paths for shipped Keytab files.
> The breakage was introduced accidentally by this change: 
> https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e.
> Things to consider:
> 1) The above accidental breaking change was actually targeting a minor 
> refactor on the "integration test scenario" code block in 
> {{YarnTaskManagerRunner}}. It would be best if we can remove that test case 
> code block from the main code.
> 2) Unit test coverage is apparently not enough. As this incident shows, any 
> slight change can cause this issue to easily resurface.





[jira] [Commented] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357288#comment-16357288
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8270:


Fixed by FLINK-8275.

Closing this issue now.

> TaskManagers do not use correct local path for shipped Keytab files in Yarn 
> deployment modes
> 
>
> Key: FLINK-8270
> URL: https://issues.apache.org/jira/browse/FLINK-8270
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Reported on ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html
> This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are 
> again not using the correct local paths for shipped Keytab files.
> The breakage was introduced accidentally by this change: 
> https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e.
> Things to consider:
> 1) The above accidental breaking change was actually targeting a minor 
> refactor on the "integration test scenario" code block in 
> {{YarnTaskManagerRunner}}. It would be best if we can remove that test case 
> code block from the main code.
> 2) Unit test coverage is apparently not enough. As this incident shows, any 
> slight change can cause this issue to easily resurface.





[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357284#comment-16357284
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-7756:


Marking this as resolved due to the fix in FLINK-7760.
[~shashank734] if you disagree and still have issues with this, please let us 
know and reopen the ticket.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses 
> HDFS as its file system), it crashes. But when I use FsStateBackend on staging 
> (which uses HDFS as its file system), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> 

[jira] [Resolved] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-7756.

Resolution: Fixed
  Assignee: Aljoscha Krettek

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses 
> HDFS as its file system), it crashes. But when I use FsStateBackend on staging 
> (which uses HDFS as its file system), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> 

[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5372:
---
Fix Version/s: (was: 1.4.1)
   1.4.2

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator
> implements OneInputStreamOperator {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was 
> created before
> // we trigger the test checkpoint
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord element) throws Exception 
> {
> // we also don't care
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}





[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:40 PM:


[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
Since AFAICT this is a test instability, I will change the fix version for this 
to be 1.4.2 for now. If you disagree, please let me know.


was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, 
please let me know.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator
> implements OneInputStreamOperator {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was 
> created before
> // we trigger the test checkpoint
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord element) throws Exception 
> {
> // we also don't care
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}





[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:39 PM:


[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, 
please let me know.


was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator
> implements OneInputStreamOperator {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was 
> created before
> // we trigger the test checkpoint
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord element) throws Exception 
> {
> // we also don't care
> ValueState state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}





[jira] [Resolved] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6321.

Resolution: Fixed

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It 
> works fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}





[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357278#comment-16357278
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6321:


I am marking this as resolved now, due to the fix for FLINK-7760.
[~shashank734] if you object and think there is still an issue, please reopen.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It 
> works fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}
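The root cause in the trace above is generic Java concurrency behaviour: once the async materialization future is cancelled, the blocking `get()` inside `FutureUtil.runIfNotDoneAndGet` surfaces as a `CancellationException`. A minimal, Flink-free sketch of that mechanism (class and variable names here are illustrative, not Flink's actual code):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CancelledSnapshotSketch {
    public static void main(String[] args) {
        // Stand-in for the async state-materialization work of a checkpoint.
        FutureTask<String> snapshot = new FutureTask<>(() -> "materialized state");

        // The checkpoint is cancelled before the task completes...
        snapshot.cancel(true);

        try {
            // ...so get(), as called by runIfNotDoneAndGet, throws.
            snapshot.get();
        } catch (CancellationException e) {
            System.out.println("CancellationException");
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("other failure");
        }
    }
}
```

This only illustrates why the exception chain ends in `CancellationException`; the actual bug was in why the RocksDB snapshot got cancelled in the first place.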





[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5372:


[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator<String>
>         implements OneInputStreamOperator<String, String> {
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created
>         // before we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}





[jira] [Commented] (FLINK-8614) Enable Flip-6 per default

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357253#comment-16357253
 ] 

ASF GitHub Bot commented on FLINK-8614:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5437

[FLINK-8614] [flip6] Activate Flip-6 mode per default

## What is the purpose of the change

This commit enables the Flip-6 mode per default. Additionally, it disables
some of the Yarn tests which no longer apply to Flip-6 (tests that wait for
a number of started TM containers without a job submission).

This PR is based on #5436, #5432 and #5388.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink flip6Default

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5437.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5437


commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann 
Date:   2018-02-07T16:00:40Z

[FLINK-8613] [flip6] [yarn] Return excess containers

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.

commit f9d9fd0d4c22defd4f839b7078596e6e440d26c1
Author: Till Rohrmann 
Date:   2018-01-30T14:15:49Z

[FLINK-8529] [flip6] Let Yarn entry points use APPLICATION_MASTER_PORT

Let all Yarn entry points use the YarnConfigOptions.APPLICATION_MASTER_PORT 
option
to specify the valid port range for the common RpcService.

commit acc942e5237bcf8cc482c564c061d741befbff53
Author: Till Rohrmann 
Date:   2018-02-06T15:47:28Z

[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.

commit 52e8e74a6c977c68be0bdb62017bcbac0bb5ed2f
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a job graph store containing the single
submitted job.

commit d0ab48e090f142bf5287528b6d0198b702e169f0
Author: Till Rohrmann 
Date:   2018-02-07T17:21:06Z

[hotfix] [yarn] Write number of slots to configuration

commit 1d38657ab86b6f7cff7b9b6c878ee4573aedff71
Author: Till Rohrmann 
Date:   2018-02-07T17:58:32Z

[hotfix] [yarn] Remove unnecessary TaskManager configuration generation

commit 76b6ce0ca587377b14fbd6eda48f52d4da13f09e
Author: Till Rohrmann 
Date:   2018-02-08T09:27:27Z

[hotfix] Only log retrying exception on debug in RetryingRegistration

commit 00de9bf451f5cb56e96938a7cf0dd9c5e38ea595
Author: Till Rohrmann 
Date:   2018-01-30T08:22:03Z

[FLINK-8614] [flip6] Activate Flip-6 mode per default

This commit enables the Flip-6 mode per default. Additionally, it disables
some of the Yarn tests which no longer apply to Flip-6 (tests that wait for
a number of started TM containers without a job submission).




> Enable Flip-6 per default
> -
>
> Key: FLINK-8614
> URL: https://issues.apache.org/jira/browse/FLINK-8614
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After 



[jira] [Created] (FLINK-8614) Enable Flip-6 per default

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8614:


 Summary: Enable Flip-6 per default
 Key: FLINK-8614
 URL: https://issues.apache.org/jira/browse/FLINK-8614
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8471, the next step is to enable Flip-6 per default by 
setting the configuration switch to {{flip6}}.





[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357243#comment-16357243
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5436

[FLINK-8613] [flip6] [yarn] Return excess containers

## What is the purpose of the change

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5436


commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann 
Date:   2018-02-07T16:00:40Z

[FLINK-8613] [flip6] [yarn] Return excess containers

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.




> Return excess container in YarnResourceManager
> --
>
> Key: FLINK-8613
> URL: https://issues.apache.org/jira/browse/FLINK-8613
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} should return excess containers which the Yarn 
> ResourceManager assigned wrongly. 
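Setting the actual `YarnResourceManager`/`AMRMClient` plumbing aside, the accept-or-return logic the PR describes can be modelled in a few lines. All names below are illustrative, not Flink's actual API: only as many allocated containers as there are pending requests are kept, the rest are released back to YARN.

```java
import java.util.ArrayList;
import java.util.List;

public class ExcessContainerSketch {

    /**
     * Accept at most {@code pending} containers; every additional container
     * is added to {@code released}, i.e. handed back to YARN.
     */
    static List<String> partition(List<String> allocated, int pending, List<String> released) {
        List<String> accepted = new ArrayList<>();
        for (String container : allocated) {
            if (accepted.size() < pending) {
                accepted.add(container);   // matches a pending container request
            } else {
                released.add(container);   // excess: return to the Yarn ResourceManager
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        List<String> accepted = partition(List.of("c1", "c2", "c3"), 2, released);
        System.out.println(accepted + " / released " + released);
    }
}
```

In the real implementation the release step would go through YARN's client API rather than a list, but the bookkeeping is the same.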







[jira] [Created] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8613:


 Summary: Return excess container in YarnResourceManager
 Key: FLINK-8613
 URL: https://issues.apache.org/jira/browse/FLINK-8613
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, YARN
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{YarnResourceManager}} should return excess containers which the Yarn 
ResourceManager assigned wrongly. 





[jira] [Commented] (FLINK-8612) Add non-detached job mode

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357229#comment-16357229
 ] 

ASF GitHub Bot commented on FLINK-8612:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5435

[FLINK-8612] [flip6] Enable non-detached job mode

## What is the purpose of the change

The non-detached job mode waits until it has served the JobResult of
a completed job at least once before it terminates.

This PR is based on #5434 and #5433.

## Verifying this change

- Added `MiniDispatcherTest#testJobResultRetrieval`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addNormalJobMode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5435.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5435


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a job graph store containing the single
submitted job.

commit d9af78da8eec870eb8fcce140ce13b2c06ca4e5d
Author: Till Rohrmann 
Date:   2018-02-07T13:12:54Z

[hotfix] Fix checkstyle violations in JobExecutionResult

commit 3c414687bc73229f045d861d57da3843ca470469
Author: Till Rohrmann 
Date:   2018-02-07T13:56:45Z

[FLINK-8611] [flip6] Add result future to JobManagerRunner

This commit adds a CompletableFuture to the
JobManagerRunner. This future will be completed once the job has
reached a globally terminal state.

commit a9af6165cdee7d8e8dcf503d160ad5f1330a0076
Author: Till Rohrmann 
Date:   2018-02-07T10:09:34Z

[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway

The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.

commit bcd803845976147d5c13b0b92f196346d6412c58
Author: Till Rohrmann 
Date:   2018-02-08T13:51:25Z

[FLINK-8612] [flip6] Enable non-detached job mode

The non-detached job mode waits until it has served the JobResult of
a completed job at least once before it terminates.




> Add non-detached job mode
> -
>
> Key: FLINK-8612
> URL: https://issues.apache.org/jira/browse/FLINK-8612
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to support the non-detached job mode, the {{MiniDispatcher}} has to 
> wait until it has served the {{JobResult}} of a completed job at least once 
> before it terminates.
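The "terminate only after the result was served once" behaviour boils down to gating shutdown on a future that is completed the first time a client fetches the result. A minimal sketch of that idea, with illustrative names that are not Flink's actual MiniDispatcher API:

```java
import java.util.concurrent.CompletableFuture;

public class MiniDispatcherSketch {
    // Completed once the JobResult has been handed out at least once.
    private final CompletableFuture<Void> resultServed = new CompletableFuture<>();
    private final boolean detached;

    MiniDispatcherSketch(boolean detached) {
        this.detached = detached;
    }

    /** Called when a client fetches the result of the completed job. */
    String requestJobResult() {
        resultServed.complete(null);
        return "JobResult";
    }

    /** Called when the single job reaches a globally terminal state. */
    CompletableFuture<Void> jobReachedTerminalState() {
        // Detached mode: shut down immediately.
        // Non-detached mode: shut down only after the result was served once.
        return detached ? CompletableFuture.completedFuture(null) : resultServed;
    }

    public static void main(String[] args) {
        MiniDispatcherSketch dispatcher = new MiniDispatcherSketch(false);
        CompletableFuture<Void> shutdown = dispatcher.jobReachedTerminalState();
        System.out.println(shutdown.isDone()); // not done until the result is fetched
        dispatcher.requestJobResult();
        System.out.println(shutdown.isDone());
    }
}
```

The actual PR wires this through the REST handlers that serve the {{JobResult}}, but the control flow follows this shape.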







[jira] [Created] (FLINK-8612) Add non-detached job mode

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8612:


 Summary: Add non-detached job mode
 Key: FLINK-8612
 URL: https://issues.apache.org/jira/browse/FLINK-8612
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to support the non-detached job mode, the {{MiniDispatcher}} has to 
wait until it has served the {{JobResult}} of a completed job at least once 
before it terminates.





[jira] [Commented] (FLINK-8611) Add result future to JobManagerRunner

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357185#comment-16357185
 ] 

ASF GitHub Bot commented on FLINK-8611:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5434

[FLINK-8611] [flip6] Add result future to JobManagerRunner

## What is the purpose of the change

This commit adds a CompletableFuture to the
JobManagerRunner. This future will be completed once the job has
reached a globally terminal state.

This PR is based on #5431.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobManagerRunnerFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5434.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5434


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a job graph store containing the single
submitted job.

commit d9af78da8eec870eb8fcce140ce13b2c06ca4e5d
Author: Till Rohrmann 
Date:   2018-02-07T13:12:54Z

[hotfix] Fix checkstyle violations in JobExecutionResult

commit 3c414687bc73229f045d861d57da3843ca470469
Author: Till Rohrmann 
Date:   2018-02-07T13:56:45Z

[FLINK-8611] [flip6] Add result future to JobManagerRunner

This commit adds a CompletableFuture to the
JobManagerRunner. This future will be completed once the job has
reached a globally terminal state.




> Add result future to JobManagerRunner
> -
>
> Key: FLINK-8611
> URL: https://issues.apache.org/jira/browse/FLINK-8611
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Adding a {{CompletableFuture}} result future to the 
> {{JobManagerRunner}} will make it possible to return a {{JobResult}} future 
> for a still running job. This is helpful for the implementation of a 
> non-detached job mode.
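The pattern described here is a future that the runner completes when the job reaches a globally terminal state, which callers can obtain while the job is still running. A small sketch under that assumption; the {{JobResult}} stand-in and method names are illustrative, not Flink's actual classes:

```java
import java.util.concurrent.CompletableFuture;

public class JobManagerRunnerSketch {
    // Illustrative stand-in for Flink's JobResult.
    record JobResult(String jobId, boolean success) {}

    private final CompletableFuture<JobResult> resultFuture = new CompletableFuture<>();

    /** Callers (e.g. a MiniDispatcher) can grab this while the job still runs. */
    CompletableFuture<JobResult> getResultFuture() {
        return resultFuture;
    }

    /** Invoked once the job reaches a globally terminal state. */
    void jobReachedGloballyTerminalState(String jobId, boolean success) {
        resultFuture.complete(new JobResult(jobId, success));
    }

    public static void main(String[] args) {
        JobManagerRunnerSketch runner = new JobManagerRunnerSketch();
        // Attach a consumer before the job finishes; it fires on completion.
        runner.getResultFuture().thenAccept(result ->
                System.out.println(result.jobId() + " success=" + result.success()));
        runner.jobReachedGloballyTerminalState("job-1", true);
    }
}
```

A non-detached dispatcher can then simply block or chain on this future instead of polling the job status.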







[jira] [Created] (FLINK-8611) Add result future to JobManagerRunner

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8611:


 Summary: Add result future to JobManagerRunner
 Key: FLINK-8611
 URL: https://issues.apache.org/jira/browse/FLINK-8611
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


Adding a {{CompletableFuture}} result future to the 
{{JobManagerRunner}} will make it possible to return a {{JobResult}} future for 
a still running job. This is helpful for the implementation of a non-detached job mode.





[jira] [Resolved] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-8318.

Resolution: Fixed
  Assignee: Nico Kruber

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> My Flink job fails after updating to Flink 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with Flink option "classloader.resolve-order: 
> parent-first" 
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Classpath:
> ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> 
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> Unnamed (3/10) 

[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357172#comment-16357172
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8318:


We have a fix now in `1.5.0` and `1.4.1` that shades away all ES dependencies.

1.4 - 33ebc85c281fcd4869fa743f88c2c71d339f9857
1.5 - 8395508b0401353ed07375e22882e7581d46ac0e

Therefore, I think we can mark this issue as resolved.
Please object if you disagree, [~Jihyun Cho].

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> My Flink job fails after updating to Flink 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with Flink option "classloader.resolve-order: 
> parent-first" 
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Classpath:
> 

[jira] [Resolved] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-8362.

   Resolution: Fixed
Fix Version/s: 1.4.1
   1.5.0

> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.
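Making a connector self-contained is typically done with the maven-shade-plugin by relocating conflict-prone transitive dependencies (such as Jackson, which caused FLINK-8318) into the connector's own namespace. The fragment below is an illustrative sketch of that mechanism only; the relocation pattern shown is an assumption for the example, not the exact configuration used in the Flink commits.

```xml
<!-- Illustrative maven-shade-plugin sketch: relocate a transitive
     dependency (here Jackson) into the connector's own package so it
     cannot clash with another Jackson version on the Hadoop classpath.
     The shadedPattern below is an example, not Flink's actual pattern. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <shadedPattern>org.apache.flink.elasticsearch.shaded.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the relocated classes bundled into the connector jar, `classloader.resolve-order: parent-first` setups like the CDH one in FLINK-8318 no longer pick up the wrong Jackson.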





[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357164#comment-16357164
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8362:


Merged.

1.4 - 33ebc85c281fcd4869fa743f88c2c71d339f9857
1.5 - 8395508b0401353ed07375e22882e7581d46ac0e

> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.





[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357148#comment-16357148
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5426


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.





[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357149#comment-16357149
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5243


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.





[GitHub] flink pull request #5426: [FLINK-8362] [elasticsearch] Shade all ES connecto...

2018-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5426


---


[GitHub] flink pull request #5243: [FLINK-8362][elasticsearch] shade all dependencies

2018-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5243


---


[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357135#comment-16357135
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5426
  
Build passing locally, merging ...


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.





[GitHub] flink issue #5426: [FLINK-8362] [elasticsearch] Shade all ES connector depen...

2018-02-08 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5426
  
Build passing locally, merging ...


---


[GitHub] flink pull request #5433: [FLINK-8610] [flip6] Remove RestfulGateway from Jo...

2018-02-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5433

[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway

## What is the purpose of the change

The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.

This PR is based on #5431.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
removeRestfulGatewayFromJobMaster

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5433.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5433


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single-job SubmittedJobGraphStore.

commit 1da4fa2dab9c7e6ab6aecfe5c8512de217fcce9a
Author: Till Rohrmann 
Date:   2018-02-07T10:09:34Z

[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway

The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.




---


[jira] [Commented] (FLINK-8610) Remove RestfulGateway from JobMasterGateway

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357130#comment-16357130
 ] 

ASF GitHub Bot commented on FLINK-8610:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5433

[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway

## What is the purpose of the change

The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.

This PR is based on #5431.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
removeRestfulGatewayFromJobMaster

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5433.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5433


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single-job SubmittedJobGraphStore.

commit 1da4fa2dab9c7e6ab6aecfe5c8512de217fcce9a
Author: Till Rohrmann 
Date:   2018-02-07T10:09:34Z

[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway

The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.




> Remove RestfulGateway from JobMasterGateway
> ---
>
> Key: FLINK-8610
> URL: https://issues.apache.org/jira/browse/FLINK-8610
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the 
> {{RestfulGateway}}. Therefore, we should remove it.





[jira] [Created] (FLINK-8610) Remove RestfulGateway from JobMasterGateway

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8610:


 Summary: Remove RestfulGateway from JobMasterGateway
 Key: FLINK-8610
 URL: https://issues.apache.org/jira/browse/FLINK-8610
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the 
{{RestfulGateway}}. Therefore, we should remove it.





[jira] [Commented] (FLINK-8609) Add support to deploy detached job mode clusters

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357116#comment-16357116
 ] 

ASF GitHub Bot commented on FLINK-8609:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5432

[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

## What is the purpose of the change

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.

This PR is based on #5431.

## Brief change log

- Extract `JobGraph` from `PackagedProgram` in `CliFrontend`
- Deploy job mode cluster if flip-6 is enabled

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink enableJobMode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5432


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single-job SubmittedJobGraphStore.

commit be2d9dfa515b1577f6d7a67b726d9e704281a1cc
Author: Till Rohrmann 
Date:   2018-02-06T15:47:28Z

[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.
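The two-step flow in this commit message (extract a JobGraph from the packaged program, then hand it to a cluster descriptor that brings up a cluster for exactly that job) can be sketched as below. The interfaces are simplified stand-ins for Flink's PackagedProgram and ClusterDescriptor, not the real API; all names here are illustrative assumptions.

```java
// Illustrative sketch of the CliFrontend job-mode flow. The interfaces are
// simplified stand-ins, not Flink's real PackagedProgram/ClusterDescriptor.
public class JobModeDeploySketch {
    interface PackagedProgram { String mainClass(); }
    interface ClusterDescriptor { String deployJobCluster(String jobGraphName, boolean detached); }

    static String deploy(PackagedProgram program, ClusterDescriptor descriptor, boolean detached) {
        // Step 1: extract the job graph from the packaged program
        // (represented here just by a derived name).
        String jobGraph = "jobgraph-for-" + program.mainClass();
        // Step 2: deploy a cluster that is started with exactly this job.
        return descriptor.deployJobCluster(jobGraph, detached);
    }
}
```

In detached mode the client returns immediately after step 2 instead of waiting for the job result.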




> Add support to deploy detached job mode clusters
> 
>
> Key: FLINK-8609
> URL: https://issues.apache.org/jira/browse/FLINK-8609
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy 
> detached job mode clusters.





[GitHub] flink pull request #5432: [FLINK-8609] [flip6] Enable Flip-6 job mode in Cli...

2018-02-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5432

[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

## What is the purpose of the change

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.

This PR is based on #5431.

## Brief change log

- Extract `JobGraph` from `PackagedProgram` in `CliFrontend`
- Deploy job mode cluster if flip-6 is enabled

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink enableJobMode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5432


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single-job SubmittedJobGraphStore.

commit be2d9dfa515b1577f6d7a67b726d9e704281a1cc
Author: Till Rohrmann 
Date:   2018-02-06T15:47:28Z

[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.




---


[jira] [Created] (FLINK-8609) Add support to deploy detached job mode clusters

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8609:


 Summary: Add support to deploy detached job mode clusters
 Key: FLINK-8609
 URL: https://issues.apache.org/jira/browse/FLINK-8609
 Project: Flink
  Issue Type: New Feature
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy 
detached job mode clusters.





[jira] [Reopened] (FLINK-6590) Integrate generated tables into documentation

2018-02-08 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-6590:
-

reopen to modify title

> Integrate generated tables into documentation
> -
>
> Key: FLINK-6590
> URL: https://issues.apache.org/jira/browse/FLINK-6590
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Closed] (FLINK-6590) Integrate generated tables into documentation

2018-02-08 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6590.
---
Resolution: Fixed

> Integrate generated tables into documentation
> -
>
> Key: FLINK-6590
> URL: https://issues.apache.org/jira/browse/FLINK-6590
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Closed] (FLINK-8530) Enable detached job submission for RestClusterClient

2018-02-08 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8530.

Resolution: Won't Do

> Enable detached job submission for RestClusterClient
> 
>
> Key: FLINK-8530
> URL: https://issues.apache.org/jira/browse/FLINK-8530
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} should also be able to submit jobs in detached 
> mode. In detached mode, we don't wait for the {{JobExecutionResult}}.





[jira] [Commented] (FLINK-8608) Add MiniDispatcher for job mode

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357086#comment-16357086
 ] 

ASF GitHub Bot commented on FLINK-8608:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5431

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

## What is the purpose of the change

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single-job SubmittedJobGraphStore.

## Brief change log

- Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher`
- Refactored `JobClusterEntrypoint` and `SessionClusterEntrypoint` such 
that most of the cluster component logic moved to `ClusterEntrypoint`
- Renamed `JobMasterRestEndpoint` into `MiniDispatcherRestEndpoint`
- Initialize `MiniDispatcher` with single job retrieved by the 
`JobClusterEntrypoint`
- Terminate `MiniDispatcher` after job completion if its execution mode 
is `detached`

## Verifying this change

- Added `MiniDispatcherTest`
- The tests of the `Dispatcher` are also valid for the `MiniDispatcher`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5431


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a submitted job graph store containing only
the single job.




> Add MiniDispatcher for job mode
> ---
>
> Key: FLINK-8608
> URL: https://issues.apache.org/jira/browse/FLINK-8608
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which 
> is started with a pre initialized {{JobGraph}} and launches a single 
> {{JobManagerRunner}} with this job. Once the job is completed and if the 
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
> terminate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5431

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

## What is the purpose of the change

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a submitted job graph store containing only
the single job.

## Brief change log

- Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher`
- Refactored `JobClusterEntrypoint` and `SessionClusterEntrypoint` so that 
most of the cluster component logic now lives in `ClusterEntrypoint`
- Renamed `JobMasterRestEndpoint` into `MiniDispatcherRestEndpoint`
- Initialize `MiniDispatcher` with single job retrieved by the 
`JobClusterEntrypoint`
- Terminate `MiniDispatcher` after job completion if its execution mode 
is `detached`
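
The `SingleJobSubmittedJobGraphStore` idea, a store created with exactly one job that it can recover but never replace, can be sketched in a dependency-free way. This is a hypothetical simplification; class and method names are illustrative and do not match Flink's actual `SubmittedJobGraphStore` interface:

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hypothetical sketch of a "single job" job graph store: it is created
 * with one pre-initialized job and can only ever recover that job.
 * Strings stand in for real JobID/JobGraph types.
 */
final class SingleJobGraphStore {

    private final String jobId;
    private final String jobGraph;

    SingleJobGraphStore(String jobId, String jobGraph) {
        this.jobId = jobId;
        this.jobGraph = jobGraph;
    }

    /** Recovery only ever yields the pre-initialized job. */
    String recoverJobGraph(String requestedJobId) {
        if (!jobId.equals(requestedJobId)) {
            throw new IllegalArgumentException(
                "Unknown job " + requestedJobId + "; this store only holds " + jobId);
        }
        return jobGraph;
    }

    /** The stored job ids are always the singleton set {jobId}. */
    Set<String> getJobIds() {
        return Collections.singleton(jobId);
    }
}
```

Because the store is immutable and holds exactly one entry, a dispatcher built on top of it trivially rejects additional job submissions.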

## Verifying this change

- Added `MiniDispatcherTest`
- The tests of the `Dispatcher` are also valid for the `MiniDispatcher`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5431


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a submitted job graph store containing only
the single job.




---


[jira] [Created] (FLINK-8608) Add MiniDispatcher for job mode

2018-02-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8608:


 Summary: Add MiniDispatcher for job mode
 Key: FLINK-8608
 URL: https://issues.apache.org/jira/browse/FLINK-8608
 Project: Flink
  Issue Type: New Feature
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to properly support the job mode, we need a {{MiniDispatcher}} which 
is started with a pre initialized {{JobGraph}} and launches a single 
{{JobManagerRunner}} with this job. Once the job is completed and if the 
{{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
terminate.
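
The shutdown rule described above (terminate after job completion only in detached mode, since an attached client still needs to fetch the result) can be sketched as follows; the enum and method names are illustrative, not Flink API:

```java
/**
 * Hypothetical sketch of the MiniDispatcher's shutdown decision:
 * a job-mode cluster started in detached mode terminates as soon as its
 * single job completes, while an attached cluster stays up so the client
 * can retrieve the result.
 */
final class MiniDispatcherShutdownRule {

    enum ExecutionMode { DETACHED, NORMAL }

    static boolean shouldTerminate(ExecutionMode mode, boolean jobCompleted) {
        // Terminate only once the job is done AND no client is attached
        // waiting to retrieve the result.
        return jobCompleted && mode == ExecutionMode.DETACHED;
    }
}
```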





[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357075#comment-16357075
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972719
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
+   .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+   private static final long serialVersionUID = 8710586935083422712L;
+
+   @Override
+   public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
+   return value;
+   }
+   }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+   private static final long serialVersionUID = -1110876099102344900L;
+
+   @Override
+   public Integer getKey(Tuple2<Integer, Integer> value) {
+   return value.f0;
+  

[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357074#comment-16357074
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166970474
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
--- End diff --

Probably needs a better Javadoc...  



> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357078#comment-16357078
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972441
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
+   .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+   private static final long serialVersionUID = 8710586935083422712L;
+
+   @Override
+   public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
+   return value;
+   }
+   }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
--- End diff --

The other calls should also go on new lines, I think that's the usual 
example style


> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> 

[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357079#comment-16357079
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166973933
  
--- Diff: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+  * ITCase for the [[org.apache.flink.api.common.state.BroadcastState]].
+  */
+class BroadcastStateITCase extends AbstractTestBase {
+
+  @Test
+  @throws[Exception]
+  def testConnectWithBroadcastTranslation(): Unit = {
+
+val timerTimestamp = 10L
+
+val DESCRIPTOR = new MapStateDescriptor[Long, String](
+  "broadcast-state",
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+  BasicTypeInfo.STRING_TYPE_INFO)
+
+val expected = Map[Long, String](
+  0L -> "test:0",
+  1L -> "test:1",
+  2L -> "test:2",
+  3L -> "test:3",
+  4L -> "test:4",
+  5L -> "test:5")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val srcOne = env.generateSequence(0L, 5L)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks[Long]() {
+
+override def extractTimestamp(element: Long, 
previousElementTimestamp: Long): Long =
+  element
+
+override def checkAndGetNextWatermark(lastElement: Long, 
extractedTimestamp: Long) =
+  new Watermark(extractedTimestamp)
+
+  }).keyBy((value: Long) => value)
+
+val srcTwo = env.fromCollection(expected.values.toSeq)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks[String]() {
+
+override def extractTimestamp(element: String, 
previousElementTimestamp: Long): Long =
+  element.split(":")(1).toLong
+
+override def checkAndGetNextWatermark(lastElement: String, 
extractedTimestamp: Long) =
+  new Watermark(extractedTimestamp)
+  })
+
+val broadcast = srcTwo.broadcast(DESCRIPTOR)
+// the timestamp should be high enough to trigger the timer after all 
the elements arrive.
+val output = srcOne.connect(broadcast).process(
+  new KeyedBroadcastProcessFunction[Long, Long, String, String]() {
+
+@throws[Exception]
+override def processElement(
+value: Long,
+ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedReadOnlyContext,
+out: Collector[String]): Unit = {
+
+  ctx.timerService.registerEventTimeTimer(timerTimestamp)
+}
+
+@throws[Exception]
+override def processBroadcastElement(
+value: String,
+ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedContext,
+out: Collector[String]): Unit = {
+
+  val key = value.split(":")(1).toLong
+  
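
The broadcast-state flow exercised by the `BroadcastStateITCase` diff above can be simulated without any Flink dependency. This is a hypothetical sketch, not Flink's `BroadcastState` API: each broadcast element `"test:<n>"` is parsed and stored under key `n` in a shared map, which keyed processing (or a timer) can later read:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Dependency-free simulation (not Flink API) of the broadcast-state flow:
 * broadcast elements of the form "test:<n>" populate a shared map keyed
 * by n, mirroring what processBroadcastElement does in the ITCase.
 */
final class BroadcastStateSim {

    private final Map<Long, String> broadcastState = new HashMap<>();

    /** Mirrors processBroadcastElement: key = value.split(":")[1]. */
    void processBroadcastElement(String value) {
        long key = Long.parseLong(value.split(":")[1]);
        broadcastState.put(key, value);
    }

    /** What a keyed element (or an onTimer callback) would read back. */
    String lookup(long key) {
        return broadcastState.get(key);
    }
}
```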

[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357076#comment-16357076
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972253
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
--- End diff --

I'd put the `rebalance()` on a new line and maybe add a comment on why it's 
needed


> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357077#comment-16357077
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r16697
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -441,6 +463,26 @@ class DataStream[T](stream: JavaStream[T]) {
*/
   def broadcast: DataStream[T] = asScalaStream(stream.broadcast())
 
+  /**
+* Sets the partitioning of the [[DataStream]] so that the output 
elements
+* are broadcasted to every parallel instance of the next operation. In 
addition,
+* it implicitly creates a
--- End diff --

I think the "creates a" is outdated here because you can specify multiple 
descriptors. This should also be fixed in the Java API.


> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>






[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166973933
  
--- Diff: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+  * ITCase for the [[org.apache.flink.api.common.state.BroadcastState]].
+  */
+class BroadcastStateITCase extends AbstractTestBase {
+
+  @Test
+  @throws[Exception]
+  def testConnectWithBroadcastTranslation(): Unit = {
+
+val timerTimestamp = 10L
+
+val DESCRIPTOR = new MapStateDescriptor[Long, String](
+  "broadcast-state",
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+  BasicTypeInfo.STRING_TYPE_INFO)
+
+val expected = Map[Long, String](
+  0L -> "test:0",
+  1L -> "test:1",
+  2L -> "test:2",
+  3L -> "test:3",
+  4L -> "test:4",
+  5L -> "test:5")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val srcOne = env.generateSequence(0L, 5L)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks[Long]() {
+
+override def extractTimestamp(element: Long, 
previousElementTimestamp: Long): Long =
+  element
+
+override def checkAndGetNextWatermark(lastElement: Long, 
extractedTimestamp: Long) =
+  new Watermark(extractedTimestamp)
+
+  }).keyBy((value: Long) => value)
+
+val srcTwo = env.fromCollection(expected.values.toSeq)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks[String]() {
+
+override def extractTimestamp(element: String, 
previousElementTimestamp: Long): Long =
+  element.split(":")(1).toLong
+
+override def checkAndGetNextWatermark(lastElement: String, 
extractedTimestamp: Long) =
+  new Watermark(extractedTimestamp)
+  })
+
+val broadcast = srcTwo.broadcast(DESCRIPTOR)
+// the timestamp should be high enough to trigger the timer after all 
the elements arrive.
+val output = srcOne.connect(broadcast).process(
+  new KeyedBroadcastProcessFunction[Long, Long, String, String]() {
+
+@throws[Exception]
+override def processElement(
+value: Long,
+ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedReadOnlyContext,
+out: Collector[String]): Unit = {
+
+  ctx.timerService.registerEventTimeTimer(timerTimestamp)
+}
+
+@throws[Exception]
+override def processBroadcastElement(
+value: String,
+ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedContext,
+out: Collector[String]): Unit = {
+
+  val key = value.split(":")(1).toLong
+  ctx.getBroadcastState(DESCRIPTOR).put(key, value)
+}
+
+@throws[Exception]
+override def onTimer(
+timestamp: Long,
+ctx: KeyedBroadcastProcessFunction[Long, Long, 

[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972253
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
--- End diff --

I'd put the `rebalance()` on a new line and maybe add a comment on why it's 
needed
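Applied to the quoted snippet, the suggestion might look like this (a formatting sketch only, not the committed code; the trailing calls are elided):

```java
KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env
        .fromCollection(keyedInput)
        // rebalance() redistributes the input round-robin so that every
        // parallel instance of the following map receives elements,
        // regardless of how the collection source was partitioned.
        .rebalance()
        .map(...)
```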


---


[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972441
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
+   .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+   private static final long serialVersionUID = 8710586935083422712L;
+
+   @Override
+   public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
+   return value;
+   }
+   }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
--- End diff --

The other calls should also go on new lines, I think that's the usual 
example style
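Following that style, each chained call would sit on its own line (a formatting sketch only, not the committed code; `IdentityMap` is an illustrative name for the anonymous map function in the diff):

```java
KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env
        .fromCollection(keyedInput)
        .rebalance()
        .map(new IdentityMap())          // identity map, run at parallelism 4
        .setParallelism(4)
        .keyBy(value -> value.f0);       // key by the first tuple field
```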


---


[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166970474
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
--- End diff --

Probably needs a better Javadoc... 😉 



---


[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r16697
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -441,6 +463,26 @@ class DataStream[T](stream: JavaStream[T]) {
*/
   def broadcast: DataStream[T] = asScalaStream(stream.broadcast())
 
+  /**
+* Sets the partitioning of the [[DataStream]] so that the output 
elements
+* are broadcasted to every parallel instance of the next operation. In 
addition,
+* it implicitly creates a
--- End diff --

I think the "creates a" is outdated here because you can specify multiple 
descriptors. This should also be fixed in the Java API.
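For context, `broadcast(...)` takes a varargs list of `MapStateDescriptor`s, so several broadcast states can be registered in one call, which is why the singular "creates a" reads outdated. A sketch (descriptor names are illustrative, not from the PR):

```java
// Multiple broadcast states registered at once; the Javadoc should
// speak of "one or more" descriptors rather than "a" descriptor.
BroadcastStream<Integer> broadcastStream =
        someStream.broadcast(rulesDescriptor, thresholdsDescriptor);
```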


---


[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...

2018-02-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5425#discussion_r166972719
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.broadcast;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Testing example.
+ */
+public class BroadcastExample {
+
+   public static void main(String[] args) throws Exception {
+
+   final List<Integer> input = new ArrayList<>();
+   input.add(1);
+   input.add(2);
+   input.add(3);
+   input.add(4);
+
+   final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>();
+   keyedInput.add(new Tuple2<>(1, 1));
+   keyedInput.add(new Tuple2<>(1, 5));
+   keyedInput.add(new Tuple2<>(2, 2));
+   keyedInput.add(new Tuple2<>(2, 6));
+   keyedInput.add(new Tuple2<>(3, 3));
+   keyedInput.add(new Tuple2<>(3, 7));
+   keyedInput.add(new Tuple2<>(4, 4));
+   keyedInput.add(new Tuple2<>(4, 8));
+
+   final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
+   "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+   );
+
+   KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance()
+   .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+   private static final long serialVersionUID = 8710586935083422712L;
+
+   @Override
+   public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
+   return value;
+   }
+   }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+   private static final long serialVersionUID = -1110876099102344900L;
+
+   @Override
+   public Integer getKey(Tuple2<Integer, Integer> value) {
+   return value.f0;
+   }
+   });
+
+   BroadcastStream broadcastStream = 
env.fromCollection(input)
+   .flatMap(new FlatMapFunction() {
+

[jira] [Created] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8607:
---

 Summary: Add a basic embedded SQL CLI client
 Key: FLINK-8607
 URL: https://issues.apache.org/jira/browse/FLINK-8607
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


This issue describes Implementation Plan 1 of FLIP-24.

Goal: Add the basic features to play around with Flink's streaming SQL.

{code}
- Add CLI component that reads the configuration files
- "Pre-registered table sources"
- "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI SQL 
parser
- SHOW TABLES
- DESCRIBE TABLE
- EXPLAIN
- Add streaming append query submission to executor
- Submit jars and run SELECT query using the ClusterClient
- Collect results on heap and serve them on the CLI side (Internal Mode with 
SELECT)
- SOURCE (for executing a SQL statement stored in a local file)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357012#comment-16357012
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5426
  
Thanks!
Since this is verified manually already, and we have a test that verifies 
everything is properly shaded, I'll merge this once Travis gives green.


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5426: [FLINK-8362] [elasticsearch] Shade all ES connector depen...

2018-02-08 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5426
  
Thanks!
Since this is verified manually already, and we have a test that verifies 
everything is properly shaded, I'll merge this once Travis gives green.


---


[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357008#comment-16357008
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5426
  
NOTICE files look good to me.


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5426: [FLINK-8362] [elasticsearch] Shade all ES connector depen...

2018-02-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5426
  
NOTICE files look good to me.


---


[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357003#comment-16357003
 ] 

Timo Walther commented on FLINK-8577:
-

Maybe we should also rename the methods to make them easier to find: 
{{tEnv.fromUpsertStream}}, {{tEnv.fromAppendStream}}, {{tEnv.fromDataSet()}}.
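Under that proposal, the conversion calls in the quoted description would presumably become (a hypothetical sketch of the suggested names only, not an existing Flink API; signatures are assumed to mirror the quoted {{upsertFromStream}}):

```java
// Suggested renaming from the comment above; fromUpsertStream/fromAppendStream
// are proposed names, not implemented methods.
Table upserted = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key);
Table appended = tEnv.fromAppendStream(input, 'a, 'b, 'c);
```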

> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

