[jira] [Updated] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8601: -- Description: h3. Background A Bloom filter is useful in many situations, for example: * 1. Approximate calculation: deduplication (e.g. UV calculation) * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] By using a BF, we can greatly reduce the number of queries for state data in a stream join; these filtered queries would eventually fail to find any result anyway, and they perform poorly for RocksDB-based state because each one traverses ```sst``` files on disk. However, with what Flink currently provides, it is hard to use a Bloom filter, for the following reasons: * 1. Serialization problem: Bloom filter state can be large (for example: 100M); if we implement it on top of RocksDB state, the state data has to be serialized on every query and update, so performance will be very poor. * 2. Data skew: Data in different key groups can be skewed, and the degree of skew cannot be accurately predicted before the program runs. Therefore, it is impossible to determine how many resources the Bloom filter should allocate. One option is to allocate the space needed for the most skewed case, but this can lead to a very serious waste of resources. h3. Requirement Therefore, I introduce the PartitionedBloomFilter for Flink, which should at least meet the following requirements: * 1. Support for changing parallelism * 2. Only serialize when necessary: when performing a checkpoint * 3. Can deal with the data skew problem: users only need to specify a PartitionedBloomFilter with the desired input and fpp, and the system will allocate resources dynamically. * 4. Do not conflict with other state: users can still use KeyedState and OperatorState when using this Bloom filter. * 5. 
Support relaxed TTL (i.e. the data survives at least as long as the specified time) Design doc: [design doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing] was: h3. Background A Bloom filter is useful in many situations, for example: * 1. Approximate calculation: deduplication (e.g. UV calculation) * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] By using a BF, we can greatly reduce the number of queries for state data in a stream join; these filtered queries would eventually fail to find any result anyway, and they perform poorly for RocksDB-based state because each one traverses ```sst``` files on disk. However, with what Flink currently provides, it is hard to use a Bloom filter, for the following reasons: * 1. Serialization problem: Bloom filter state can be large (for example: 100M); if we implement it on top of RocksDB state, the state data has to be serialized on every query and update, so performance will be very poor. * 2. Data skew: Data in different key groups can be skewed, and the degree of skew cannot be accurately predicted before the program runs. Therefore, it is impossible to determine how many resources the Bloom filter should allocate. One option is to allocate the space needed for the most skewed case, but this can lead to a very serious waste of resources. h3. Requirement Therefore, I introduce the LinkedBloomFilter for Flink, which should at least meet the following requirements: * 1. Support for changing parallelism * 2. Only serialize when necessary: when performing a checkpoint * 3. Can deal with the data skew problem: users only need to specify a LinkedBloomFilterState with the desired input and fpp, and the system will allocate resources dynamically. * 4. Do not conflict with other state: users can still use KeyedState and OperatorState when using Bloom filter state. * 5. 
Support relaxed TTL (i.e. the data survives at least as long as the specified time) Design doc: [design doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing] > Introduce PartitionedBloomFilter for Approximate calculation and other > situations of performance optimization > - > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major > > h3. Background > A Bloom filter is useful in many situations, for example: > * 1. Approximate calculation: deduplication (e.g. UV
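The pre-filtering idea behind this issue can be sketched with a minimal, self-contained Bloom filter (a hypothetical illustration, not the proposed PartitionedBloomFilter API): probe the filter first, and only query the RocksDB-backed state when the filter says the key might be present.

```java
import java.util.BitSet;

// Minimal Bloom filter sketch. Class and method names are illustrative only.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int numHashes;

    public SimpleBloomFilter(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    // Derive the i-th probe position from two base hashes (double hashing).
    private int index(Object key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // force an odd second hash
        return Math.floorMod(h1 + i * h2, size);
    }

    public void add(Object key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    // false => definitely absent, so the expensive state lookup can be skipped;
    // true  => possibly present, so the backing state must still be queried.
    public boolean mightContain(Object key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter bf = new SimpleBloomFilter(1 << 16, 4);
        bf.add("user-1");
        bf.add("user-2");
        System.out.println(bf.mightContain("user-1"));   // true
        // False positives are possible but rare at this size, so this is
        // false with overwhelming probability:
        System.out.println(bf.mightContain("user-999"));
    }
}
```

Because `mightContain` can return false positives but never false negatives, a `false` answer lets a stream join skip the state lookup entirely, which is exactly the saving the issue describes for RocksDB-backed state.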
[jira] [Updated] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8601: -- Summary: Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization (was: Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization) > Introduce PartitionedBloomFilter for Approximate calculation and other > situations of performance optimization > - > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358051#comment-16358051 ] Sihua Zhou commented on FLINK-8601: --- [~fhueske] I have updated the document, could you please have a look at it? > Introduce LinkedBloomFilter for Approximate calculation and other situations > of performance optimization > > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8601: -- Component/s: (was: Core) State Backends, Checkpointing > Introduce LinkedBloomFilter for Approximate calculation and other situations > of performance optimization > > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8601) Introduce LinkedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8601: -- Description: h3. Background A Bloom filter is useful in many situations, for example: * 1. Approximate calculation: deduplication (e.g. UV calculation) * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] By using a BF, we can greatly reduce the number of queries for state data in a stream join; these filtered queries would eventually fail to find any result anyway, and they perform poorly for RocksDB-based state because each one traverses ```sst``` files on disk. However, with what Flink currently provides, it is hard to use a Bloom filter, for the following reasons: * 1. Serialization problem: Bloom filter state can be large (for example: 100M); if we implement it on top of RocksDB state, the state data has to be serialized on every query and update, so performance will be very poor. * 2. Data skew: Data in different key groups can be skewed, and the degree of skew cannot be accurately predicted before the program runs. Therefore, it is impossible to determine how many resources the Bloom filter should allocate. One option is to allocate the space needed for the most skewed case, but this can lead to a very serious waste of resources. h3. Requirement Therefore, I introduce the LinkedBloomFilter for Flink, which should at least meet the following requirements: * 1. Support for changing parallelism * 2. Only serialize when necessary: when performing a checkpoint * 3. Can deal with the data skew problem: users only need to specify a LinkedBloomFilterState with the desired input and fpp, and the system will allocate resources dynamically. * 4. Do not conflict with other state: users can still use KeyedState and OperatorState when using Bloom filter state. * 5. Support relaxed TTL (i.e. the data survives at least as long as the specified time) Design doc: [design doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing] was: h3. Background A Bloom filter is useful in many situations, for example: * 1. Approximate calculation: deduplication (e.g. UV calculation) * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] By using a BF, we can greatly reduce the number of queries for state data in a stream join; these filtered queries would eventually fail to find any result anyway, and they perform poorly for RocksDB-based state because each one traverses ```sst``` files on disk. However, with what Flink currently provides, it is hard to use a Bloom filter, for the following reasons: * 1. Serialization problem: Bloom filter state can be large (for example: 100M); if we implement it on top of RocksDB state, the state data has to be serialized on every query and update, so performance will be very poor. * 2. Data skew: Data in different key groups can be skewed, and the degree of skew cannot be accurately predicted before the program runs. Therefore, it is impossible to determine how many resources the Bloom filter should allocate. One option is to allocate the space needed for the most skewed case, but this can lead to a very serious waste of resources. h3. Requirement Therefore, I introduce the LinkedBloomFilter for Flink, which should at least meet the following requirements: * 1. Support for changing parallelism * 2. Only serialize when necessary: when performing a checkpoint * 3. Can deal with the data skew problem: users only need to specify a LinkedBloomFilterState with the desired input and fpp, and the system will allocate resources dynamically. * 4. Do not conflict with other state: users can still use KeyedState and OperatorState when using Bloom filter state. * 5. Support relaxed TTL (i.e. the data survives at least as long as the specified time) Design doc: [design doc|https://docs.google.com/document/d/1yMCT2ogE0CtSjzRvldgi0ZPPxC791PpkVGkVeqaUUI8/edit?usp=sharing] > Introduce LinkedBloomFilter for Approximate calculation and other situations > of performance optimization > > > Key: FLINK-8601 > URL: https://issues.apache.org/jira/browse/FLINK-8601 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API > Affects Versions: 1.4.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major
[jira] [Closed] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8616. --- Resolution: Duplicate Duplicate of FLINK-8423. > Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator > masks ClassCastException > - > > Key: FLINK-8616 > URL: https://issues.apache.org/jira/browse/FLINK-8616 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Cliff Resnick >Priority: Major > > There is an attempt to enrich the exception with outputTag#getId, but > outputTag is null, and a NullPointerException is thrown. Looking at the > attempted message enrichment the code seems to assume a ClassCastException > can only stem from SideOutput type mismatches. This may have been the norm > before, but changes to classloader delegation in 1.4 have given rise to > multiple ClassLoader conflicts (at least for us), and they all seem to end up > here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE
[ https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8423: Priority: Critical (was: Minor) > OperatorChain#pushToOperator catch block may fail with NPE > -- > > Key: FLINK-8423 > URL: https://issues.apache.org/jira/browse/FLINK-8423 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 1.4.0, 1.5.0 > Reporter: Chesnay Schepler > Priority: Critical > > {code}
> @Override
> protected <X> void pushToOperator(StreamRecord<X> record) {
>     try {
>         // we know that the given outputTag matches our OutputTag so the record
>         // must be of the type that our operator (and Serializer) expects.
>         @SuppressWarnings("unchecked")
>         StreamRecord<T> castRecord = (StreamRecord<T>) record;
>
>         numRecordsIn.inc();
>         StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>         operator.setKeyContextElement1(copy);
>         operator.processElement(copy);
>     } catch (ClassCastException e) {
>         // Enrich error message
>         ClassCastException replace = new ClassCastException(
>             String.format(
>                 "%s. Failed to push OutputTag with id '%s' to operator. " +
>                 "This can occur when multiple OutputTags with different types " +
>                 "but identical names are being used.",
>                 e.getMessage(),
>                 outputTag.getId()));
>
>         throw new ExceptionInChainedOperatorException(replace);
>     } catch (Exception e) {
>         throw new ExceptionInChainedOperatorException(e);
>     }
> }
> {code}
> If outputTag is null (as is the case when no sideOutput was defined) the > catch block will crash with a NullPointerException. This may happen if > {{operator.processElement}} throws a ClassCastException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
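A minimal sketch of the kind of null guard the issue calls for (hypothetical, not the actual Flink patch): only enrich the message when an OutputTag is actually involved, and rethrow the original ClassCastException otherwise.

```java
// Hypothetical null-safe version of the error enrichment discussed above.
// Names are illustrative; this is not the committed Flink fix.
public class SafeEnrichment {

    static RuntimeException enrich(ClassCastException e, Object outputTag) {
        if (outputTag == null) {
            // No side output involved: the CCE came from somewhere else
            // (e.g. a classloader conflict), so pass it through unchanged.
            return e;
        }
        return new ClassCastException(
            String.format(
                "%s. Failed to push OutputTag with id '%s' to operator. "
                    + "This can occur when multiple OutputTags with different types "
                    + "but identical names are being used.",
                e.getMessage(), outputTag));
    }

    public static void main(String[] args) {
        ClassCastException cce = new ClassCastException("type mismatch");
        System.out.println(enrich(cce, null) == cce);          // true: passed through
        System.out.println(enrich(cce, "tag-1").getMessage()); // enriched message
    }
}
```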
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai any update? ---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357972#comment-16357972 ] ASF GitHub Bot commented on FLINK-8516: --- Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai any update? > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector > Affects Versions: 1.4.0, 1.3.2, 1.5.0 > Reporter: Thomas Weise > Assignee: Thomas Weise > Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution becomes skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
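The skew described in the issue is easy to reproduce: assigning shards by `hashCode() % parallelism` only spreads evenly while identifiers are consecutive. A small stand-alone illustration (hypothetical shard ids, not the connector's code); since Java string hash codes of these ids step by 2, the four shards land on only two of the four subtasks.

```java
import java.util.HashMap;
import java.util.Map;

public class ShardAssignmentSkew {

    // Assign a shard to a subtask the way the issue describes:
    // by its identifier's hash code, modulo the parallelism.
    static int assign(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Hypothetical ids after a reshard: they step by 2 instead of 1,
        // so their hash codes also step by 2 and cover only half the
        // residues mod 4.
        String[] shards = {
            "shardId-000000000100", "shardId-000000000102",
            "shardId-000000000104", "shardId-000000000106"};
        Map<Integer, Integer> counts = new HashMap<>();
        for (String s : shards) {
            counts.merge(assign(s, parallelism), 1, Integer::sum);
        }
        // Only 2 of the 4 subtasks receive shards; the other 2 sit idle.
        System.out.println(counts);
    }
}
```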
[jira] [Closed] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn
[ https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ken Krugler closed FLINK-8615. -- Resolution: Not A Bug > Configuring Apache Flink Local Set up with Pseudo distributed Yarn > -- > > Key: FLINK-8615 > URL: https://issues.apache.org/jira/browse/FLINK-8615 > Project: Flink > Issue Type: Bug > Affects Versions: 1.4.0 > Environment: Mac OS, Hadoop 2.8.3 and Flink 1.4.0 > Reporter: Karrtik Iyer > Priority: Major > > I have set up HADOOP 2.8.3 and YARN on my Mac machine in pseudo-distributed > mode, following this blog: [Hadoop In Pseudo Distributed > Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html] > I have been able to successfully start HDFS and YARN, and also to > submit MapReduce jobs. > After that I downloaded Apache Flink 1.4 from > [here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz]. > Now I am trying to set up Flink on the above YARN cluster by following the > steps > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html]. > When I try to run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I get the > below error, which I am unable to resolve. Can someone please advise and help > me with the same? 
> > {{Error while deploying YARN cluster: Couldn't deploy Yarn session cluster > java.lang.RuntimeException: Couldn't deploy Yarn session cluster at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511) > at java.security.AccessController.doPrivileged(Native Method) at > javax.security.auth.Subject.doAs(Subject.java:422) at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511) > Caused by: org.apache.flink.configuration.IllegalConfigurationException: The > number of virtual cores per node were configured with 1 but Yarn only has -1 > virtual cores available. Please note that the number of virtual cores is set > to the number of task slots by default unless configured in the Flink config > with 'yarn.containers.vcores.' at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn
[ https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357916#comment-16357916 ] Ken Krugler commented on FLINK-8615: https://flink.apache.org/community.html > Configuring Apache Flink Local Set up with Pseudo distributed Yarn > -- > > Key: FLINK-8615 > URL: https://issues.apache.org/jira/browse/FLINK-8615 > Project: Flink > Issue Type: Bug > Affects Versions: 1.4.0 > Environment: Mac OS, Hadoop 2.8.3 and Flink 1.4.0 > Reporter: Karrtik Iyer > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn
[ https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357903#comment-16357903 ] Karrtik Iyer commented on FLINK-8615: - Hi [~kkrugler], Apologies, I was not aware of the Flink user list. Can you please share the link to that forum where I can post my question, and then close this issue? > Configuring Apache Flink Local Set up with Pseudo distributed Yarn > -- > > Key: FLINK-8615 > URL: https://issues.apache.org/jira/browse/FLINK-8615 > Project: Flink > Issue Type: Bug > Affects Versions: 1.4.0 > Environment: Mac OS, Hadoop 2.8.3 and Flink 1.4.0 > Reporter: Karrtik Iyer > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type
[ https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357843#comment-16357843 ] ASF GitHub Bot commented on FLINK-8617: --- GitHub user Xpray opened a pull request: https://github.com/apache/flink/pull/5438 [FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing … ## What is the purpose of the change fix code generation error in MapGet ## Brief change log handle nullTerm of MapGet GeneratedExpression correctly. ## Verifying this change *(Please pick either of the following options)* This change added tests and can be verified as follows: ``` run org.apache.flink.table.runtime.batch.sql.CalcITCase testMapGet() run org.apache.flink.table.runtime.batch.table.CalcITCase testMapget() run org.apache.flink.table.runtime.stream.table.CalcITCase testMapGet() run org.apache.flink.table.runtime.stream.sql.SqlITCase testMapGet() ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? 
no You can merge this pull request into a Git repository by running: $ git pull https://github.com/Xpray/flink FLINK-8617 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5438.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5438 commit d618aac64156a9fd86c8367d6594e593521a8d8b Author: XprayDate: 2018-02-09T02:17:16Z [FLINK-8617][TableAPI & SQL] Fix code generation bug while accessing Map type > Fix code generation bug while accessing Map type > > > Key: FLINK-8617 > URL: https://issues.apache.org/jira/browse/FLINK-8617 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > > There's a code generation bug in {code}ScalarOperatos.generateMapGet{code}. > And there's two more bugs found in {code}ScalarOperators.generateIsNull{code} > and {code}ScalarOperators.generateIsNotNull{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8617) Fix code generation bug while accessing Map type
Ruidong Li created FLINK-8617: - Summary: Fix code generation bug while accessing Map type Key: FLINK-8617 URL: https://issues.apache.org/jira/browse/FLINK-8617 Project: Flink Issue Type: Bug Components: Table API SQL Reporter: Ruidong Li Assignee: Ruidong Li There's a code generation bug in {code}ScalarOperators.generateMapGet{code}, and there are two more bugs in {code}ScalarOperators.generateIsNull{code} and {code}ScalarOperators.generateIsNotNull{code}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
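Editor's note on the class of bug being fixed: Java's {{Map#get}} returns null for an absent key, so generated code must track a null term before using the value. A hypothetical plain-Java sketch of the corrected behaviour (not the actual generated code):

```java
import java.util.Map;

// Hypothetical sketch: a corrected MapGet expression must set its null flag
// from the lookup result and propagate null instead of dereferencing it.
class MapGetSketch {
    static Integer safeGet(Map<String, Integer> map, String key) {
        Integer value = map.get(key);      // may be null for a missing key
        boolean isNull = (value == null);  // the "nullTerm" the fix tracks
        return isNull ? null : value;
    }
}
```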
[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cliff Resnick updated FLINK-8616: - Summary: Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException (was: Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException) > Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator > masks ClassCastException > - > > Key: FLINK-8616 > URL: https://issues.apache.org/jira/browse/FLINK-8616 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Cliff Resnick >Priority: Major > > There is an attempt to enrich the exception with outputTag#getId, but > outputTag is null, and a NullPointerException is thrown. Looking at the > attempted message enrichment the code seems to assume a ClassCastException > can only stem from SideOutput type mismatches. This may have been the norm > before, but changes to classloader delegation in 1.4 have given rise to > multiple ClassLoader conflicts (at least for us), and they all seem to end up > here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cliff Resnick updated FLINK-8616: - Summary: Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException (was: Missing null check in OperatorChain#pushToOperator masks ClassCastException) > Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks > ClassCastException > -- > > Key: FLINK-8616 > URL: https://issues.apache.org/jira/browse/FLINK-8616 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Cliff Resnick >Priority: Major > > There is an attempt to enrich the exception with outputTag#getId, but > outputTag is null, and a NullPointerException is thrown. Looking at the > attempted message enrichment the code seems to assume a ClassCastException > can only stem from SideOutput type mismatches. This may have been the norm > before, but changes to classloader delegation in 1.4 have given rise to > multiple ClassLoader conflicts (at least for us), and they all seem to end up > here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8616) Missing null check in OperatorChain#pushToOperator masks ClassCastException
Cliff Resnick created FLINK-8616: Summary: Missing null check in OperatorChain#pushToOperator masks ClassCastException Key: FLINK-8616 URL: https://issues.apache.org/jira/browse/FLINK-8616 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.4.0 Reporter: Cliff Resnick There is an attempt to enrich the exception with outputTag#getId, but outputTag is null, and a NullPointerException is thrown. Looking at the attempted message enrichment the code seems to assume a ClassCastException can only stem from SideOutput type mismatches. This may have been the norm before, but changes to classloader delegation in 1.4 have given rise to multiple ClassLoader conflicts (at least for us), and they all seem to end up here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
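Editor's note on the fix implied by the report: the enrichment must be guarded on {{outputTag}} being non-null, so a ClassCastException from a classloader conflict is surfaced rather than masked by an NPE. A hypothetical plain-Java sketch (not the actual OperatorChain code):

```java
// Hypothetical sketch of null-safe exception enrichment: only mention the
// side-output tag when one is actually set, so the original message survives.
class EnrichSketch {
    static String enrich(ClassCastException e, Object outputTag) {
        if (outputTag != null) {
            return "Could not emit to side output " + outputTag + ": " + e.getMessage();
        }
        // No side output involved: keep the original ClassCastException visible.
        return e.getMessage();
    }
}
```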
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357640#comment-16357640 ] Thomas Weise commented on FLINK-5697: - Since shards are immutable wrt their hash key range and records cannot move between shards, we should be able to use the parent shard IDs and the last read sequence to find when a newly discovered shard can be read from. Child shards don't need to be assigned to the same subtask, in which case we would need a way to know the last read offset from the parent shard from a different subtask for comparison with EndingSequenceNumber. Is it possible to retrieve the last checkpointed offsets from other subtasks outside of restore to perform such check? (It would still imply that consumption from a new child shard cannot start until the parent was checkpointed and therefore add latency, but would provide the ordering guarantee we are looking for?) > Add per-shard watermarks for FlinkKinesisConsumer > - > > Key: FLINK-5697 > URL: https://issues.apache.org/jira/browse/FLINK-5697 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > > It would be nice to let the Kinesis consumer be on-par in functionality with > the Kafka consumer, since they share very similar abstractions. Per-partition > / shard watermarks is something we can add also to the Kinesis consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
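Editor's note: the parent/child ordering check described in the comment above can be sketched as follows. The helper name is hypothetical; Kinesis sequence numbers are decimal strings that compare numerically, so BigInteger comparison works:

```java
import java.math.BigInteger;

// Hypothetical helper for the check described above: a child shard becomes
// readable once the parent has been consumed past its EndingSequenceNumber.
class ShardOrderSketch {
    static boolean canReadChild(String parentLastReadSeq, String parentEndingSeq) {
        return new BigInteger(parentLastReadSeq)
                .compareTo(new BigInteger(parentEndingSeq)) >= 0;
    }
}
```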
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357562#comment-16357562 ] ASF GitHub Bot commented on FLINK-8571: --- Github user mdaxini commented on a diff in the pull request: https://github.com/apache/flink/pull/5424#discussion_r167064838 --- Diff: docs/dev/api_concepts.md --- @@ -896,3 +896,54 @@ result type ```R``` for the final result. E.g. for a histogram, ```V``` is a num {% top %} +## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed stream --- End diff -- +1 > Provide an enhanced KeyedStream implementation to use ForwardPartitioner > > > Key: FLINK-8571 > URL: https://issues.apache.org/jira/browse/FLINK-8571 > Project: Flink > Issue Type: Improvement >Reporter: Nagarjun Guraja >Assignee: Stefan Richter >Priority: Major > > This enhancement would help in modeling problems with pre partitioned input > sources(for e.g. Kafka with Keyed topics). This would help in making the job > graph embarrassingly parallel while leveraging rocksdb state backend and also > the fine grained recovery semantics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn
[ https://issues.apache.org/jira/browse/FLINK-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357418#comment-16357418 ] Ken Krugler commented on FLINK-8615: Hi Karrtik - for a support/configuration question like this, please post to the Flink user list first, versus opening a bug in Jira. It would be great if you could close this issue, thanks. > Configuring Apache Flink Local Set up with Pseudo distributed Yarn > -- > > Key: FLINK-8615 > URL: https://issues.apache.org/jira/browse/FLINK-8615 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: Mac Os, Hadoop 2.8.3 and Flink 1.4.0 >Reporter: Karrtik Iyer >Priority: Major > > I have set up HADOOP 2.8.3 and YARN on my Mac machine in Pseudo distributed > mode, following this blog: [Hadoop In Pseudo Distributed > Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html] > I have been able to successfully start hdfs and yarn. And also able to > submit Map reduce jobs. > After that I have download Apache Flink 1.4 from > [here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz]. > Now I am trying to set up Flink on the above Yarn cluster by following the > steps > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html]. > When I try to run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I am getting > below error which I am unable to resolve. Can someone please advise and help > me with the same? 
> > {{Error while deploying YARN cluster: Couldn't deploy Yarn session cluster > java.lang.RuntimeException: Couldn't deploy Yarn session cluster at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511) > at java.security.AccessController.doPrivileged(Native Method) at > javax.security.auth.Subject.doAs(Subject.java:422) at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511) > Caused by: org.apache.flink.configuration.IllegalConfigurationException: The > number of virtual cores per node were configured with 1 but Yarn only has -1 > virtual cores available. Please note that the number of virtual cores is set > to the number of task slots by default unless configured in the Flink config > with 'yarn.containers.vcores.' at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8410) Kafka consumer's commitedOffsets gauge metric is prematurely set
[ https://issues.apache.org/jira/browse/FLINK-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8410: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Kafka consumer's commitedOffsets gauge metric is prematurely set > > > Key: FLINK-8410 > URL: https://issues.apache.org/jira/browse/FLINK-8410 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Metrics >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.3.3, 1.5.0, 1.4.2 > > > The {{committedOffset}} metric gauge value is set too early. It is set here: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L236 > This sets the committed offset before the actual commit happens, which varies > depending on whether the commit mode is auto periodically, or committed on > checkpoints. Moreover, in the 0.9+ consumers, the {{KafkaConsumerThread}} may > choose to supersede some commit attempts if the commit takes longer than the > commit interval. > While the committed offset back to Kafka is not a critical value used by the > consumer, it will be best to have more accurate values as a Flink-shipped > metric. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
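Editor's note: the fix direction implied by the report above is to update the gauge only once a commit has actually completed, not when it is requested. A plain-Java sketch under that assumption (names are hypothetical; the real code lives in the Kafka fetcher/consumer thread):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: set the committed-offset gauge from the commit
// completion path, not before the commit is performed.
class OffsetGaugeSketch {
    private final AtomicLong committedOffsetGauge = new AtomicLong(-1);

    long committed() {
        return committedOffsetGauge.get();
    }

    void commitAsync(long offset, Runnable kafkaCommit) {
        // Setting the gauge here, before the commit runs, would be the bug.
        kafkaCommit.run();                  // stands in for the real async commit
        committedOffsetGauge.set(offset);   // updated only after the commit succeeded
    }
}
```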
[jira] [Updated] (FLINK-8416) Kinesis consumer doc examples should demonstrate preferred default credentials provider
[ https://issues.apache.org/jira/browse/FLINK-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8416: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Kinesis consumer doc examples should demonstrate preferred default > credentials provider > --- > > Key: FLINK-8416 > URL: https://issues.apache.org/jira/browse/FLINK-8416 > Project: Flink > Issue Type: Improvement > Components: Documentation, Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.3.3, 1.5.0, 1.4.2 > > > The Kinesis consumer docs > [here](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html#kinesis-consumer) > demonstrate providing credentials by explicitly supplying the AWS Access ID > and Key. > The always preferred approach for AWS, unless running locally, is to > automatically fetch the shipped credentials from the AWS environment. > That is actually the default behaviour of the Kinesis consumer, so the docs > should demonstrate that more clearly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
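Editor's note: a doc example demonstrating the preferred default could look like the sketch below. Key names follow the Kinesis connector's AWSConfigConstants; the region value is a placeholder, and no access key/secret is supplied, so the default AWS credentials chain is used:

```properties
# Sketch: rely on the default AWS credentials chain instead of hard-coded keys.
aws.region=us-east-1
aws.credentials.provider=AUTO
```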
[jira] [Commented] (FLINK-8247) Support Hadoop-free variant of Flink on Mesos
[ https://issues.apache.org/jira/browse/FLINK-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357305#comment-16357305 ] Tzu-Li (Gordon) Tai commented on FLINK-8247: Moving this to 1.4.2, since on the mailing lists the community agreed to move forward with what we have already for 1.4.1. Please reopen and let me know if you disagree. > Support Hadoop-free variant of Flink on Mesos > - > > Key: FLINK-8247 > URL: https://issues.apache.org/jira/browse/FLINK-8247 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > Fix For: 1.5.0, 1.4.2 > > > In Hadoop-free mode, Hadoop isn't on the classpath. The Mesos job manager > normally uses the Hadoop UserGroupInformation class to overlay a user context > (`HADOOP_USER_NAME`) for the task managers. > Detect the absence of Hadoop and skip over the `HadoopUserOverlay`, similar > to the logic in `HadoopModuleFactory`.This may require the introduction > of an overlay factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8247) Support Hadoop-free variant of Flink on Mesos
[ https://issues.apache.org/jira/browse/FLINK-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8247: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Support Hadoop-free variant of Flink on Mesos > - > > Key: FLINK-8247 > URL: https://issues.apache.org/jira/browse/FLINK-8247 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > Fix For: 1.5.0, 1.4.2 > > > In Hadoop-free mode, Hadoop isn't on the classpath. The Mesos job manager > normally uses the Hadoop UserGroupInformation class to overlay a user context > (`HADOOP_USER_NAME`) for the task managers. > Detect the absence of Hadoop and skip over the `HadoopUserOverlay`, similar > to the logic in `HadoopModuleFactory`.This may require the introduction > of an overlay factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
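Editor's note: the detection step described above can be sketched with plain reflection. The probed class name is the real Hadoop one; the helper itself is hypothetical:

```java
// Hypothetical sketch: probe the classpath for a Hadoop class and skip the
// Hadoop-specific overlay (HadoopUserOverlay) when it is absent.
class HadoopDetectSketch {
    static boolean isPresent(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    static boolean isHadoopOnClasspath() {
        return isPresent("org.apache.hadoop.security.UserGroupInformation");
    }
}
```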
[jira] [Updated] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
[ https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8418: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Kafka08ITCase.testStartFromLatestOffsets() times out on Travis > -- > > Key: FLINK-8418 > URL: https://issues.apache.org/jira/browse/FLINK-8418 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > Fix For: 1.5.0, 1.4.2 > > > Instance: https://travis-ci.org/kl0u/flink/builds/327733085 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4
[ https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357300#comment-16357300 ] Timo Walther commented on FLINK-8451: - [~tzulitai] yes, 1.4.2 is fine for me. > CaseClassSerializer is not backwards compatible in 1.4 > -- > > Key: FLINK-8451 > URL: https://issues.apache.org/jira/browse/FLINK-8451 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > > There seems to be problems with the updated Scala version and the > CaseClassSerializer that make it impossible to restore from a Flink 1.3 > savepoint. > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357299#comment-16357299 ] Tzu-Li (Gordon) Tai commented on FLINK-8308: [~uce] what is the status of this? Has this been merged for 1.4.1 yet? > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
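Editor's note: the fix itself is a one-line version constraint in the website build's Gemfile, sketched here assuming the site is built with Bundler:

```ruby
# Gemfile (sketch): pin the patched version of yajl-ruby
gem 'yajl-ruby', '>= 1.3.1'
```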
[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8020: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Critical > Fix For: 1.5.0, 1.4.2 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357296#comment-16357296 ] Tzu-Li (Gordon) Tai commented on FLINK-8020: Moving this to 1.4.2, since on the mailing lists the community agreed to move forward with what we have already for 1.4.1. Please reopen and let me know if you disagree. > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Critical > Fix For: 1.5.0, 1.4.2 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
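Editor's note: the two stack traces above form the textbook AB-BA cycle, and the standard fix is a single global lock order. A plain-Java sketch with hypothetical lock names:

```java
// Sketch of the fix pattern for an AB-BA deadlock like the one reported:
// every thread must acquire the two locks in the same global order.
class LockOrderSketch {
    private final Object checkpointLock = new Object();
    private final Object operatorLock = new Object();
    int counter = 0;

    // Main thread path: checkpointLock before operatorLock.
    void mainThreadWork() {
        synchronized (checkpointLock) {
            synchronized (operatorLock) {
                counter++;
            }
        }
    }

    // Timer thread path: same order, so the circular wait cannot form.
    void timerThreadWork() {
        synchronized (checkpointLock) {
            synchronized (operatorLock) {
                counter++;
            }
        }
    }
}
```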
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8500: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357295#comment-16357295 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: Moving this to 1.4.2, since on the mailing lists the community agreed to move forward with what we have already for 1.4.1. Please reopen and let me know if you disagree. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
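Editor's note: the requested API change is to pass the ConsumerRecord timestamp into deserialize(). A hypothetical sketch of the shape (the real interface is KeyedDeserializationSchema; the names here are illustrative only):

```java
import java.io.IOException;

// Hypothetical shape of the request: deserialize() also receives the Kafka
// record timestamp so business logic can use it.
interface TimestampedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message,
                  String topic, int partition, long offset,
                  long timestamp) throws IOException;
}

class StringWithTs implements TimestampedDeserializationSchema<String> {
    public String deserialize(byte[] key, byte[] msg, String topic,
                              int partition, long offset, long timestamp) {
        // Attach the record timestamp to the decoded payload.
        return new String(msg) + "@" + timestamp;
    }
}
```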
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357294#comment-16357294 ] Tzu-Li (Gordon) Tai commented on FLINK-8487: [~aljoscha] can you confirm? > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. 
Either it was the Kafka > consumer that kept on advancing its offset between a restart and the next > checkpoint failure (a minute's worth), or the operator that held partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4
[ https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357291#comment-16357291 ] Tzu-Li (Gordon) Tai commented on FLINK-8451: [~shashank734] the stack trace you posted seems to direct to FLINK-7760. As per-discussion in the mailing lists, we will not continue to block 1.4.1 for issues that are missing a pull request. Moving this to 1.4.2. [~twalthr] let me know if you disagree, thanks. > CaseClassSerializer is not backwards compatible in 1.4 > -- > > Key: FLINK-8451 > URL: https://issues.apache.org/jira/browse/FLINK-8451 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > > There seems to be problems with the updated Scala version and the > CaseClassSerializer that make it impossible to restore from a Flink 1.3 > savepoint. > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8451) CaseClassSerializer is not backwards compatible in 1.4
[ https://issues.apache.org/jira/browse/FLINK-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8451: --- Fix Version/s: (was: 1.4.1) 1.4.2 > CaseClassSerializer is not backwards compatible in 1.4 > -- > > Key: FLINK-8451 > URL: https://issues.apache.org/jira/browse/FLINK-8451 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > > There seems to be problems with the updated Scala version and the > CaseClassSerializer that make it impossible to restore from a Flink 1.3 > savepoint. > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCACk7FThV5itjSj_1fG9oaWS86z8WTKWs7abHvok6FnHzq9XT-A%40mail.gmail.com%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3C7CABB00B-D52F-4878-B04F-201415CEB658%40mediamath.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8615) Configuring Apache Flink Local Set up with Pseudo distributed Yarn
Karrtik Iyer created FLINK-8615: --- Summary: Configuring Apache Flink Local Set up with Pseudo distributed Yarn Key: FLINK-8615 URL: https://issues.apache.org/jira/browse/FLINK-8615 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Environment: Mac OS, Hadoop 2.8.3 and Flink 1.4.0 Reporter: Karrtik Iyer I have set up Hadoop 2.8.3 and YARN on my Mac machine in pseudo-distributed mode, following this blog: [Hadoop In Pseudo Distributed Mode|http://zhongyaonan.com/hadoop-tutorial/setting-up-hadoop-2-6-on-mac-osx-yosemite.html] I have been able to successfully start HDFS and YARN, and have also been able to submit MapReduce jobs. After that I downloaded Apache Flink 1.4 from [here|http://redrockdigimark.com/apachemirror/flink/flink-1.4.0/flink-1.4.0-bin-hadoop28-scala_2.11.tgz]. Now I am trying to set up Flink on the above YARN cluster by following the steps [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html]. When I try to run /bin/yarn-session.sh -n 4 -jm 1024 -tm 4096, I am getting the below error, which I am unable to resolve. Can someone please advise and help me with the same? 
{{Error while deploying YARN cluster: Couldn't deploy Yarn session cluster java.lang.RuntimeException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:372) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679) at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514) at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511) Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of virtual cores per node were configured with 1 but Yarn only has -1 virtual cores available. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.' at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:265) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:415) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
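A note on the error above: a reported capacity of "-1 virtual cores" often indicates that the Flink client could not obtain any node resource reports from the ResourceManager (e.g. no live NodeManager, or the wrong YARN configuration directory being read), rather than a genuine vcore shortage. A minimal sketch of the usual first checks follows; the paths and the vcore value are assumptions for illustration, not taken from the report:

```shell
# Hypothetical paths -- adjust to the local install. These are common first
# checks for this symptom, not a confirmed fix for this particular report.

# 1. Make sure the client reads the same YARN config the pseudo-distributed
#    cluster was started with (the directory must contain yarn-site.xml):
export HADOOP_CONF_DIR=/usr/local/hadoop-2.8.3/etc/hadoop

# 2. Verify that at least one NodeManager is up and reporting resources;
#    with no live NodeManagers there is no vcore capacity to report:
yarn node -list

# 3. Optionally pin the vcores Flink requests per container in
#    conf/flink-conf.yaml:
#      yarn.containers.vcores: 1

# 4. Retry the session:
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
```

This is an environment/configuration fragment rather than a runnable program; step 2 only confirms that the client and the cluster agree on where YARN is.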
[jira] [Updated] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop
[ https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8439: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop > > > Key: FLINK-8439 > URL: https://issues.apache.org/jira/browse/FLINK-8439 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Dyana Rose >Priority: Blocker > Fix For: 1.5.0, 1.4.2 > > > This came up when using S3 for the file system backend and running under > ECS. > With no credentials in the container, hadoop-aws will default to EC2 > instance-level credentials when accessing S3. However when running under ECS, you will > generally want to default to the task definition's IAM role. > In this case you need to set the Hadoop property > {code:java} > fs.s3a.aws.credentials.provider{code} > to a fully qualified class name(s); see [hadoop-aws > docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md] > This works as expected when you add this setting to flink-conf.yaml but there > is a further 'gotcha.' Because the AWS SDK is shaded, the actual full class > name for, in this case, the ContainerCredentialsProvider is > {code:java} > org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code} > > meaning the full setting is: > {code:java} > fs.s3a.aws.credentials.provider: > org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code} > If you instead set it to the unshaded class name you will see a very > confusing error stating that the ContainerCredentialsProvider doesn't > implement AWSCredentialsProvider (which it most certainly does.) 
> Adding this information (how to specify alternate Credential Providers, and > the namespace gotcha) to the [AWS deployment > docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html] > would be useful to anyone else using S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes
[ https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-8270. Resolution: Fixed > TaskManagers do not use correct local path for shipped Keytab files in Yarn > deployment modes > > > Key: FLINK-8270 > URL: https://issues.apache.org/jira/browse/FLINK-8270 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Reported on ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html > This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are > again not using the correct local paths for shipped Keytab files. > The cause was accidental due to this change: > https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e. > Things to consider: > 1) The above accidental breaking change was actually targeting a minor > refactor on the "integration test scenario" code block in > {{YarnTaskManagerRunner}}. It would be best if we can remove that test case > code block from the main code. > 2) Unit test coverage is apparently not enough. As this incidence shows, any > slight changes can cause this issue to easily resurface again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes
[ https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357288#comment-16357288 ] Tzu-Li (Gordon) Tai commented on FLINK-8270: Fixed by FLINK-8275. Closing this issue now. > TaskManagers do not use correct local path for shipped Keytab files in Yarn > deployment modes > > > Key: FLINK-8270 > URL: https://issues.apache.org/jira/browse/FLINK-8270 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Reported on ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html > This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are > again not using the correct local paths for shipped Keytab files. > The cause was accidental due to this change: > https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e. > Things to consider: > 1) The above accidental breaking change was actually targeting a minor > refactor on the "integration test scenario" code block in > {{YarnTaskManagerRunner}}. It would be best if we can remove that test case > code block from the main code. > 2) Unit test coverage is apparently not enough. As this incidence shows, any > slight changes can cause this issue to easily resurface again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357284#comment-16357284 ] Tzu-Li (Gordon) Tai commented on FLINK-7756: Marking this as resolved due to the fix in FLINK-7760. [~shashank734] if you disagree and still have issues with this, please let us know and reopen the ticket. > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > - > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Sub-task > Components: CEP, State Backends, Checkpointing, Streaming >Affects Versions: 1.4.0, 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend >Reporter: Shashank Agarwal >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > When i try to use RocksDBStateBackend on my staging cluster (which is using > HDFS as file system) it crashes. But When i use FsStateBackend on staging > (which is using HDFS as file system) it is working fine. > On local with local file system it's working fine in both cases. > Please check attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. 
> 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at >
[jira] [Resolved] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-7756. Resolution: Fixed Assignee: Aljoscha Krettek > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > - > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Sub-task > Components: CEP, State Backends, Checkpointing, Streaming >Affects Versions: 1.4.0, 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend >Reporter: Shashank Agarwal >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > When i try to use RocksDBStateBackend on my staging cluster (which is using > HDFS as file system) it crashes. But When i use FsStateBackend on staging > (which is using HDFS as file system) it is working fine. > On local with local file system it's working fine in both cases. > Please check attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). 
> 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at >
[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-5372: --- Fix Version/s: (was: 1.4.1) 1.4.2 > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.2 > > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. Then, the test will still fail because the canceling > behaviour was changed in the meantime. > {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord<String> element) throws Exception > { > // we also don't care > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:40 PM: [~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2? Since AFAICT this is a test instability, I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know. was (Author: tzulitai): [~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2? I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know. > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.2 > > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. Then, the test will still fail because the canceling > behaviour was changed in the meantime. 
> {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord<String> element) throws Exception > { > // we also don't care > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:39 PM: [~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2? I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know. was (Author: tzulitai): [~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2? > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.1 > > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. Then, the test will still fail because the canceling > behaviour was changed in the meantime. 
> {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord<String> element) throws Exception > { > // we also don't care > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.
[ https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-6321. Resolution: Fixed > RocksDB state backend Checkpointing is not working with KeyedCEP. > - > > Key: FLINK-6321 > URL: https://issues.apache.org/jira/browse/FLINK-6321 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.2.0 > Environment: yarn-cluster, RocksDB State backend, Checkpointing every > 1000 ms >Reporter: Shashank Agarwal >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Checkpointing is not working with RocksDBStateBackend when using CEP. It's > working fine with FsStateBackend and MemoryStateBackend. Application failing > every-time. > {code} > 04/18/2017 21:53:20 Job execution switched to status FAILING. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 46 for operator KeyedCEPPatternOperator -> Map (1/4).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 46 for > operator KeyedCEPPatternOperator -> Map (1/4). > ... 6 more > Caused by: java.util.concurrent.CancellationException > at java.util.concurrent.FutureTask.report(FutureTask.java:121) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915) > ... 
5 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.
[ https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357278#comment-16357278 ] Tzu-Li (Gordon) Tai commented on FLINK-6321: I am marking this as resolved now, due to the fix for FLINK-7760. [~shashank734] if you object and think there is still an issue, please reopen. > RocksDB state backend Checkpointing is not working with KeyedCEP. > - > > Key: FLINK-6321 > URL: https://issues.apache.org/jira/browse/FLINK-6321 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.2.0 > Environment: yarn-cluster, RocksDB State backend, Checkpointing every > 1000 ms >Reporter: Shashank Agarwal >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Checkpointing is not working with RocksDBStateBackend when using CEP. It's > working fine with FsStateBackend and MemoryStateBackend. Application failing > every-time. > {code} > 04/18/2017 21:53:20 Job execution switched to status FAILING. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 46 for operator KeyedCEPPatternOperator -> Map (1/4).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 46 for > operator KeyedCEPPatternOperator -> Map (1/4). > ... 
6 more > Caused by: java.util.concurrent.CancellationException > at java.util.concurrent.FutureTask.report(FutureTask.java:121) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915) > ... 5 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276 ] Tzu-Li (Gordon) Tai commented on FLINK-5372: [~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2? > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.1 > > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. Then, the test will still fail because the canceling > behaviour was changed in the meantime. > {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord<String> element) throws Exception > { > // we also don't care > ValueState<String> state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8614) Enable Flip-6 per default
[ https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357253#comment-16357253 ] ASF GitHub Bot commented on FLINK-8614: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5437 [FLINK-8614] [flip6] Activate Flip-6 mode per default ## What is the purpose of the change This commit enables the Flip-6 mode per default. Additionally, it disables some of the Yarn tests which no longer apply to Flip-6 (tests which wait for a number of started TM containers without a job submission). This PR is based on #5436, #5432 and #5388. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink flip6Default Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5437.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5437 commit 56ed240dd2c3e8f2775208ae349f137acb53bd34 Author: Till Rohrmann Date: 2018-02-07T16:00:40Z [FLINK-8613] [flip6] [yarn] Return excess containers Upon notification of newly allocated containers, the YarnResourceManager will only accept as many containers as there are pending container requests. All excess containers will be returned. 
commit f9d9fd0d4c22defd4f839b7078596e6e440d26c1 Author: Till Rohrmann Date: 2018-01-30T14:15:49Z [FLINK-8529] [flip6] Let Yarn entry points use APPLICATION_MASTER_PORT Let all Yarn entry points use the YarnConfigOptions.APPLICATION_MASTER_PORT option to specify the valid port range for the common RpcService. commit acc942e5237bcf8cc482c564c061d741befbff53 Author: Till Rohrmann Date: 2018-02-06T15:47:28Z [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend This commit allows to deploy detached job mode clusters via the CliFrontend. In order to do that, it first extracts the JobGraph from the PackagedProgram and then uses the ClusterDescriptor to deploy the job mode cluster. commit 52e8e74a6c977c68be0bdb62017bcbac0bb5ed2f Author: Till Rohrmann Date: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit d0ab48e090f142bf5287528b6d0198b702e169f0 Author: Till Rohrmann Date: 2018-02-07T17:21:06Z [hotfix] [yarn] Write number of slots to configuration commit 1d38657ab86b6f7cff7b9b6c878ee4573aedff71 Author: Till Rohrmann Date: 2018-02-07T17:58:32Z [hotfix] [yarn] Remove unnecessary TaskManager configuration generation commit 76b6ce0ca587377b14fbd6eda48f52d4da13f09e Author: Till Rohrmann Date: 2018-02-08T09:27:27Z [hotfix] Only log retrying exception on debug in RetryingRegistration commit 00de9bf451f5cb56e96938a7cf0dd9c5e38ea595 Author: Till Rohrmann Date: 2018-01-30T08:22:03Z [FLINK-8614] [flip6] Activate Flip-6 mode per default This commit enables the Flip-6 mode per default. 
Additionally, it disables some of the Yarn tests which no longer apply to Flip-6 (tests which wait for a number of started TM containers without a job submission). > Enable Flip-6 per default > - > > Key: FLINK-8614 > URL: https://issues.apache.org/jira/browse/FLINK-8614 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Critical > Labels: flip-6 > Fix For: 1.5.0 > > > After
[jira] [Created] (FLINK-8614) Enable Flip-6 per default
Till Rohrmann created FLINK-8614: Summary: Enable Flip-6 per default Key: FLINK-8614 URL: https://issues.apache.org/jira/browse/FLINK-8614 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 After FLINK-8471 has been added, the next step is to enable Flip-6 per default by setting the configuration switch to {{flip6}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357243#comment-16357243 ] ASF GitHub Bot commented on FLINK-8613: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5436 [FLINK-8613] [flip6] [yarn] Return excess containers ## What is the purpose of the change Upon notification of newly allocated containers, the YarnResourceManager will only accept as many containers as there are pending container requests. All excess containers will be returned. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5436.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5436 commit 56ed240dd2c3e8f2775208ae349f137acb53bd34 Author: Till Rohrmann Date: 2018-02-07T16:00:40Z [FLINK-8613] [flip6] [yarn] Return excess containers Upon notification of newly allocated containers, the YarnResourceManager will only accept as many containers as there are pending container requests. All excess containers will be returned. 
> Return excess container in YarnResourceManager > -- > > Key: FLINK-8613 > URL: https://issues.apache.org/jira/browse/FLINK-8613 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, YARN > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{YarnResourceManager}} should return excess containers which the Yarn > ResourceManager assigned wrongly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8613) Return excess container in YarnResourceManager
Till Rohrmann created FLINK-8613: Summary: Return excess container in YarnResourceManager Key: FLINK-8613 URL: https://issues.apache.org/jira/browse/FLINK-8613 Project: Flink Issue Type: Improvement Components: Distributed Coordination, YARN Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 The {{YarnResourceManager}} should return excess containers which the Yarn ResourceManager assigned wrongly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
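The acceptance rule described in FLINK-8613 (take only as many newly allocated containers as there are pending requests, hand the rest back) can be sketched as follows. This is an illustrative sketch, not Flink's actual YarnResourceManager API: the method name is invented, and plain strings stand in for Yarn's Container objects.

```java
import java.util.ArrayList;
import java.util.List;

public class ExcessContainerSketch {

    /**
     * Accepts at most {@code pendingRequests} of the newly allocated
     * containers; everything beyond that is added to {@code excessOut}
     * and would be returned to the Yarn ResourceManager.
     */
    static List<String> acceptContainers(
            List<String> allocated, int pendingRequests, List<String> excessOut) {
        int accepted = Math.min(pendingRequests, allocated.size());
        // Containers past the accepted count are excess and get returned.
        excessOut.addAll(allocated.subList(accepted, allocated.size()));
        return new ArrayList<>(allocated.subList(0, accepted));
    }
}
```

With three allocated containers and two pending requests, two are accepted and one is collected as excess.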
[jira] [Commented] (FLINK-8612) Add non-detached job mode
[ https://issues.apache.org/jira/browse/FLINK-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357229#comment-16357229 ] ASF GitHub Bot commented on FLINK-8612: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5435 [FLINK-8612] [flip6] Enable non-detached job mode ## What is the purpose of the change The non-detached job mode waits until it has served the JobResult of a completed job at least once before it terminates. This PR is based on #5434 and #5433. ## Verifying this change - Added `MiniDispatcherTest#testJobResultRetrieval` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addNormalJobMode Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5435.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5435 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till Rohrmann Date: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. 
In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit d9af78da8eec870eb8fcce140ce13b2c06ca4e5d Author: Till Rohrmann Date: 2018-02-07T13:12:54Z [hotfix] Fix checkstyle violations in JobExecutionResult commit 3c414687bc73229f045d861d57da3843ca470469 Author: Till Rohrmann Date: 2018-02-07T13:56:45Z [FLINK-8611] [flip6] Add result future to JobManagerRunner This commit adds a CompletableFuture to the JobManagerRunner. This future will be completed once the job has reached a globally terminal state. commit a9af6165cdee7d8e8dcf503d160ad5f1330a0076 Author: Till Rohrmann Date: 2018-02-07T10:09:34Z [FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway The JobMaster no longer needs to implement the RestfulGateway. Therefore, it is removed by this commit. commit bcd803845976147d5c13b0b92f196346d6412c58 Author: Till Rohrmann Date: 2018-02-08T13:51:25Z [FLINK-8612] [flip6] Enable non-detached job mode The non-detached job mode waits until it has served the JobResult of a completed job at least once before it terminates. > Add non-detached job mode > - > > Key: FLINK-8612 > URL: https://issues.apache.org/jira/browse/FLINK-8612 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > In order to support the non-detached job mode, the {{MiniDispatcher}} has to > wait until it has served the {{JobResult}} of a completed job at least once > before it terminates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8612) Add non-detached job mode
Till Rohrmann created FLINK-8612: Summary: Add non-detached job mode Key: FLINK-8612 URL: https://issues.apache.org/jira/browse/FLINK-8612 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 In order to support the non-detached job mode, the {{MiniDispatcher}} has to wait until it has served the {{JobResult}} of a completed job at least once before it terminates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
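The detached/non-detached distinction described in FLINK-8612 can be sketched with two futures: in detached mode the dispatcher may shut down as soon as the job completes, while in non-detached mode shutdown additionally waits until the JobResult has been handed out at least once. This is a hedged sketch; the class, field, and method names are illustrative, not Flink's actual MiniDispatcher implementation.

```java
import java.util.concurrent.CompletableFuture;

public class MiniDispatcherSketch {
    enum ExecutionMode { DETACHED, NORMAL }

    private final ExecutionMode mode;
    private final CompletableFuture<Void> jobTerminated = new CompletableFuture<>();
    private final CompletableFuture<Void> resultServed = new CompletableFuture<>();

    MiniDispatcherSketch(ExecutionMode mode) { this.mode = mode; }

    // Called when the single job reaches a terminal state.
    void onJobCompleted() { jobTerminated.complete(null); }

    // Called the first time a client retrieves the JobResult.
    void onResultRequested() { resultServed.complete(null); }

    /** Future that completes once the dispatcher may terminate. */
    CompletableFuture<Void> shutDownFuture() {
        return mode == ExecutionMode.DETACHED
                ? jobTerminated
                // Non-detached: also wait until the result was served once.
                : jobTerminated.thenCombine(resultServed, (a, b) -> null);
    }
}
```

In the NORMAL (non-detached) mode, job completion alone does not complete the shutdown future; serving the result does.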
[jira] [Commented] (FLINK-8611) Add result future to JobManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357185#comment-16357185 ] ASF GitHub Bot commented on FLINK-8611: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5434 [FLINK-8611] [flip6] Add result future to JobManagerRunner ## What is the purpose of the change This commit adds a CompletableFuture to the JobManagerRunner. This future will be completed once the job has reached a globally terminal state. This PR is based on #5431. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink jobManagerRunnerFuture Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5434.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5434 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till Rohrmann Date: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. 
Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit d9af78da8eec870eb8fcce140ce13b2c06ca4e5d Author: Till Rohrmann Date: 2018-02-07T13:12:54Z [hotfix] Fix checkstyle violations in JobExecutionResult commit 3c414687bc73229f045d861d57da3843ca470469 Author: Till Rohrmann Date: 2018-02-07T13:56:45Z [FLINK-8611] [flip6] Add result future to JobManagerRunner This commit adds a CompletableFuture to the JobManagerRunner. This future will be completed once the job has reached a globally terminal state. > Add result future to JobManagerRunner > - > > Key: FLINK-8611 > URL: https://issues.apache.org/jira/browse/FLINK-8611 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > Labels: flip-6 > Fix For: 1.5.0 > > > Adding a {{CompletableFuture}} result future to the > {{JobManagerRunner}} will make it possible to return a {{JobResult}} future for a still > running job. This is helpful for the implementation of a non-detached job > mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8611) Add result future to JobManagerRunner
Till Rohrmann created FLINK-8611: Summary: Add result future to JobManagerRunner Key: FLINK-8611 URL: https://issues.apache.org/jira/browse/FLINK-8611 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 Adding a {{CompletableFuture}} result future to the {{JobManagerRunner}} will make it possible to return a {{JobResult}} future for a still running job. This is helpful for the implementation of a non-detached job mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
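The result-future idea in FLINK-8611 boils down to a CompletableFuture that a caller can obtain while the job is still running and that the runner completes once the job reaches a globally terminal state. A minimal sketch, assuming invented names (JobResult here is a stand-in, not Flink's actual class):

```java
import java.util.concurrent.CompletableFuture;

public class ResultFutureSketch {
    static class JobResult {
        final boolean success;
        JobResult(boolean success) { this.success = success; }
    }

    // Completed exactly once, when the job reaches a globally terminal state.
    private final CompletableFuture<JobResult> resultFuture = new CompletableFuture<>();

    /** Callers may fetch this future while the job is still running. */
    CompletableFuture<JobResult> getResultFuture() { return resultFuture; }

    // Invoked by the runner on reaching a globally terminal state.
    void jobReachedGloballyTerminalState(boolean success) {
        resultFuture.complete(new JobResult(success));
    }
}
```

A client can thus register callbacks (or block) on the future before the job finishes, which is exactly what a non-detached job mode needs.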
[jira] [Resolved] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-8318. Resolution: Fixed Assignee: Nico Kruber > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > My flink job is failed after update flink version to 1.4.0. It uses > ElasticSearch connector. > I'm using CDH Hadoop with Flink option "classloader.resolve-order: > parent-first" > The failure log is below. > {noformat} > Using the result of 'hadoop classpath' to augment the Hadoop classpath: > /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//* > 2017-12-26 14:13:21,160 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) > 2017-12-26 14:13:21,161 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - OS current > user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > Hadoop/Kerberos user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 31403 MiBytes > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > (not set) > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.6.5 > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/ > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > /home/www/service/flink-1.4.0/conf > 2017-12-26 14:13:21,449 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - Classpath: > ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:... > > 2017-12-26 14:14:01,393 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Filter -> Map -> Filter -> Sink: > Unnamed (3/10)
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357172#comment-16357172 ] Tzu-Li (Gordon) Tai commented on FLINK-8318: We have a fix now in `1.5.0` and `1.4.1` that shades away all ES dependencies. 1.4 - 33ebc85c281fcd4869fa743f88c2c71d339f9857 1.5 - 8395508b0401353ed07375e22882e7581d46ac0e Therefore, I think we can mark this issue as resolved. Please object if you disagree, [~Jihyun Cho]. > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts > Affects Versions: 1.4.0 > Reporter: Jihyun Cho > Priority: Blocker > Fix For: 1.5.0, 1.4.1 >
[jira] [Resolved] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-8362. Resolution: Fixed Fix Version/s: 1.4.1 1.5.0 > Shade Elasticsearch dependencies away > - > > Key: FLINK-8362 > URL: https://issues.apache.org/jira/browse/FLINK-8362 > Project: Flink > Issue Type: Improvement > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0, 1.4.1 > > > It would be nice to make the Elasticsearch connectors self-contained just > like the s3 file system implementations and the cassandra connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357164#comment-16357164 ] Tzu-Li (Gordon) Tai commented on FLINK-8362: Merged. 1.4 - 33ebc85c281fcd4869fa743f88c2c71d339f9857 1.5 - 8395508b0401353ed07375e22882e7581d46ac0e > Shade Elasticsearch dependencies away > - > > Key: FLINK-8362 > URL: https://issues.apache.org/jira/browse/FLINK-8362 > Project: Flink > Issue Type: Improvement > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0, 1.4.1 > > > It would be nice to make the Elasticsearch connectors self-contained just > like the s3 file system implementations and the cassandra connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357148#comment-16357148 ] ASF GitHub Bot commented on FLINK-8362: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5426 > Shade Elasticsearch dependencies away > - > > Key: FLINK-8362 > URL: https://issues.apache.org/jira/browse/FLINK-8362 > Project: Flink > Issue Type: Improvement > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > It would be nice to make the Elasticsearch connectors self-contained just > like the s3 file system implementations and the cassandra connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357149#comment-16357149 ] ASF GitHub Bot commented on FLINK-8362: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5243 > Shade Elasticsearch dependencies away > - > > Key: FLINK-8362 > URL: https://issues.apache.org/jira/browse/FLINK-8362 > Project: Flink > Issue Type: Improvement > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > It would be nice to make the Elasticsearch connectors self-contained just > like the s3 file system implementations and the cassandra connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5426: [FLINK-8362] [elasticsearch] Shade all ES connecto...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5426 ---
[GitHub] flink pull request #5243: [FLINK-8362][elasticsearch] shade all dependencies
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5243 ---
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357135#comment-16357135 ] ASF GitHub Bot commented on FLINK-8362: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5426 Build passing locally, merging ... > Shade Elasticsearch dependencies away > - > > Key: FLINK-8362 > URL: https://issues.apache.org/jira/browse/FLINK-8362 > Project: Flink > Issue Type: Improvement > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > It would be nice to make the Elasticsearch connectors self-contained just > like the s3 file system implementations and the cassandra connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5426: [FLINK-8362] [elasticsearch] Shade all ES connector depen...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5426 Build passing locally, merging ... ---
[GitHub] flink pull request #5433: [FLINK-8610] [flip6] Remove RestfulGateway from Jo...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5433 [FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway ## What is the purpose of the change The JobMaster no longer needs to implement the RestfulGateway. Therefore, it is removed by this commit. This PR is based on #5431. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeRestfulGatewayFromJobMaster Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5433.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5433 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. 
commit 1da4fa2dab9c7e6ab6aecfe5c8512de217fcce9a Author: Till Rohrmann Date: 2018-02-07T10:09:34Z [FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway The JobMaster no longer needs to implement the RestfulGateway. Therefore, it is removed by this commit. ---
[jira] [Commented] (FLINK-8610) Remove RestfulGateway from JobMasterGateway
[ https://issues.apache.org/jira/browse/FLINK-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357130#comment-16357130 ] ASF GitHub Bot commented on FLINK-8610: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5433 [FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway ## What is the purpose of the change The JobMaster no longer needs to implement the RestfulGateway. Therefore, it is removed by this commit. This PR is based on #5431. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeRestfulGatewayFromJobMaster Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5433.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5433 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. 
In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit 1da4fa2dab9c7e6ab6aecfe5c8512de217fcce9a Author: Till Rohrmann Date: 2018-02-07T10:09:34Z [FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway The JobMaster no longer needs to implement the RestfulGateway. Therefore, it is removed by this commit. > Remove RestfulGateway from JobMasterGateway > --- > > Key: FLINK-8610 > URL: https://issues.apache.org/jira/browse/FLINK-8610 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.5.0 > > > After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the > {{RestfulGateway}}. Therefore, we should remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8610) Remove RestfulGateway from JobMasterGateway
Till Rohrmann created FLINK-8610: Summary: Remove RestfulGateway from JobMasterGateway Key: FLINK-8610 URL: https://issues.apache.org/jira/browse/FLINK-8610 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the {{RestfulGateway}}. Therefore, we should remove it.
[jira] [Commented] (FLINK-8609) Add support to deploy detached job mode clusters
[ https://issues.apache.org/jira/browse/FLINK-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357116#comment-16357116 ] ASF GitHub Bot commented on FLINK-8609: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5432 [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend ## What is the purpose of the change This commit allows to deploy detached job mode clusters via the CliFrontend. In order to do that, it first extracts the JobGraph from the PackagedProgram and then uses the ClusterDescriptor to deploy the job mode cluster. This PR is based on #5431. ## Brief change log - Extract `JobGraph` from `PackagedProgram` in `CliFrontend` - Deploy job mode cluster if flip-6 is enabled ## Verifying this change - Tested manually ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink enableJobMode Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5432.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5432 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit be2d9dfa515b1577f6d7a67b726d9e704281a1cc Author: Till Rohrmann Date: 2018-02-06T15:47:28Z [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend This commit allows to deploy detached job mode clusters via the CliFrontend. In order to do that, it first extracts the JobGraph from the PackagedProgram and then uses the ClusterDescriptor to deploy the job mode cluster. > Add support to deploy detached job mode clusters > > > Key: FLINK-8609 > URL: https://issues.apache.org/jira/browse/FLINK-8609 > Project: Flink > Issue Type: New Feature > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy > detached job mode clusters. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5432: [FLINK-8609] [flip6] Enable Flip-6 job mode in Cli...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5432 [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend ## What is the purpose of the change This commit allows to deploy detached job mode clusters via the CliFrontend. In order to do that, it first extracts the JobGraph from the PackagedProgram and then uses the ClusterDescriptor to deploy the job mode cluster. This PR is based on #5431. ## Brief change log - Extract `JobGraph` from `PackagedProgram` in `CliFrontend` - Deploy job mode cluster if flip-6 is enabled ## Verifying this change - Tested manually ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink enableJobMode Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5432.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5432 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. 
In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. commit be2d9dfa515b1577f6d7a67b726d9e704281a1cc Author: Till Rohrmann Date: 2018-02-06T15:47:28Z [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend This commit allows to deploy detached job mode clusters via the CliFrontend. In order to do that, it first extracts the JobGraph from the PackagedProgram and then uses the ClusterDescriptor to deploy the job mode cluster. ---
[jira] [Created] (FLINK-8609) Add support to deploy detached job mode clusters
Till Rohrmann created FLINK-8609: Summary: Add support to deploy detached job mode clusters Key: FLINK-8609 URL: https://issues.apache.org/jira/browse/FLINK-8609 Project: Flink Issue Type: New Feature Components: Client Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy detached job mode clusters.
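The deployment flow described in PR #5432 — extract the `JobGraph` from the `PackagedProgram`, then hand it to a `ClusterDescriptor` that brings up a dedicated job-mode cluster — can be sketched with simplified stand-in types. These are illustrative stubs, not Flink's actual classes or signatures:

```java
import java.util.Objects;

// Sketch of the CliFrontend job-mode flow with stand-in types.
public class JobModeDeploySketch {

    static final class JobGraph {
        final String jobName;
        JobGraph(String jobName) { this.jobName = jobName; }
    }

    static final class PackagedProgram {
        final String mainClass;
        PackagedProgram(String mainClass) { this.mainClass = mainClass; }

        // In Flink this step runs the program against a plan-capturing
        // environment; here we just derive a name for illustration.
        JobGraph extractJobGraph() { return new JobGraph(mainClass + "-job"); }
    }

    interface ClusterDescriptor {
        // detached = true means the client does not wait for the job result.
        String deployJobCluster(JobGraph graph, boolean detached);
    }

    static String run(PackagedProgram program, ClusterDescriptor descriptor, boolean detached) {
        JobGraph graph = Objects.requireNonNull(program.extractJobGraph());
        return descriptor.deployJobCluster(graph, detached);
    }

    public static void main(String[] args) {
        ClusterDescriptor descriptor = (graph, detached) ->
                "deployed " + graph.jobName + (detached ? " (detached)" : " (attached)");
        System.out.println(run(new PackagedProgram("WordCount"), descriptor, true));
        // deployed WordCount-job (detached)
    }
}
```

The key design point, per the PR, is that the cluster is brought up *for* the already-extracted job, rather than submitting the job to a pre-existing session cluster.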
[jira] [Reopened] (FLINK-6590) Integrate generated tables into documentation
[ https://issues.apache.org/jira/browse/FLINK-6590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-6590: - reopen to modify title > Integrate generated tables into documentation > - > > Key: FLINK-6590 > URL: https://issues.apache.org/jira/browse/FLINK-6590 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6590) Integrate generated tables into documentation
[ https://issues.apache.org/jira/browse/FLINK-6590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6590. --- Resolution: Fixed > Integrate generated tables into documentation > - > > Key: FLINK-6590 > URL: https://issues.apache.org/jira/browse/FLINK-6590 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8530) Enable detached job submission for RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8530. Resolution: Won't Do > Enable detached job submission for RestClusterClient > > > Key: FLINK-8530 > URL: https://issues.apache.org/jira/browse/FLINK-8530 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} should also be able to submit jobs in detached > mode. In detached mode, we don't wait for the {{JobExecutionResult}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8608) Add MiniDispatcher for job mode
[ https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357086#comment-16357086 ] ASF GitHub Bot commented on FLINK-8608: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5431 [FLINK-8608] [flip6] Implement MiniDispatcher for job mode ## What is the purpose of the change The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a subclass of the Dispatcher which is started with a single-job submitted job graph store. ## Brief change log - Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher` - Refactored `JobClusterEntrypoint` and `SessionClusterEntrypoint` such that most of the cluster component logic moved to `ClusterEntrypoint` - Renamed `JobMasterRestEndpoint` into `MiniDispatcherRestEndpoint` - Initialize `MiniDispatcher` with the single job retrieved by the `JobClusterEntrypoint` - Terminate `MiniDispatcher` after job completion if its execution mode is `detached` ## Verifying this change - Added `MiniDispatcherTest` - The tests of the `Dispatcher` are also valid for the `MiniDispatcher` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5431.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5431 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. > Add MiniDispatcher for job mode > --- > > Key: FLINK-8608 > URL: https://issues.apache.org/jira/browse/FLINK-8608 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > In order to properly support the job mode, we need a {{MiniDispatcher}} which > is started with a pre initialized {{JobGraph}} and launches a single > {{JobManagerRunner}} with this job. Once the job is completed and if the > {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should > terminate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5431 [FLINK-8608] [flip6] Implement MiniDispatcher for job mode ## What is the purpose of the change The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a subclass of the Dispatcher which is started with a single-job submitted job graph store. ## Brief change log - Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher` - Refactored `JobClusterEntrypoint` and `SessionClusterEntrypoint` such that most of the cluster component logic moved to `ClusterEntrypoint` - Renamed `JobMasterRestEndpoint` into `MiniDispatcherRestEndpoint` - Initialize `MiniDispatcher` with the single job retrieved by the `JobClusterEntrypoint` - Terminate `MiniDispatcher` after job completion if its execution mode is `detached` ## Verifying this change - Added `MiniDispatcherTest` - The tests of the `Dispatcher` are also valid for the `MiniDispatcher` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5431.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5431 commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8 Author: Till RohrmannDate: 2018-02-08T13:34:54Z [FLINK-8608] [flip6] Implement MiniDispatcher for job mode The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a sub class of the Dispatcher which is started with a single job submitted job graph store. ---
[jira] [Created] (FLINK-8608) Add MiniDispatcher for job mode
Till Rohrmann created FLINK-8608: Summary: Add MiniDispatcher for job mode Key: FLINK-8608 URL: https://issues.apache.org/jira/browse/FLINK-8608 Project: Flink Issue Type: New Feature Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 In order to properly support the job mode, we need a {{MiniDispatcher}} which is started with a pre initialized {{JobGraph}} and launches a single {{JobManagerRunner}} with this job. Once the job is completed and if the {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should terminate.
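The `SingleJobSubmittedJobGraphStore` idea from PR #5431 — a store pre-initialized with exactly one job, so the `MiniDispatcher` can reuse the `Dispatcher`'s recovery code path while never accepting a second job — might look roughly like this. These are simplified stand-in types (string job ids instead of `JobID`/`JobGraph`), not Flink's actual interfaces:

```java
import java.util.Collection;
import java.util.Collections;

public class SingleJobStoreSketch {

    static final class SingleJobSubmittedJobGraphStore {
        private final String jobId;

        SingleJobSubmittedJobGraphStore(String jobId) { this.jobId = jobId; }

        // The store always knows exactly one job.
        Collection<String> getJobIds() { return Collections.singleton(jobId); }

        // Recovery only succeeds for the pre-initialized job.
        String recoverJobGraph(String requestedId) {
            if (!jobId.equals(requestedId)) {
                throw new IllegalArgumentException(
                        "Unknown job " + requestedId + "; this store only holds " + jobId);
            }
            return jobId;
        }

        // A job-mode cluster runs exactly one job: re-adding the same job is
        // a no-op, anything else is rejected.
        void putJobGraph(String newJobId) {
            if (!jobId.equals(newJobId)) {
                throw new IllegalStateException(
                        "Cannot add " + newJobId + " to a single-job store");
            }
        }
    }

    public static void main(String[] args) {
        SingleJobSubmittedJobGraphStore store = new SingleJobSubmittedJobGraphStore("job-1");
        System.out.println(store.getJobIds());              // [job-1]
        System.out.println(store.recoverJobGraph("job-1")); // job-1
    }
}
```

This is why subclassing works here: the `Dispatcher` sees an ordinary job graph store, while the store's invariant enforces the single-job semantics of the job-mode cluster.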
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357075#comment-16357075 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166972719 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Testing example. 
+ */ +public class BroadcastExample { + + public static void main(String[] args) throws Exception { + + final List<Integer> input = new ArrayList<>(); + input.add(1); + input.add(2); + input.add(3); + input.add(4); + + final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>(); + keyedInput.add(new Tuple2<>(1, 1)); + keyedInput.add(new Tuple2<>(1, 5)); + keyedInput.add(new Tuple2<>(2, 2)); + keyedInput.add(new Tuple2<>(2, 6)); + keyedInput.add(new Tuple2<>(3, 3)); + keyedInput.add(new Tuple2<>(3, 7)); + keyedInput.add(new Tuple2<>(4, 4)); + keyedInput.add(new Tuple2<>(4, 8)); + + final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance() + .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + private static final long serialVersionUID = 8710586935083422712L; + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) { + return value; + } + }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { + private static final long serialVersionUID = -1110876099102344900L; + + @Override + public Integer getKey(Tuple2<Integer, Integer> value) { + return value.f0; +
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357074#comment-16357074 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166970474 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Testing example. --- End diff -- Probably needs a better Javadoc... > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
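The `BroadcastExample` quoted in these review comments (and truncated in this archive) exercises the keyed-broadcast pattern: every keyed element is processed with read access to one shared broadcast map (in Flink, the `MapStateDescriptor`-backed broadcast state). A plain-Java simulation of that idea — a conceptual sketch of the pattern, not the Flink API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastPatternSketch {

    // Processes (key, value) pairs; the broadcast map is shared across all
    // keys, standing in for Flink's broadcast state side of the connection.
    static List<String> process(List<SimpleEntry<Integer, Integer>> keyedInput,
                                Map<String, Integer> broadcastState) {
        List<String> out = new ArrayList<>();
        int threshold = broadcastState.getOrDefault("threshold", 0);
        for (SimpleEntry<Integer, Integer> e : keyedInput) {
            // Each element sees the same broadcast value regardless of its key.
            if (e.getValue() >= threshold) {
                out.add("key=" + e.getKey() + " value=" + e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleEntry<Integer, Integer>> keyedInput = new ArrayList<>();
        keyedInput.add(new SimpleEntry<>(1, 1));
        keyedInput.add(new SimpleEntry<>(1, 5));
        keyedInput.add(new SimpleEntry<>(2, 6));

        Map<String, Integer> broadcast = new HashMap<>();
        broadcast.put("threshold", 5); // the "broadcast rule"

        System.out.println(process(keyedInput, broadcast));
        // [key=1 value=5, key=2 value=6]
    }
}
```

In the real `KeyedBroadcastProcessFunction`, the keyed side additionally has per-key state (the `ValueStateDescriptor` in the example), while the broadcast side may update the shared map as new rules arrive.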
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357078#comment-16357078 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166972441 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Testing example. 
+ */ +public class BroadcastExample { + + public static void main(String[] args) throws Exception { + + final List<Integer> input = new ArrayList<>(); + input.add(1); + input.add(2); + input.add(3); + input.add(4); + + final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>(); + keyedInput.add(new Tuple2<>(1, 1)); + keyedInput.add(new Tuple2<>(1, 5)); + keyedInput.add(new Tuple2<>(2, 2)); + keyedInput.add(new Tuple2<>(2, 6)); + keyedInput.add(new Tuple2<>(3, 3)); + keyedInput.add(new Tuple2<>(3, 7)); + keyedInput.add(new Tuple2<>(4, 4)); + keyedInput.add(new Tuple2<>(4, 8)); + + final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance() + .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + private static final long serialVersionUID = 8710586935083422712L; + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) { + return value; + } + }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { --- End diff -- The other calls should also go on new lines, I think that's the usual example style > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 >
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357079#comment-16357079 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166973933 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction +import org.apache.flink.streaming.api.functions.sink.DiscardingSink +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.util.Collector +import org.junit.Assert.assertEquals +import org.junit.Test + +/** + * ITCase for the [[org.apache.flink.api.common.state.BroadcastState]]. + */ +class BroadcastStateITCase extends AbstractTestBase { + + @Test + @throws[Exception] + def testConnectWithBroadcastTranslation(): Unit = { + +val timerTimestamp = 10L + +val DESCRIPTOR = new MapStateDescriptor[Long, String]( + "broadcast-state", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + BasicTypeInfo.STRING_TYPE_INFO) + +val expected = Map[Long, String]( + 0L -> "test:0", + 1L -> "test:1", + 2L -> "test:2", + 3L -> "test:3", + 4L -> "test:4", + 5L -> "test:5") + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +val srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Long]() { + +override def extractTimestamp(element: Long, previousElementTimestamp: Long): Long = + element + +override def checkAndGetNextWatermark(lastElement: Long, extractedTimestamp: Long) = + new Watermark(extractedTimestamp) + + }).keyBy((value: Long) => value) + +val srcTwo = env.fromCollection(expected.values.toSeq) + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[String]() { + +override def extractTimestamp(element: String, 
previousElementTimestamp: Long): Long = + element.split(":")(1).toLong + +override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long) = + new Watermark(extractedTimestamp) + }) + +val broadcast = srcTwo.broadcast(DESCRIPTOR) +// the timestamp should be high enough to trigger the timer after all the elements arrive. +val output = srcOne.connect(broadcast).process( + new KeyedBroadcastProcessFunction[Long, Long, String, String]() { + +@throws[Exception] +override def processElement( +value: Long, +ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext, +out: Collector[String]): Unit = { + + ctx.timerService.registerEventTimeTimer(timerTimestamp) +} + +@throws[Exception] +override def processBroadcastElement( +value: String, +ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext, +out: Collector[String]): Unit = { + + val key = value.split(":")(1).toLong +
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357076#comment-16357076 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166972253 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Testing example. 
+ */ +public class BroadcastExample { + + public static void main(String[] args) throws Exception { + + final List<Integer> input = new ArrayList<>(); + input.add(1); + input.add(2); + input.add(3); + input.add(4); + + final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>(); + keyedInput.add(new Tuple2<>(1, 1)); + keyedInput.add(new Tuple2<>(1, 5)); + keyedInput.add(new Tuple2<>(2, 2)); + keyedInput.add(new Tuple2<>(2, 6)); + keyedInput.add(new Tuple2<>(3, 3)); + keyedInput.add(new Tuple2<>(3, 7)); + keyedInput.add(new Tuple2<>(4, 4)); + keyedInput.add(new Tuple2<>(4, 8)); + + final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance() --- End diff -- I'd put the `rebalance()` on a new line and maybe add a comment on why it's needed > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357077#comment-16357077 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r16697 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala --- @@ -441,6 +463,26 @@ class DataStream[T](stream: JavaStream[T]) { */ def broadcast: DataStream[T] = asScalaStream(stream.broadcast()) + /** +* Sets the partitioning of the [[DataStream]] so that the output elements +* are broadcasted to every parallel instance of the next operation. In addition, +* it implicitly creates a --- End diff -- I think the "creates a" is outdated here because you can specify multiple descriptors. This should also be fixed in the Java API. > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166973933 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction +import org.apache.flink.streaming.api.functions.sink.DiscardingSink +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.util.Collector +import org.junit.Assert.assertEquals +import org.junit.Test + +/** + * ITCase for the [[org.apache.flink.api.common.state.BroadcastState]]. 
+ */ +class BroadcastStateITCase extends AbstractTestBase { + + @Test + @throws[Exception] + def testConnectWithBroadcastTranslation(): Unit = { + +val timerTimestamp = 10L + +val DESCRIPTOR = new MapStateDescriptor[Long, String]( + "broadcast-state", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + BasicTypeInfo.STRING_TYPE_INFO) + +val expected = Map[Long, String]( + 0L -> "test:0", + 1L -> "test:1", + 2L -> "test:2", + 3L -> "test:3", + 4L -> "test:4", + 5L -> "test:5") + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +val srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Long]() { + +override def extractTimestamp(element: Long, previousElementTimestamp: Long): Long = + element + +override def checkAndGetNextWatermark(lastElement: Long, extractedTimestamp: Long) = + new Watermark(extractedTimestamp) + + }).keyBy((value: Long) => value) + +val srcTwo = env.fromCollection(expected.values.toSeq) + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[String]() { + +override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = + element.split(":")(1).toLong + +override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long) = + new Watermark(extractedTimestamp) + }) + +val broadcast = srcTwo.broadcast(DESCRIPTOR) +// the timestamp should be high enough to trigger the timer after all the elements arrive. 
+val output = srcOne.connect(broadcast).process( + new KeyedBroadcastProcessFunction[Long, Long, String, String]() { + +@throws[Exception] +override def processElement( +value: Long, +ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext, +out: Collector[String]): Unit = { + + ctx.timerService.registerEventTimeTimer(timerTimestamp) +} + +@throws[Exception] +override def processBroadcastElement( +value: String, +ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext, +out: Collector[String]): Unit = { + + val key = value.split(":")(1).toLong + ctx.getBroadcastState(DESCRIPTOR).put(key, value) +} + +@throws[Exception] +override def onTimer( +timestamp: Long, +ctx: KeyedBroadcastProcessFunction[Long, Long,
[GitHub] flink pull request #5425: [FLINK-8456] Add Scala API for Connected Streams w...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5425#discussion_r166972719 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Testing example. 
+ */ +public class BroadcastExample { + + public static void main(String[] args) throws Exception { + + final List<Integer> input = new ArrayList<>(); + input.add(1); + input.add(2); + input.add(3); + input.add(4); + + final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>(); + keyedInput.add(new Tuple2<>(1, 1)); + keyedInput.add(new Tuple2<>(1, 5)); + keyedInput.add(new Tuple2<>(2, 2)); + keyedInput.add(new Tuple2<>(2, 6)); + keyedInput.add(new Tuple2<>(3, 3)); + keyedInput.add(new Tuple2<>(3, 7)); + keyedInput.add(new Tuple2<>(4, 4)); + keyedInput.add(new Tuple2<>(4, 8)); + + final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput).rebalance() + .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + private static final long serialVersionUID = 8710586935083422712L; + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) { + return value; + } + }).setParallelism(4).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { + private static final long serialVersionUID = -1110876099102344900L; + + @Override + public Integer getKey(Tuple2<Integer, Integer> value) { + return value.f0; + } + }); + + BroadcastStream<String> broadcastStream = env.fromCollection(input) + .flatMap(new FlatMapFunction<Integer, String>() { +
[jira] [Created] (FLINK-8607) Add a basic embedded SQL CLI client
Timo Walther created FLINK-8607: --- Summary: Add a basic embedded SQL CLI client Key: FLINK-8607 URL: https://issues.apache.org/jira/browse/FLINK-8607 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther This issue describes Implementation Plan 1 of FLIP-24. Goal: Add the basic features to play around with Flink's streaming SQL. {code} - Add CLI component that reads the configuration files - "Pre-registered table sources" - "Job parameters" - Add executor for retrieving pre-flight information and corresponding CLI SQL parser - SHOW TABLES - DESCRIBE TABLE - EXPLAIN - Add streaming append query submission to executor - Submit jars and run SELECT query using the ClusterClient - Collect results on heap and serve them on the CLI side (Internal Mode with SELECT) - SOURCE (for executing a SQL statement stored in a local file) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
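[Editor's sketch] The pre-flight commands listed in the plan above (SHOW TABLES, DESCRIBE TABLE, EXPLAIN) can be pictured as a small command dispatcher in front of the executor. The sketch below is purely illustrative: the class name, the in-memory table registry, and the example table entries are all hypothetical, not the actual FLIP-24 implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of a CLI SQL pre-flight command dispatcher (hypothetical names;
// not the FLIP-24 code). Real statements would be delegated to an executor.
class SqlCliDispatcherSketch {

    // Stands in for the "pre-registered table sources" read from configuration files.
    private final Map<String, String> tables = new LinkedHashMap<>();

    SqlCliDispatcherSketch() {
        tables.put("Orders", "CSV source");   // assumed example entries
        tables.put("Rates", "Kafka source");
    }

    /** Dispatches one statement and returns the textual result shown on the CLI. */
    String dispatch(String statement) {
        String s = statement.trim();
        if (s.equalsIgnoreCase("SHOW TABLES")) {
            // List the registered table names, one per line.
            return String.join("\n", tables.keySet());
        } else if (s.toUpperCase().startsWith("DESCRIBE ")) {
            String name = s.substring("DESCRIBE ".length()).trim();
            String desc = tables.get(name);
            return desc != null ? name + ": " + desc : "Table not found: " + name;
        } else if (s.toUpperCase().startsWith("EXPLAIN ")) {
            // A real implementation would ask the planner for the plan.
            return "== Plan for: " + s.substring("EXPLAIN ".length()).trim();
        }
        return "Unsupported statement: " + s;
    }

    public static void main(String[] args) {
        SqlCliDispatcherSketch cli = new SqlCliDispatcherSketch();
        System.out.println(cli.dispatch("SHOW TABLES"));
        System.out.println(cli.dispatch("DESCRIBE Orders"));
    }
}
```

A query like SELECT would not be handled locally this way; per the plan it is submitted through the ClusterClient with results collected on the heap.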
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357012#comment-16357012 ]

ASF GitHub Bot commented on FLINK-8362:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5426

    Thanks! Since this is verified manually already, and we have a test that verifies everything is properly shaded, I'll merge this once Travis gives green.

> Shade Elasticsearch dependencies away
> -------------------------------------
>
>                 Key: FLINK-8362
>                 URL: https://issues.apache.org/jira/browse/FLINK-8362
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System, ElasticSearch Connector
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just
> like the s3 file system implementations and the cassandra connector.
[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away
[ https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357008#comment-16357008 ]

ASF GitHub Bot commented on FLINK-8362:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5426

    notice files look good to me.

> Shade Elasticsearch dependencies away
> -------------------------------------
>
>                 Key: FLINK-8362
>                 URL: https://issues.apache.org/jira/browse/FLINK-8362
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System, ElasticSearch Connector
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>
> It would be nice to make the Elasticsearch connectors self-contained just
> like the s3 file system implementations and the cassandra connector.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357003#comment-16357003 ]

Timo Walther commented on FLINK-8577:
--------------------------------------

Maybe we should also rename the methods so they are easier to find: {{tEnv.fromUpsertStream}}, {{tEnv.fromAppendStream}}, {{tEnv.fromDataSet()}}.

> Implement proctime DataStream to Table upsert conversion.
> ----------------------------------------------------------
>
>                 Key: FLINK-8577
>                 URL: https://issues.apache.org/jira/browse/FLINK-8577
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single-row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
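Timo's renaming suggestion would make the three conversion entry points symmetric. A minimal sketch of how the renamed methods might read in Scala; note that {{fromUpsertStream}} and {{fromAppendStream}} are only proposed names from this thread, not existing Table API methods:

```scala
// Hypothetical sketch: the method names follow the renaming proposed in this
// thread; neither fromUpsertStream nor fromAppendStream exists in Flink yet.
val input: DataStream[(String, Long, Int)] = ???

// upsert semantics, with 'c declared as the upsert key
val upserted: Table = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key)

// upsert without a key collapses the stream to a single-row table
val singleRow: Table = tEnv.fromUpsertStream(input, 'a, 'b, 'c)

// plain append semantics under the symmetric name
val appended: Table = tEnv.fromAppendStream(input, 'a, 'b, 'c)
```

Grouping all conversions under a common {{from*}} prefix would let users discover the upsert, append, and DataSet variants through IDE completion alone.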