[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface
[ https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941590#comment-15941590 ] ramkrishna.s.vasudevan commented on FLINK-5698: --- Once this is integrated, I will work on making the HBase table source work with NestedFieldsProjectableTableSource. Thanks [~tonycox]. > Add NestedFieldsProjectableTableSource interface > > > Key: FLINK-5698 > URL: https://issues.apache.org/jira/browse/FLINK-5698 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > > Add a NestedFieldsProjectableTableSource interface for TableSource > implementations that support projection push-down on nested fields. > The interface could look as follows > {code} > trait NestedFieldsProjectableTableSource[T] { > def projectNestedFields(fields: Array[String]): > NestedFieldsProjectableTableSource[T] > } > {code} > This interface works together with ProjectableTableSource -- This message was sent by Atlassian JIRA (v6.3.15#6346)
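The idea behind projecting nested fields can be pictured without any Flink dependency: given field paths such as "user.address.city", a source keeps only the requested leaves of a nested record. The sketch below is a hypothetical illustration of that pruning over plain `Map`-based rows; the class and method names are assumptions, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of nested-field projection push-down: given dotted
// paths like "user.address.city", copy only those leaves from a nested
// Map-based row into the projected row. Not Flink code.
public class NestedProjection {

    // Project a nested row (maps of maps) down to the requested field paths.
    public static Map<String, Object> project(Map<String, Object> row, String[] fieldPaths) {
        Map<String, Object> result = new HashMap<>();
        for (String path : fieldPaths) {
            copyPath(row, result, path.split("\\."), 0);
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private static void copyPath(Map<String, Object> src, Map<String, Object> dst, String[] parts, int i) {
        String key = parts[i];
        if (!src.containsKey(key)) {
            return; // requested path not present in this row
        }
        Object value = src.get(key);
        if (i == parts.length - 1) {
            dst.put(key, value);               // leaf: copy as-is
        } else if (value instanceof Map) {     // intermediate record: descend
            Map<String, Object> child =
                (Map<String, Object>) dst.computeIfAbsent(key, k -> new HashMap<String, Object>());
            copyPath((Map<String, Object>) value, child, parts, i + 1);
        }
    }
}
```

A real table source would apply the same pruning at read time so unused nested columns are never deserialized.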
[jira] [Updated] (FLINK-6186) Remove unused import
[ https://issues.apache.org/jira/browse/FLINK-6186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CanBin Zheng updated FLINK-6186: Description: Remove unused import org.apache.flink.api.java.ExecutionEnvironment in org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scala > Remove unused import > > > Key: FLINK-6186 > URL: https://issues.apache.org/jira/browse/FLINK-6186 > Project: Flink > Issue Type: Wish >Reporter: CanBin Zheng >Assignee: CanBin Zheng >Priority: Trivial > > Remove unused import org.apache.flink.api.java.ExecutionEnvironment in > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scala -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6186) Remove unused import
CanBin Zheng created FLINK-6186: --- Summary: Remove unused import Key: FLINK-6186 URL: https://issues.apache.org/jira/browse/FLINK-6186 Project: Flink Issue Type: Wish Reporter: CanBin Zheng Assignee: CanBin Zheng Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941547#comment-15941547 ] ASF GitHub Bot commented on FLINK-6117: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3600 @Rucongzhang Please review whether it meets your requirements. > 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework > > > Key: FLINK-6117 > URL: https://issues.apache.org/jira/browse/FLINK-6117 > Project: Flink > Issue Type: Sub-task > Components: Client, JobManager >Affects Versions: 1.2.0 > Environment: Ubuntu, non-secured >Reporter: CanBin Zheng >Assignee: CanBin Zheng > Labels: security > Original Estimate: 336h > Remaining Estimate: 336h > > The value of 'zookeeper.sasl.disable' is not used correctly when starting > CuratorFramework. > Here are all the settings relevant to high-availability in my flink-conf.yaml: > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost:2181 > high-availability.zookeeper.storageDir: hdfs:///flink/ha/ > Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default > value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) is > applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, > both logs show that they attempt to connect to ZooKeeper in 'SASL' mode. 
> logs are like this: > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,534 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
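The bug class described here is a translation gap: the ZooKeeper client decides whether to attempt SASL from the system property `zookeeper.sasl.client`, while Flink exposes its own `zookeeper.sasl.disable` flag, and the two must be kept consistent before the CuratorFramework is created. The sketch below is not Flink's actual code, just a hedged illustration of that translation (the helper names are assumptions):

```java
import java.util.Properties;

// Hedged sketch (not Flink's implementation): translate a Flink-style
// 'zookeeper.sasl.disable' setting into the 'zookeeper.sasl.client' system
// property that the ZooKeeper client actually reads, before any
// CuratorFramework / ZooKeeper client is constructed.
public class SaslFlag {

    static final String FLINK_KEY = "zookeeper.sasl.disable";
    static final String ZK_CLIENT_KEY = "zookeeper.sasl.client";

    // Compute the value 'zookeeper.sasl.client' should get: disabling SASL
    // on the Flink side means the ZK client must NOT attempt SASL ("false").
    public static String toZkSaslClient(Properties flinkConf, boolean defaultDisable) {
        String raw = flinkConf.getProperty(FLINK_KEY);
        boolean disable = (raw == null) ? defaultDisable : Boolean.parseBoolean(raw.trim());
        return disable ? "false" : "true";
    }

    // Apply the translated flag so the ZooKeeper client picks it up.
    public static void apply(Properties flinkConf, boolean defaultDisable) {
        System.setProperty(ZK_CLIENT_KEY, toZkSaslClient(flinkConf, defaultDisable));
    }
}
```

With the default of `disable = true` and no explicit setting, this yields `zookeeper.sasl.client=false`, which would avoid the JAAS warnings in the logs above.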
[jira] [Updated] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6117: - Issue Type: Sub-task (was: Bug) Parent: FLINK-5839 > 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework > > > Key: FLINK-6117 > URL: https://issues.apache.org/jira/browse/FLINK-6117 > Project: Flink > Issue Type: Sub-task > Components: Client, JobManager >Affects Versions: 1.2.0 > Environment: Ubuntu, non-secured >Reporter: CanBin Zheng >Assignee: CanBin Zheng > Labels: security > Original Estimate: 336h > Remaining Estimate: 336h > > The value of 'zookeeper.sasl.disable' is not used correctly when starting > CuratorFramework. > Here are all the settings relevant to high-availability in my flink-conf.yaml: > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost:2181 > high-availability.zookeeper.storageDir: hdfs:///flink/ha/ > Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default > value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) is > applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, > both logs show that they attempt to connect to ZooKeeper in 'SASL' mode. > logs are like this: > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. 
Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,534 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled
[ https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941537#comment-15941537 ] CanBin Zheng commented on FLINK-6148: - This is a duplicate issue; I reported this problem before and am fixing it. https://issues.apache.org/jira/browse/FLINK-6117 > The ZooKeeper client reports a SASL error when SASL is disabled > -- > > Key: FLINK-6148 > URL: https://issues.apache.org/jira/browse/FLINK-6148 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: zhangrucong1982 > > I use Flink 1.2.0 in a YARN cluster. HA is configured in > flink-conf.yaml, but SASL is disabled. The configurations are: > high-availability: zookeeper > high-availability.zookeeper.quorum: > 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181 > high-availability.zookeeper.storageDir: hdfs:/flink > high-availability.zookeeper.client.acl: open > high-availability.zookeeper.path.root: flink0308 > zookeeper.sasl.disable: true > The client, JobManager, and TaskManager logs all contain the following error > information: > 2017-03-22 11:18:24,662 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-22 11:18:24,663 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941535#comment-15941535 ] ASF GitHub Bot commented on FLINK-5658: --- Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r108026230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- Hi @sunjincheng121, thanks for the reminder; I am glad to add it. > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. 
> - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
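The semantics the quoted SQL asks for can be pictured with a small plain-Java simulation, independent of Flink: `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` means a row's frame contains every row of the same partition with event time less than or equal to its own, so rows sharing a timestamp ("peers") all get the same aggregate. This sketch assumes input already sorted by partition and event time (the class is illustrative, not a Flink operator):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration (plain Java, not Flink code) of unbounded-preceding RANGE
// semantics: the frame of a row is every earlier-or-equal row of its
// partition, so timestamp peers all see the same running sum.
public class UnboundedRangeOver {

    // Inputs are parallel arrays {partitionKey, eventTime, value}, assumed
    // sorted by partition key and then event time. Returns SUM(value) per
    // row, in input order.
    public static long[] runningSums(String[] keys, long[] times, long[] values) {
        long[] out = new long[values.length];
        Map<String, Long> sums = new HashMap<>(); // running sum per partition
        int i = 0;
        while (i < values.length) {
            // gather all peer rows: same partition key and same event time
            int j = i;
            long peerSum = 0;
            while (j < values.length && keys[j].equals(keys[i]) && times[j] == times[i]) {
                peerSum += values[j];
                j++;
            }
            long total = sums.merge(keys[i], peerSum, Long::sum);
            for (int k = i; k < j; k++) {
                out[k] = total; // every peer's frame includes all its peers
            }
            i = j;
        }
        return out;
    }
}
```

This is what distinguishes RANGE from ROWS: with ROWS, two rows at the same timestamp would get different running sums.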
[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941530#comment-15941530 ] ASF GitHub Bot commented on FLINK-6117: --- Github user zhengcanbin commented on the issue: https://github.com/apache/flink/pull/3600 @EronWright @StephanEwen In ZookeeperSaslClient.java, 'zookeeper.sasl.client' is true by default, so I agree to set 'zookeeper.sasl.disable' to false by default, for consistency. > 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework > > > Key: FLINK-6117 > URL: https://issues.apache.org/jira/browse/FLINK-6117 > Project: Flink > Issue Type: Bug > Components: Client, JobManager >Affects Versions: 1.2.0 > Environment: Ubuntu, non-secured >Reporter: CanBin Zheng >Assignee: CanBin Zheng > Labels: security > Original Estimate: 336h > Remaining Estimate: 336h > > The value of 'zookeeper.sasl.disable' is not used correctly when starting > CuratorFramework. > Here are all the settings relevant to high-availability in my flink-conf.yaml: > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost:2181 > high-availability.zookeeper.storageDir: hdfs:///flink/ha/ > Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default > value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) is > applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, > both logs show that they attempt to connect to ZooKeeper in 'SASL' mode. 
> logs are like this: > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,534 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...
Github user zhengcanbin commented on the issue: https://github.com/apache/flink/pull/3600 @EronWright @StephanEwen In ZookeeperSaslClient.java, 'zookeeper.sasl.client' is true by default, so I agree to set 'zookeeper.sasl.disable' to false by default, for consistency. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled
[ https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6148: Assignee: (was: shijinkui) > The ZooKeeper client reports a SASL error when SASL is disabled > -- > > Key: FLINK-6148 > URL: https://issues.apache.org/jira/browse/FLINK-6148 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: zhangrucong1982 > > I use Flink 1.2.0 in a YARN cluster. HA is configured in > flink-conf.yaml, but SASL is disabled. The configurations are: > high-availability: zookeeper > high-availability.zookeeper.quorum: > 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181 > high-availability.zookeeper.storageDir: hdfs:/flink > high-availability.zookeeper.client.acl: open > high-availability.zookeeper.path.root: flink0308 > zookeeper.sasl.disable: true > The client, JobManager, and TaskManager logs all contain the following error > information: > 2017-03-22 11:18:24,662 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-22 11:18:24,663 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled
[ https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6148: Assignee: shijinkui > The ZooKeeper client reports a SASL error when SASL is disabled > -- > > Key: FLINK-6148 > URL: https://issues.apache.org/jira/browse/FLINK-6148 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: zhangrucong1982 >Assignee: shijinkui > > I use Flink 1.2.0 in a YARN cluster. HA is configured in > flink-conf.yaml, but SASL is disabled. The configurations are: > high-availability: zookeeper > high-availability.zookeeper.quorum: > 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181 > high-availability.zookeeper.storageDir: hdfs:/flink > high-availability.zookeeper.client.acl: open > high-availability.zookeeper.path.root: flink0308 > zookeeper.sasl.disable: true > The client, JobManager, and TaskManager logs all contain the following error > information: > 2017-03-22 11:18:24,662 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-22 11:18:24,663 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState- > Authentication failed -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5217) Deprecated interface Checkpointed should make a clear suggestion
[ https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941522#comment-15941522 ] shijinkui commented on FLINK-5217: -- ping [~srichter] > Deprecated interface Checkpointed should make a clear suggestion > --- > > Key: FLINK-5217 > URL: https://issues.apache.org/jira/browse/FLINK-5217 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: shijinkui > Fix For: 1.2.1 > > > package org.apache.flink.streaming.api.checkpoint; > @Deprecated > @PublicEvolving > public interface Checkpointed extends > CheckpointedRestoring > the deprecation of this interface should clearly state in which version the > interface will be removed, and which interface should be used instead of it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
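The complaint is about deprecation hygiene: a `@Deprecated` annotation alone says nothing about the replacement or the removal timeline. The sketch below is illustrative only, not Flink's actual source; the method signature is simplified (no checked exception) and the named replacement is what the ticket era suggests, stated here as an assumption:

```java
// Illustrative sketch (NOT Flink's source): the shape of deprecation note the
// ticket asks for — say what replaces the interface and when it goes away,
// right in the @deprecated Javadoc tag.
public interface Checkpointed<T> {

    /**
     * Takes a snapshot of the function's state.
     *
     * @deprecated Superseded by the newer checkpointing interfaces (in Flink
     * this is {@code CheckpointedFunction}); planned for removal in a future
     * major release and kept until then only for restoring old state.
     */
    @Deprecated
    T snapshotState(long checkpointId, long checkpointTimestamp);
}
```

Stating the replacement in the `@deprecated` tag means IDEs and generated Javadoc surface the migration path directly at the call site.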
[GitHub] flink issue #3599: [FLINK-6174][HA]introduce a new election service to make ...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3599 I don't think it's a good idea, as it cannot solve the "split brain" issue either. The key problem is that `LeaderLatch` in Curator is too sensitive to the connection state to ZooKeeper (it revokes leadership when the connection to ZooKeeper is temporarily broken), and probably the best way is offering a "duller" LeaderLatch, which could also be used in a standalone cluster. I did the same work in our own private Spark release; let me see if it can be reused. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941520#comment-15941520 ] shijinkui commented on FLINK-5860: -- ping [~yaroslav.mykhaylov] > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static final String 
DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > 
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >FLINK_HDFS_PATH = "file:" + >
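The ticket proposes JUnit's `TemporaryFolder` rule so each test gets an isolated, automatically cleaned-up directory instead of sharing `java.io.tmpdir`. As a hedged, dependency-free illustration of the same idea, the sketch below builds the equivalent on the JDK's `Files.createTempDirectory` (class and method names are my own, not from Flink or JUnit):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

// Sketch of the idea behind the ticket: instead of writing test files
// directly under System.getProperty("java.io.tmpdir"), give each test its
// own unique directory and delete it afterwards — what JUnit's
// TemporaryFolder rule automates.
public class IsolatedTmpDir implements AutoCloseable {

    private final Path root;

    public IsolatedTmpDir(String prefix) {
        try {
            // unique per instance, so concurrent tests cannot collide
            this.root = Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public Path newFile(String name) {
        try {
            return Files.createFile(root.resolve(name));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public Path getRoot() {
        return root;
    }

    @Override
    public void close() {
        // recursive delete, mirroring TemporaryFolder's after() cleanup
        try (java.util.stream.Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The payoff is the same in both variants: tests stop leaking files into the shared tmp directory and stop interfering with each other when run in parallel.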
[jira] [Commented] (FLINK-6174) Introduce a leader election service in yarn mode to make JobManager always available
[ https://issues.apache.org/jira/browse/FLINK-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941521#comment-15941521 ] ASF GitHub Bot commented on FLINK-6174: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3599 I don't think it's a good idea, as it cannot solve the "split brain" issue either. The key problem is that `LeaderLatch` in Curator is too sensitive to the connection state to ZooKeeper (it revokes leadership when the connection to ZooKeeper is temporarily broken), and probably the best way is offering a "duller" LeaderLatch, which could also be used in a standalone cluster. I did the same work in our own private Spark release; let me see if it can be reused. > Introduce a leader election service in yarn mode to make JobManager always > available > > > Key: FLINK-6174 > URL: https://issues.apache.org/jira/browse/FLINK-6174 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Tao Wang >Assignee: Tao Wang > > Now in yarn mode, if we use ZooKeeper as the high availability choice, it will > create an election service to elect a leader through ZooKeeper. > When the ZooKeeper leader crashes or the connection between the JobManager and > the ZooKeeper instance is broken, the JobManager's leadership is revoked and a > Disconnect message is sent to the TaskManagers, which cancels all running tasks > and leaves them waiting for the connection between JM and ZK to be rebuilt. > In yarn mode, we have one and only one JobManager (AM) at a time, and it should > always be the leader instead of being elected through ZooKeeper. We can introduce a new > leader election service in yarn mode to achieve that. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
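The "duller LeaderLatch" suggested in the comment can be sketched as a small state machine with no Curator dependency: instead of revoking leadership the instant the ZooKeeper connection is SUSPENDED, it tolerates a grace period and only revokes when the session is actually LOST or the suspension outlives the grace window. Everything below is a hedged illustration (class, enum, and grace-period mechanics are my assumptions, with time injected for testability):

```java
// Hedged sketch (no Curator) of a connection-loss-tolerant leadership handle:
// SUSPENDED starts a grace timer instead of revoking immediately; only LOST,
// or a suspension longer than the grace period, revokes leadership.
public class TolerantLeadership {

    public enum ConnState { CONNECTED, SUSPENDED, LOST }

    private final long graceMillis;
    private boolean leader;
    private long suspendedSince = -1; // -1 means "not currently suspended"

    public TolerantLeadership(long graceMillis, boolean initiallyLeader) {
        this.graceMillis = graceMillis;
        this.leader = initiallyLeader;
    }

    // Feed connection-state changes; 'now' is injected so this is testable.
    public void onStateChange(ConnState state, long now) {
        switch (state) {
            case CONNECTED:
                suspendedSince = -1;            // recovered in time: keep leadership
                break;
            case SUSPENDED:
                if (suspendedSince < 0) suspendedSince = now;
                if (now - suspendedSince >= graceMillis) leader = false;
                break;
            case LOST:
                leader = false;                 // session gone: must revoke
                break;
        }
    }

    public boolean isLeader(long now) {
        if (leader && suspendedSince >= 0 && now - suspendedSince >= graceMillis) {
            leader = false;                     // grace period exceeded
        }
        return leader;
    }
}
```

The trade-off is explicit: a longer grace period means fewer spurious task cancellations on ZooKeeper hiccups, but a longer window in which a stale leader could act, which is why the split-brain concern in the thread still applies.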
[jira] [Commented] (FLINK-6060) reference nonexistent class in the scaladoc
[ https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941519#comment-15941519 ] shijinkui commented on FLINK-6060: -- [~aljoscha] Sorry for my unclear description. For example, take the class TaskOperationResult in the scaladoc. Actually, TaskOperationResult does not exist, or its file has been renamed. In such scaladoc we should correct the referenced class. /** * Submits a task to the task manager. The result to this message is a * [[TaskOperationResult]] message. * * @param tasks Descriptor which contains the information to start the task. */ case class SubmitTask(tasks: TaskDeploymentDescriptor) extends TaskMessage with RequiresLeaderSessionID > reference nonexistent class in the scaladoc > --- > > Key: FLINK-6060 > URL: https://issues.apache.org/jira/browse/FLINK-6060 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: shijinkui > > TaskMessages.scala > ConnectedStreams.scala > DataStream.scala > Who can fix it? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6060) reference nonexistent class in the scaladoc
[ https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6060: - Summary: reference nonexistent class in the scaladoc (was: not exist class referance in the scala function annotation) > reference nonexistent class in the scaladoc > --- > > Key: FLINK-6060 > URL: https://issues.apache.org/jira/browse/FLINK-6060 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: shijinkui > > TaskMessages.scala > ConnectedStreams.scala > DataStream.scala > Who can fix it? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941516#comment-15941516 ] shijinkui commented on FLINK-5754: -- [~greghogan] At first I assumed the Flink tags were like those of other open source projects, so we checked out a branch from a tag. We had to reset it; we had gone too far forward. If there is no special reason, can we avoid deleting anything from the tagged release at the next milestone, following the common tag/release convention? That would make it very convenient to develop a private Flink version, and merging back to the Flink community code base would then pose no difficulty. Thanks > released tag missing .gitignore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > released tag missing .gitignore .travis.yml .gitattributes. > When making a release version, only the version should be replaced. > for example: https://github.com/apache/spark/tree/v2.1.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941507#comment-15941507 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r108025485 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- No problem. So, can I change this JIRA's title and open a new JIRA to address the `RANGE` case? @fhueske Thanks, SunJincheng > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. 
rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r108025485 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- No problem. So, can I change this JIRA's title and open a new JIRA to address the `RANGE` case? @fhueske Thanks, SunJincheng --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941412#comment-15941412 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3607 Hi @rtudoran, thanks for this PR. It looks very promising. Please rebase the code on master first, then I'll be glad to take a look at the changes. Best, SunJincheng > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well.
> This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3607: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3607 Hi @rtudoran, thanks for this PR. It looks very promising. Please rebase the code on master first, then I'll be glad to take a look at the changes. Best, SunJincheng
[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Description: File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. was: File sources (such as {{env#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. > Input readers and output writers/formats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such > as {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. 
[jira] [Created] (FLINK-6185) Input readers and output writers/formats need to support gzip
Luke Hutchison created FLINK-6185: - Summary: Input readers and output writers/formats need to support gzip Key: FLINK-6185 URL: https://issues.apache.org/jira/browse/FLINK-6185 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor File sources (such as {{env#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method.
[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Description: File sources (such as {{env#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. was: File sources (such as {{env#readCsvFile()}}) and sinks (such as FileOutputFormat and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. > Input readers and output writers/formats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{env#readCsvFile()}}) and sinks (such as > {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
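The extension-based autodetection proposed above can be sketched in a few lines of plain Scala over the JDK's `java.util.zip` streams. This is a minimal illustration only; `openPossiblyCompressed` is a hypothetical helper, not an existing Flink API:

```scala
import java.io.{BufferedReader, File, FileInputStream, FileOutputStream, InputStream, InputStreamReader, PrintWriter}
import java.nio.file.Files
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Autodetect by file extension: wrap the raw stream in a GZIPInputStream when
// the name ends in ".gz", otherwise pass the stream through unchanged.
def openPossiblyCompressed(file: File): InputStream = {
  val raw = new FileInputStream(file)
  if (file.getName.endsWith(".gz")) new GZIPInputStream(raw) else raw
}

// Round-trip: write a gzipped CSV file, then read it back transparently.
val f = Files.createTempFile("lines", ".csv.gz").toFile
val out = new PrintWriter(new GZIPOutputStream(new FileOutputStream(f)))
out.println("a,1")
out.println("b,2")
out.close()

val reader = new BufferedReader(new InputStreamReader(openPossiblyCompressed(f)))
val readBack = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
reader.close()
f.delete()
```

Content-based autodetection (checking for the gzip magic bytes `0x1f 0x8b`) would work similarly but needs a pushback or buffered stream so the probed bytes can be re-read.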
[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3269 Hi @tonycox, thanks for the update! I'll do some minor improvements and will merge the PR. Thank you, Fabian --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface
[ https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941330#comment-15941330 ] ASF GitHub Bot commented on FLINK-5698: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3269 Hi @tonycox, thanks for the update! I'll do some minor improvements and will merge the PR. Thank you, Fabian > Add NestedFieldsProjectableTableSource interface > > > Key: FLINK-5698 > URL: https://issues.apache.org/jira/browse/FLINK-5698 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > > Add a NestedFieldsProjectableTableSource interface for some TableSource > implementation that support nesting projection push-down. > The interface could look as follows > {code} > def trait NestedFieldsProjectableTableSource { > def projectNestedFields(fields: Array[String]): > NestedFieldsProjectableTableSource[T] > } > {code} > This interface works together with ProjectableTableSource -- This message was sent by Atlassian JIRA (v6.3.15#6346)
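To make the nested projection push-down idea concrete, here is a toy sketch in plain Scala, with rows modeled as nested Maps. The names `project`, `extract`, and `merge` are illustrative helpers, not part of the proposed Flink interface:

```scala
// Toy model of nested projection push-down: a projection like "user.name"
// keeps only that sub-field, so a source never materializes the rest of
// the record. Not Flink's actual implementation.
object NestedProjection {
  type Row = Map[String, Any]

  // Keep only the sub-trees named by dotted paths such as "user.name".
  def project(row: Row, fields: Array[String]): Row =
    fields.map(p => extract(row, p.split('.').toList)).foldLeft(Map.empty[String, Any])(merge)

  private def extract(row: Row, path: List[String]): Row = path match {
    case key :: Nil => row.get(key).map(v => Map(key -> v)).getOrElse(Map.empty)
    case key :: rest => row.get(key) match {
      case Some(nested: Map[_, _]) =>
        val sub = extract(nested.asInstanceOf[Row], rest)
        if (sub.isEmpty) Map.empty else Map(key -> sub)
      case _ => Map.empty
    }
    case Nil => Map.empty
  }

  // Deep-merge two projected rows so "user.name" and "user.age" share one "user" node.
  private def merge(a: Row, b: Row): Row =
    b.foldLeft(a) { case (acc, (k, v)) =>
      (acc.get(k), v) match {
        case (Some(x: Map[_, _]), y: Map[_, _]) =>
          acc + (k -> merge(x.asInstanceOf[Row], y.asInstanceOf[Row]))
        case _ => acc + (k -> v)
      }
    }
}
```

A real `NestedFieldsProjectableTableSource` would additionally have to translate such paths into the source's physical schema (e.g. HBase column families, as mentioned at the top of this thread).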
[GitHub] flink pull request #3605: [FLINK-6181][Start scripts] Fix regex in start scr...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3605#discussion_r108001505 --- Diff: tools/travis_mvn_watchdog.sh --- @@ -164,7 +164,7 @@ watchdog () { # Check the final fat jar for illegal artifacts check_shaded_artifacts() { - jar tf build-target/lib/flink-dist-*.jar > allClasses --- End diff -- The problem is that our dist jar is called `flink-dist_2.10-1.3-SNAPSHOT.jar`, so the old variant didn't match the jar. This leads to the following error in all travis builds: ``` java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file or directory) at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.<init>(ZipFile.java:220) at java.util.zip.ZipFile.<init>(ZipFile.java:150) at java.util.zip.ZipFile.<init>(ZipFile.java:121) at sun.tools.jar.Main.list(Main.java:1060) at sun.tools.jar.Main.run(Main.java:291) at sun.tools.jar.Main.main(Main.java:1233) ``` I'm doing `jar tf` here to check that guava and other libraries are not part of the fat jar.
[jira] [Commented] (FLINK-6181) Zookeeper scripts use invalid regex
[ https://issues.apache.org/jira/browse/FLINK-6181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941153#comment-15941153 ] ASF GitHub Bot commented on FLINK-6181: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3605#discussion_r108001505 --- Diff: tools/travis_mvn_watchdog.sh --- @@ -164,7 +164,7 @@ watchdog () { # Check the final fat jar for illegal artifacts check_shaded_artifacts() { - jar tf build-target/lib/flink-dist-*.jar > allClasses --- End diff -- The problem is that our dist jar is called `flink-dist_2.10-1.3-SNAPSHOT.jar`. so the old variant didn't match the jar. This leads to the following error in all travis build: ``` java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file or directory) at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.(ZipFile.java:220) at java.util.zip.ZipFile.(ZipFile.java:150) at java.util.zip.ZipFile.(ZipFile.java:121) at sun.tools.jar.Main.list(Main.java:1060) at sun.tools.jar.Main.run(Main.java:291) at sun.tools.jar.Main.main(Main.java:1233) ``` I'm doing `jar tf` here to check if guava and other libraries are not part of the fat jar. > Zookeeper scripts use invalid regex > --- > > Key: FLINK-6181 > URL: https://issues.apache.org/jira/browse/FLINK-6181 > Project: Flink > Issue Type: Bug > Components: Build System, Startup Shell Scripts >Reporter: Robert Metzger >Assignee: Robert Metzger > > This issue has been reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/unable-to-add-more-servers-in-zookeeper-quorum-peers-in-flink-1-2-td12321.html -- This message was sent by Atlassian JIRA (v6.3.15#6346)
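The failure mode described above is easy to reproduce in plain bash. The shell, not `jar`, expands the glob; when nothing matches, bash passes the literal pattern through. The paths below are made up for the demo and the actual fix in the PR may differ:

```shell
mkdir -p demo/lib
touch demo/lib/flink-dist_2.10-1.3-SNAPSHOT.jar

# The old pattern matches nothing (the real name has an underscore, not a
# hyphen, before the scala version), so bash hands the literal string to the
# command -- which is why travis saw FileNotFoundException.
old=$(echo demo/lib/flink-dist-*.jar)

# Dropping the hyphen before the wildcard covers the scala-versioned name.
new=$(echo demo/lib/flink-dist*.jar)

rm -rf demo
```

With `shopt -s failglob` bash would instead abort on the unmatched pattern, which surfaces such mismatches immediately rather than inside the invoked tool.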
[jira] [Comment Edited] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939209#comment-15939209 ] Chen Qin edited comment on FLINK-6085 at 3/24/17 8:32 PM: -- I would like to see if we can agree on the high level first. Service is primarily rpc interaction with horizontal scalability and latency requirements. The current way of bridging a service with a streaming pipeline via a distributed queue provides the benefit of failure resilience and topic reuse at the cost of extra hardware/software and latency, and also has no callback support. [~till.rohrmann] updates Briefly chatted offline with Maxim; it seems a bit hard to work around a distributed queue, considering the pipeline can restart and the offset can rewind anytime, and loss of insertion events is not acceptable (queries might be fine, but it seems Flink already addresses this issue with queryable state). To echo Till's comments, yes, custom code could track those requests. A future question is whether we can have a specific sink implementation which can reroute results to specific rpc hosts (e.g. http response or callback). was (Author: foxss): I would like to see if we can agree on the high level first. Service is primarily rpc interaction with horizontal scalability and latency requirements. The current way of bridging a service with a streaming pipeline via a distributed queue provides the benefit of failure resilience and topic reuse at the cost of extra hardware/software and latency, and also has no callback support.
[~till.rohrmann] > flink as micro service > -- > > Key: FLINK-6085 > URL: https://issues.apache.org/jira/browse/FLINK-6085 > Project: Flink > Issue Type: Improvement > Components: DataStream API, JobManager >Reporter: Chen Qin >Priority: Minor > Attachments: Untitled document.jpg > > > Track discussion around run flink as a micro service, includes but not > limited to > - RPC (web service endpoint) source > as web service endpoint accept RPC call, ingest to the streaming job(only > one) > - callback mechanism > - task assignment should honor deployment group (web tier hosts should be > isolated from rest of task assignment) > https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3564: [FLINK-6089] [table] Implement decoration phase for rewri...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3564 I accidentally merged this to `master` not `table-retraction`. Since the changes do not break anything, this is not a problem. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941043#comment-15941043 ] ASF GitHub Bot commented on FLINK-5658: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107987832 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- I see, so this PR addresses the `ROW` case. I'll push out a hotfix. Thanks for the notification @sunjincheng121! > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. 
rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression).
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107987832 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- I see, so this PR addresses the `ROW` case. I'll push out a hotfix. Thanks for the notification @sunjincheng121! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-5990) Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5990. Resolution: Implemented Fix Version/s: 1.3.0 Implemented with 7a9d39fe9f659d43bf4719a2981f6c4771ffbe48 > Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5990 > URL: https://issues.apache.org/jira/browse/FLINK-5990 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER ROWS aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is required > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5803) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
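The `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` semantics in the query above can be sketched in plain Scala: each output row aggregates at most the two preceding physical rows plus the current one. This is an illustration of the window semantics only, not the Flink operator:

```scala
// Running sum over a bounded ROWS window (2 PRECEDING AND CURRENT ROW):
// for each index, slice out at most the two previous elements plus the
// current one and sum them.
val bValues = Seq(1, 2, 3, 4)
val boundedRowsSum = bValues.indices.map(i => bValues.slice(math.max(0, i - 2), i + 1).sum)
```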
[jira] [Commented] (FLINK-6145) Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941007#comment-15941007 ] sunjincheng commented on FLINK-6145: Fixed by [7a9d39f|https://github.com/apache/flink/commit/7a9d39fe9f659d43bf4719a2981f6c4771ffbe48] > Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to > SQL > - > > Key: FLINK-6145 > URL: https://issues.apache.org/jira/browse/FLINK-6145 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > The goal of this issue is to add support for OVER ROWS aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) > AS sumB, > MIN(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) > AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5804) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941021#comment-15941021 ] Fabian Hueske commented on FLINK-5829: -- Calcite 1.12 has been released today: http://calcite.apache.org/news/2017/03/24/release-1.12.0/ We can upgrade and address the related issues. > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is released we should update to remove some copied classes.
[jira] [Closed] (FLINK-5570) Support register external catalog to table environment
[ https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5570. Resolution: Implemented Fix Version/s: 1.3.0 Implemented with 135a57c4bb37eaa9cb85faaff1cc694f9448fabd > Support register external catalog to table environment > -- > > Key: FLINK-5570 > URL: https://issues.apache.org/jira/browse/FLINK-5570 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: jingzhang > Fix For: 1.3.0 > > > This issue aims to support register one or more {{ExternalCatalog}} (which is > referred in https://issues.apache.org/jira/browse/FLINK-5568) to > {{TableEnvironment}}. After registration, SQL and TableAPI queries could > access to tables in the external catalogs without register those tables one > by one to {{TableEnvironment}} beforehand. > We plan to add two APIs in {{TableEnvironment}}: > 1. register externalCatalog > {code} > def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): > Unit > {code} > 2. scan a table from registered catalog and returns the resulting {{Table}}, > the API is very useful in TableAPI queries. > {code} > def scan(catalogName: String, tableIdentifier: TableIdentifier): Table > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
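The two proposed `TableEnvironment` additions can be modeled with a toy registry in plain Scala: a catalog is registered under a name, and `scan` resolves tables through it. All names and types here are simplified illustrations, not Flink's actual `ExternalCatalog` or `Table` classes:

```scala
// Simplified stand-ins for the real Table API types.
case class TableSchema(columns: Map[String, String])

trait ExternalCatalog {
  def getTable(identifier: String): Option[TableSchema]
}

// Toy TableEnvironment keeping a name -> catalog registry, mirroring the
// registerExternalCatalog / scan API sketched in the issue.
class ToyTableEnvironment {
  private var catalogs = Map.empty[String, ExternalCatalog]

  def registerExternalCatalog(name: String, catalog: ExternalCatalog): Unit =
    catalogs += name -> catalog

  // Resolve a table through a registered catalog, as a TableAPI scan() would.
  def scan(catalogName: String, tableIdentifier: String): TableSchema =
    catalogs.get(catalogName)
      .flatMap(_.getTable(tableIdentifier))
      .getOrElse(throw new NoSuchElementException(s"$catalogName.$tableIdentifier"))
}
```

The value of the indirection is that SQL and Table API queries can reference external tables by a qualified name without registering each table individually beforehand.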
[jira] [Commented] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase
[ https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941013#comment-15941013 ] ASF GitHub Bot commented on FLINK-6089: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3564 I accidentally merged this to `master` not `table-retraction`. Since the changes do not break anything, this is not a problem. > Implement decoration phase for rewriting predicated logical plan after > volcano optimization phase > - > > Key: FLINK-6089 > URL: https://issues.apache.org/jira/browse/FLINK-6089 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > Fix For: 1.3.0 > > > At present, there is no chance to modify the DataStreamRel tree after the > volcano optimization. We consider to add a decoration phase after volcano > optimization phase. Decoration phase is dedicated for rewriting predicated > logical plan and is independent of cost module. After decoration phase is > added, we get the chance to apply retraction rules at this phase. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107982106 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- EVENT-TIME OVER Need to treatment "rows" and "range" clause separately, because - ROWS specifies the window in physical units (rows). - RANGE specifies the window as a logical offset. They have different semantics, for example: DATA: ``` (long, int, String) (1L, 1, "H") (2L, 2, "H") (2L, 3,"H") ``` ROWS sum(b) result: `1,3,6` RANGE sum(b) result: `1,6,6` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
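The two result sequences quoted in the comment above can be reproduced with a few lines of plain Scala. ROWS advances one physical row at a time, while RANGE treats rows with the same (event) timestamp as peers that all see the same aggregate. This illustrates only the semantics, not the Flink operator:

```scala
// Sample data from the comment: (timestamp, b).
val data = Seq((1L, 1), (2L, 2), (2L, 3))

// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: running sum over
// physical rows -> 1, 3, 6.
val rowsSum = data.scanLeft(0)((acc, r) => acc + r._2).tail

// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: each row sees the sum
// of all rows whose timestamp is <= its own -> 1, 6, 6.
val rangeSum = data.map { case (ts, _) => data.filter(_._1 <= ts).map(_._2).sum }
```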
[jira] [Closed] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase
[ https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6089. Resolution: Implemented Fix Version/s: 1.3.0 Implemented with 6949c8c79c41344023df08dde2936f06daa00e0d > Implement decoration phase for rewriting predicated logical plan after > volcano optimization phase > - > > Key: FLINK-6089 > URL: https://issues.apache.org/jira/browse/FLINK-6089 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > Fix For: 1.3.0 > > > At present, there is no chance to modify the DataStreamRel tree after the > volcano optimization. We consider to add a decoration phase after volcano > optimization phase. Decoration phase is dedicated for rewriting predicated > logical plan and is independent of cost module. After decoration phase is > added, we get the chance to apply retraction rules at this phase. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6145) Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-6145. -- Resolution: Fixed > Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to > SQL > - > > Key: FLINK-6145 > URL: https://issues.apache.org/jira/browse/FLINK-6145 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > The goal of this issue is to add support for OVER ROWS aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) > AS sumB, > MIN(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) > AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5804) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5658. Resolution: Implemented Fix Version/s: 1.3.0 Implemented with fe2c61a28e6a5300b2cf4c1e50ee884b51ef42c9 > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941000#comment-15941000 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107982106 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- EVENT-TIME OVER Need to treatment "rows" and "range" clause separately, because - ROWS specifies the window in physical units (rows). - RANGE specifies the window as a logical offset. They have different semantics, for example: DATA: ``` (long, int, String) (1L, 1, "H") (2L, 2, "H") (2L, 3,"H") ``` ROWS sum(b) result: `1,3,6` RANGE sum(b) result: `1,6,6` > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. 
> - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
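The ROWS-versus-RANGE distinction from the review comment above can be reproduced with a small standalone sketch (plain Python, not Flink code), using the sample data (1L, 1), (2L, 2), (2L, 3) from the comment:

```python
# Unbounded-preceding OVER aggregates on event-time data, showing why ROWS
# and RANGE must be handled separately: ROWS is a physical offset (count of
# rows), RANGE is a logical offset (all rows with timestamp <= current).
def over_rows_sum(rows):
    # ROWS: each row sees itself plus every row that physically preceded it.
    out, running = [], 0
    for _, b in rows:
        running += b
        out.append(running)
    return out

def over_range_sum(rows):
    # RANGE: each row sees all rows whose timestamp is <= its own timestamp.
    return [sum(b for t2, b in rows if t2 <= t) for t, _ in rows]

data = [(1, 1), (2, 2), (2, 3)]  # (event timestamp, b)
```

On this data `over_rows_sum` yields 1, 3, 6 while `over_range_sum` yields 1, 6, 6, matching the results quoted in the discussion: the two rows sharing timestamp 2 see each other under RANGE but not under ROWS.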
[jira] [Commented] (FLINK-5990) Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940991#comment-15940991 ] ASF GitHub Bot commented on FLINK-5990: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3585 > Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5990 > URL: https://issues.apache.org/jira/browse/FLINK-5990 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > The goal of this issue is to add support for OVER ROWS aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is required > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5803) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940992#comment-15940992 ] ASF GitHub Bot commented on FLINK-5658: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3386 > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3409 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase
[ https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940989#comment-15940989 ] ASF GitHub Bot commented on FLINK-6089: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3564 > Implement decoration phase for rewriting predicated logical plan after > volcano optimization phase > - > > Key: FLINK-6089 > URL: https://issues.apache.org/jira/browse/FLINK-6089 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > At present, there is no chance to modify the DataStreamRel tree after the > volcano optimization. We consider to add a decoration phase after volcano > optimization phase. Decoration phase is dedicated for rewriting predicated > logical plan and is independent of cost module. After decoration phase is > added, we get the chance to apply retraction rules at this phase. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema
[ https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940990#comment-15940990 ] ASF GitHub Bot commented on FLINK-5568: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3409 > Introduce interface for catalog, and provide an in-memory implementation, and > integrate with calcite schema > --- > > Key: FLINK-5568 > URL: https://issues.apache.org/jira/browse/FLINK-5568 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: jingzhang > > The {{TableEnvironment}} now provides a mechanism to register temporary > table. It registers the temp table to calcite catalog, so SQL and TableAPI > queries can access to those temp tables. Now DatasetTable, DataStreamTable > and TableSourceTable can be registered to {{TableEnvironment}} as temporary > tables. > This issue wants to provides a mechanism to connect external catalogs such as > HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could > access to tables in the external catalogs without register those tables to > {{TableEnvironment}} beforehand. > First, we should point out that there are two kinds of catalog in Flink > actually. > The first one is external catalog as we mentioned before, it provides CRUD > operations to databases/tables. > The second one is calcite catalog, it defines namespace that can be accessed > in Calcite queries. It depends on Calcite Schema/Table abstraction. > SqlValidator and SqlConverter depends on the calcite catalog to fetch the > tables in SQL or TableAPI. > So we need to do the following things: > 1. introduce interface for external catalog, maybe provide an in-memory > implementation first for test and develop environment. > 2. introduce a mechanism to connect external catalog with Calcite catalog so > the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, > convert tables in a database of externalCatalog to Calcite tables (only > support {{TableSourceTable}}). > 3. register external catalog to {{TableEnvironment}}. > Here is the design mode of ExternalCatalogTable. > | identifier | TableIdentifier | dbName and tableName > of table | > | tableType | String | type of external catalog table, > e.g csv, hbase, kafka > | schema| DataSchema| schema of table data, > including column names and column types > | partitionColumnNames | List | names of partition column > | properties | Map|properties of > external catalog table > | stats | TableStats | statistics of external > catalog table > | comment | String | > | create time | long > There is still a detail problem need to be take into consideration, that is , > how to convert {{ExternalCatalogTable}} to {{TableSourceTable}}. The > question is equals to convert {{ExternalCatalogTable}} to {{TableSource}} > because we could easily get {{TableSourceTable}} from {{TableSource}}. > Because different {{TableSource}} often contains different fields to initiate > an instance. E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, > fieldDelim, rowDelim and so on to create a new instance , > {{KafkaTableSource}} needs configuration and tableName to create a new > instance. So it's not a good idea to let Flink framework be responsible for > translate {{ExternalCatalogTable}} to different kind of > {{TableSourceTable}}. > Here is one solution. Let {{TableSource}} specify a converter. > 1. provide an Annatition named {{ExternalCatalogCompatible}}. The > {{TableSource}} with the annotation means it is compatible with external > catalog, that is, it could be converted to or from ExternalCatalogTable. This > annotation specifies the tabletype and converter of the tableSource. For > example, for {{CsvTableSource}}, it specifies the tableType is csv and > converter class is CsvTableSourceConverter. 
> {code} > @ExternalCatalogCompatible(tableType = "csv", converter = > classOf[CsvTableSourceConverter]) > class CsvTableSource(...) { > ...} > {code} > 2. Scan all TableSources with the ExternalCatalogCompatible annotation, save > the tableType and converter in a Map > 3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the > converter based on tableType. and let converter do convert -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3585: [FLINK-5990][table]Add event time OVER ROWS BETWEE...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3585
[GitHub] flink pull request #3564: [FLINK-6089] [table] Implement decoration phase fo...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3564
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3386
[GitHub] flink pull request #3611: [backport] [FLINK-6183]/[FLINK-6184] Prevent some ...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3611 [backport] [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups Backport of #3610 for 1.2. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6183_6184_metric_task_backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3611.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3611 commit 790b3ce444e10191731850bad71c35fe050d9af3 Author: zentol Date: 2017-03-24T18:11:58Z [FLINK-6184] Prevent NPE in buffer metrics commit 13e40466ffe63783c59cc979900ba7af2d693576 Author: zentol Date: 2017-03-24T18:39:31Z [FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed
[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940943#comment-15940943 ] ASF GitHub Bot commented on FLINK-6183: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3611 [backport] [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups Backport of #3610 for 1.2. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6183_6184_metric_task_backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3611.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3611 commit 790b3ce444e10191731850bad71c35fe050d9af3 Author: zentol Date: 2017-03-24T18:11:58Z [FLINK-6184] Prevent NPE in buffer metrics commit 13e40466ffe63783c59cc979900ba7af2d693576 Author: zentol Date: 2017-03-24T18:39:31Z [FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed > TaskMetricGroup may not be cleaned up when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If, however, run() is never called > due to some failure between the creation and the call to run(), the metric group is > never closed. This also means that the JobMetricGroup is never closed.
[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940910#comment-15940910 ] ASF GitHub Bot commented on FLINK-6183: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3610 [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups This PR fixes 2 issues: 1) It prevents some NPEs in the buffer metrics by instantiating them after the task has been registered in the NetworkEnvironment. 2) It prevents some cases where the TaskMetricGroup would never be closed. These cases include an early exit in `Task#run()` and when 2 tasks with an identical ExecutionAttemptID are run on the same TM. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6183_6184_metric_task Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3610.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3610 > TaskMetricGroup may not be cleaned up when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If, however, run() is never called > due to some failure between the creation and the call to run(), the metric group is > never closed. This also means that the JobMetricGroup is never closed.
[GitHub] flink pull request #3610: [FLINK-6183]/[FLINK-6184] Prevent some NPE and unc...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3610 [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups This PR fixes 2 issues: 1) It prevents some NPEs in the buffer metrics by instantiating them after the task has been registered in the NetworkEnvironment. 2) It prevents some cases where the TaskMetricGroup would never be closed. These cases include an early exit in `Task#run()` and when 2 tasks with an identical ExecutionAttemptID are run on the same TM. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6183_6184_metric_task Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3610.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3610
[jira] [Assigned] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-6183: --- Assignee: Chesnay Schepler > TaskMetricGroup may not be cleaned up when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If, however, run() is never called > due to some failure between the creation and the call to run(), the metric group is > never closed. This also means that the JobMetricGroup is never closed.
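The leak described in FLINK-6183 comes down to one lifecycle rule: the metric group must be closed even when run() exits early or is never reached after the task is constructed. A language-neutral sketch of that pattern (plain Python with hypothetical names, not Flink's actual Task code):

```python
# Toy stand-in for the TaskMetricGroup lifecycle: closing must happen in a
# finally block so that an exception between creating the group and finishing
# the task body cannot leak the group (and, transitively, its JobMetricGroup).
class TaskMetricGroup:
    def __init__(self):
        self.closed = False

    def close(self):
        # Idempotent close, safe to call from multiple exit paths.
        self.closed = True

def run_task(body):
    group = TaskMetricGroup()
    try:
        body(group)
    finally:
        group.close()  # runs on normal exit, early return, and exceptions
    return group

seen = []

def failing_body(group):
    seen.append(group)
    raise RuntimeError("failure before the task body completed")

try:
    run_task(failing_body)
except RuntimeError:
    pass  # the task failed, but its metric group was still closed
```

The bug report is precisely the gap this sketch closes: if cleanup lives only at the end of a run() that may never be entered, any failure in between leaves the group registered forever.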
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968436 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. +* +* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. 
+*
+* @param <IN>
+*/
+ public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+
+ private final StreamConfig config;
+
+ private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>();
+
+ private UpstreamLogger(StreamConfig config) {
+ this.config = config;
+ }
+
+ public void logRecord(StreamRecord<IN> record) throws Exception {
+ if (!slicedLog.isEmpty()) {
+ slicedLog.getLast().add(record);
+ }
+ }
+
+ public void createSlice(String sliceID) throws Exception {
+ ListState<StreamRecord<IN>> nextSlice =
+ getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+ config.<IN>getTypeSerializerOut(getUserCodeClassloader())));
+ slicedLog.addLast(nextSlice);
+ }
+
+ public void discardSlice() {
+ ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst();
+ logToEvict.clear();
+ }
+
+ public Iterable<StreamRecord<IN>> getReplayLog() throws Exception {
+ final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+ Collections.sort(logSlices, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+ }
+ });
+
+ final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>();
+ for (String splitID : logSlices) {
+ wrappedIterators.add(getOperatorStateBackend()
+ .getOperatorState(new ListStateDescriptor<>(splitID,
+ config.<IN>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+ }
+
+ if (wrappedIterators.size() == 0) {
+ return new Iterable<StreamRecord<IN>>() {
+ @Override
+ public Iterator<StreamRecord<IN>> iterator() {
+ return Collections.emptyListIterator();
+ }
+ };
+ }
+
+ return new
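The slicing contract the UpstreamLogger javadoc spells out (one slice per checkpoint id, append to the newest slice, discard completed slices in FIFO order, replay slices sorted by id) can be sketched independently of Flink's state API. This is an illustrative Python model only, with hypothetical names:

```python
# Minimal model of the upstream-log slicing described above: each slice is
# named after a checkpoint id; records always go to the newest slice;
# completed checkpoints discard slices front-to-back (FIFO); replay presents
# a singular view by iterating slices sorted numerically by checkpoint id.
class UpstreamLog:
    def __init__(self):
        self.slices = []  # list of (checkpoint_id, records)

    def create_slice(self, checkpoint_id):
        self.slices.append((checkpoint_id, []))

    def log_record(self, record):
        if self.slices:
            self.slices[-1][1].append(record)

    def discard_slice(self):
        # Barriers come back in FIFO order, so the oldest slice is dropped.
        self.slices.pop(0)

    def replay(self):
        for _, records in sorted(self.slices, key=lambda s: int(s[0])):
            yield from records

log = UpstreamLog()
log.create_slice("1")   # slice opened for checkpoint 1
log.log_record("a")
log.create_slice("2")   # barrier for checkpoint 2: start a new slice
log.log_record("b")
log.discard_slice()     # checkpoint 1 completed: its slice is cleared
replayed = list(log.replay())
```

Slicing by checkpoint id is what lets committed output logs be identified and cleared individually, which is the concurrency property the javadoc's assumptions are protecting.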
[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime
[ https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940880#comment-15940880 ] radu commented on FLINK-6073: - the join window can be one window element as we emit for every incoming event from the main stream (left). For the incoming events from the right stream (inner stream) we can cache the last data into a ValueState > Support for SQL inner queries for proctime > -- > > Key: FLINK-6073 > URL: https://issues.apache.org/jira/browse/FLINK-6073 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: radu >Priority: Critical > Labels: features > Attachments: innerquery.png > > > Time target: Proc Time > **SQL targeted query examples:** > > Q1) `Select item, (select item2 from stream2 ) as itemExtern from stream1;` > Comments: This is the main functionality targeted by this JIRA to enable to > combine in the main query results from an inner query. > Q2) `Select s1.item, (Select a2 from table as t2 where table.id = s1.id > limit 1) from s1;` > Comments: > Another equivalent way to write the first example of inner query is with > limit 1. This ensures the equivalency with the SingleElementAggregation used > when translated the main target syntax for inner query. We must ensure that > the 2 syntaxes are supported and implemented with the same functionality. > There is the option also to select elements in the inner query from a table > not just from a different stream. This should be a sub-JIRA issue implement > this support. > **Description:** > Parsing the SQL inner query via calcite is translated to a join function > (left join with always true condition) between the output of the query on the > main stream and the output of a single output aggregation operation on the > inner query. 
The translation logic is shown below > ``` > LogicalJoin [condition=true;type=LEFT] > LogicalSingleValue[type=aggregation] > …logic of inner query (LogicalProject, LogicalScan…) > …logical of main,external query (LogicalProject, LogicalScan…)) > ``` > `LogicalJoin[condition=true;type=LEFT] `– it can be considered as a special > case operation rather than a proper join to be implemented between > stream-to-stream. The implementation behavior should attach to the main > stream output a value from a different query. > `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder > of the single value that results from the inner query. As this operator is > the guarantee that the inner query will bring to the join no more than one > value, there are several options on how to consider it’s functionality in the > streaming context: > 1.Throw an error if the inner query returns more than one result. This > would be a typical behavior in the case of standard SQL over DB. However, it > is very unlikely that a stream would only emit a single value. Therefore, > such a behavior would be very limited for streams in the inner query. > However, such a behavior might be more useful and common if the inner query > is over a table. > 1.We can interpret the usage of this parameter as the guarantee that at > one moment only one value is selected. Therefore the behavior would rather be > as a filter to select one value. This brings the option that the output of > this operator evolves in time with the second stream that drives the inner > query. The decision on when to evolve the stream should depend on what marks > the evolution of the stream (processing time, watermarks/event time, > ingestion time, window time partitions…). > In this JIRA issue the evolution would be marked by the processing time. For > this implementation the operator would work based on option 2. 
Hence at every > moment the state of the operator that holds one value can evolve with the > last elements. In this way the logic of the inner query is to select always > the last element (fields, or other query related transformations based on the > last value). This behavior is needed in many scenarios: (e.g., the typical > problem of computing the total income, when incomes are in multiple > currencies and the total needs to be computed in one currency by using always > the last exchange rate). > This behavior is motivated also by the functionality of the 3rd SQL query > example – Q3 (using inner query as the input source for FROM ). In such > scenarios, the selection in the main query would need to be done based on > latest elements. Therefore with such a behavior the 2 types of queries (Q1 > and Q3) would provide the same, intuitive result. > **Functionality example** > Based on the logical translation plan, we exemplify next the behavior of the > inner query applied on 2
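Radu's comment and the "always the last element" behavior described above amount to a simple asymmetric operator: the inner (right) stream only updates cached state, and every event on the main (left) stream emits immediately, joined with the latest cached value. A standalone sketch (plain Python with hypothetical names; `last_inner` stands in for Flink's ValueState):

```python
# Sketch of the proposed left join with an always-true condition: the inner
# query contributes at most one value (the most recent element), and each
# main-stream event is enriched with it -- e.g. the latest exchange rate.
class SingleValueInnerJoin:
    def __init__(self):
        self.last_inner = None  # stands in for a per-operator ValueState

    def on_inner(self, value):
        # Inner stream: keep only the most recent element (processing time).
        self.last_inner = value

    def on_main(self, value):
        # Main stream: emit for every incoming event, never buffer.
        return (value, self.last_inner)

join = SingleValueInnerJoin()
join.on_inner(1.07)        # hypothetical EUR->USD rate arrives
first = join.on_main(100)  # income event joined with rate 1.07
join.on_inner(1.10)        # rate updates
second = join.on_main(200) # later income event sees the newer rate
```

This matches option 2 in the description: rather than erroring when the inner query could produce many values, the operator's state evolves with the inner stream, and its current content is what each main-stream event observes.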
[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime
[ https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940872#comment-15940872 ] ASF GitHub Bot commented on FLINK-6073: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3609 @fhueske @twalthr @sunjincheng121 @shijinkui @stefanobortoli @hongyuhong I have made a first implementation draft for supporting inner queries, mainly when operating on processing time. I would highly appreciate some feedback from you to further enhance the approach. The idea of the implementation is described in https://issues.apache.org/jira/browse/FLINK-6073?filter=-2 > Support for SQL inner queries for proctime > -- > > Key: FLINK-6073 > URL: https://issues.apache.org/jira/browse/FLINK-6073 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: radu >Priority: Critical > Labels: features > Attachments: innerquery.png > > > Time target: Proc Time > **SQL targeted query examples:** > > Q1) `Select item, (select item2 from stream2 ) as itemExtern from stream1;` > Comments: This is the main functionality targeted by this JIRA: enabling the main query to combine results from an inner query. > Q2) `Select s1.item, (Select a2 from table as t2 where table.id = s1.id limit 1) from s1;` > Comments: > Another equivalent way to write the first inner-query example is with limit 1. This ensures equivalency with the SingleElementAggregation used when translating the main target syntax for inner queries. We must ensure that the two syntaxes are supported and implemented with the same functionality. > There is also the option to select elements in the inner query from a table, not just from a different stream. Implementing this support should be a separate sub-JIRA issue.
> **Description:** > The SQL inner query is parsed via Calcite and translated to a join (a left join with an always-true condition) between the output of the query on the main stream and the output of a single-value aggregation operation on the inner query. The translation logic is shown below > ``` > LogicalJoin [condition=true;type=LEFT] > LogicalSingleValue[type=aggregation] > …logic of inner query (LogicalProject, LogicalScan…) > …logic of main/external query (LogicalProject, LogicalScan…) > ``` > `LogicalJoin[condition=true;type=LEFT]` – this can be considered a special-case operation rather than a proper stream-to-stream join. Its behavior is to attach to the main stream output a value obtained from a different query. > `LogicalSingleValue[type=aggregation]` – this can be interpreted as the holder of the single value that results from the inner query. As this operator guarantees that the inner query brings no more than one value to the join, there are several options for its functionality in the streaming context: > 1. Throw an error if the inner query returns more than one result. This would be the typical behavior for standard SQL over a database. However, it is very unlikely that a stream would emit only a single value, so such behavior would be very limited for streams in the inner query. It might be more useful and common if the inner query is over a table. > 2. Interpret this operator as the guarantee that at any one moment only one value is selected. The behavior would then be that of a filter selecting a single value, which opens the option that the output of this operator evolves over time with the second stream that drives the inner query.
The decision on when to evolve the stream should depend on what marks the evolution of the stream (processing time, watermarks/event time, ingestion time, window time partitions…). > In this JIRA issue the evolution is marked by processing time, and for this implementation the operator works based on option 2. Hence at every moment the state of the operator that holds the single value can evolve with the latest elements. In this way the logic of the inner query is to always select the last element (fields, or other query-related transformations based on the last value). This behavior is needed in many scenarios, e.g., the typical problem of computing a total income when incomes arrive in multiple currencies and the total must be computed in one currency using the latest exchange rate. > This behavior is also motivated by the functionality of the 3rd SQL query example – Q3 (using an inner query as the input source for FROM). In such scenarios, the selection in the main query
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940873#comment-15940873 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967886 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. +* +* TODO it seems that ListState.clear does not unregister state. 
We need to put a hook for that. +* +* @param +*/ + public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator { + + private final StreamConfig config; + + private LinkedList > slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.getTypeSerializerOut(getUserCodeClassloader()))); + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable getReplayLog() throws Exception { + final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List > wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.getTypeSerializerOut(getUserCodeClassloader()))).get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable () { + @Override + public Iterator iterator()
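The slicing scheme described in the `UpstreamLogger` javadoc above (one named log slice per in-flight checkpoint, slices discarded in FIFO order as barriers return, and replayed in checkpoint-ID order on restore) can be sketched without Flink's state API as follows. The class and method names mirror the quoted operator, but this is an illustrative simplification with plain collections, not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

// Illustrative sketch of the slice-per-checkpoint logging scheme described in
// the UpstreamLogger javadoc above (not the actual Flink operator).
public class SlicedLogSketch {
    // checkpoint ID -> slice; TreeMap iterates slices in numeric (restore) order
    private final TreeMap<Long, List<String>> slices = new TreeMap<>();
    // barrier arrival order, used for FIFO discard of completed checkpoints
    private final Deque<Long> fifo = new ArrayDeque<>();
    private List<String> current;

    public void createSlice(long checkpointId) { // a barrier opens a new slice
        current = new ArrayList<>();
        slices.put(checkpointId, current);
        fifo.addLast(checkpointId);
    }

    public void logRecord(String record) {       // append to the newest slice
        if (current != null) {
            current.add(record);
        }
    }

    public void discardSlice() {                 // checkpoint complete: drop oldest
        Long oldest = fifo.pollFirst();
        if (oldest != null) {
            slices.remove(oldest);
        }
    }

    public List<String> getReplayLog() {         // singular view, by checkpoint ID
        List<String> replay = new ArrayList<>();
        for (List<String> slice : slices.values()) {
            replay.addAll(slice);
        }
        return replay;
    }
}
```

The FIFO discard relies on the same assumption stated in the quoted javadoc: barriers arrive back in the order their checkpoints were triggered, so the oldest slice is always the one whose checkpoint completed.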
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940878#comment-15940878 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107969496 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + * + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask{ +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); + } + @Override protected void run() throws Exception { - + final String iterationId = getConfiguration().getIterationId(); if (iterationId == null || iterationId.length() == 0) { throw new Exception("Missing iteration ID in the task configuration"); } - - final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , - getEnvironment().getTaskInfo().getIndexOfThisSubtask()); - + final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); final long iterationWaitTime = getConfiguration().getIterationWaitTime(); final boolean shouldWait = iterationWaitTime > 0; - final BlockingQueue dataChannel = new ArrayBlockingQueue (1); + final BlockingQueue > dataChannel + = new ArrayBlockingQueue<>(1); // offer the queue for the tail BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel); LOG.info("Iteration head {} added feedback queue under 
{}", getName(), brokerID); // do the work
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940875#comment-15940875 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968567 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940877#comment-15940877 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967910 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940874#comment-15940874 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968436 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940876#comment-15940876 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968935 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + * + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask{ +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); --- End diff -- if this is the same UpstreamLogger instance that you pass 2 lines above then why not use that? :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. 
> 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all
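The three ABS steps quoted in the FLINK-3257 description (block output and log back-edge records on a barrier, let the barrier travel around the cycle, then finalize the snapshot and replay the logged in-transit records in FIFO order) can be sketched as a toy simulation. This is illustrative plain Java under invented names, not Flink's iteration head code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy simulation of the ABS steps for a cyclic graph described above: on a
// barrier the iteration head starts logging feedback records; once the barrier
// returns via the back-edge, the logged in-transit records become part of the
// snapshot and are re-emitted in FIFO order. Illustrative only.
public class AbsCycleSketch {
    private boolean snapshotting = false;
    private final Queue<String> backEdgeLog = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();

    public void onBarrier() {                 // step 1: block output, start backup
        snapshotting = true;
    }

    public void onFeedbackRecord(String r) {  // records arriving via the back-edge
        if (snapshotting) {
            backEdgeLog.add(r);               // upstream backup of in-transit records
        } else {
            emitted.add(r);
        }
    }

    public List<String> onBarrierReturned() { // steps 2-3: barrier came back around
        List<String> snapshot = new ArrayList<>(backEdgeLog);
        while (!backEdgeLog.isEmpty()) {      // unblock and replay in FIFO order
            emitted.add(backEdgeLog.poll());
        }
        snapshotting = false;
        return snapshot;                      // records belonging to the snapshot
    }

    public List<String> output() {
        return emitted;
    }
}
```

The key property the sketch shows is that records circulating on the back-edge while the barrier is in flight are neither lost nor duplicated: they are captured in the snapshot and then forwarded exactly once when the output is unblocked.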
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107969496
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
@@ -17,100 +17,164 @@
 package org.apache.flink.streaming.runtime.tasks;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ *
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records that do NOT belong in other snapshots
+ *
+ * @param <OUT>
+ */
 @Internal
-public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
+public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {

 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);

 	private volatile boolean running = true;
-	//
-
+	private volatile RecordWriterOutput<OUT>[] outputs;
+
+	private UpstreamLogger<OUT> upstreamLogger;
+
+	private Object lock;
+
+	@Override
+	public void init() throws Exception {
+		this.lock = getCheckpointLock();
+		getConfiguration().setStreamOperator(new UpstreamLogger<OUT>(getConfiguration()));
+		operatorChain = new OperatorChain<>(this);
+		this.upstreamLogger = (UpstreamLogger<OUT>) operatorChain.getHeadOperator();
+	}
+
 	@Override
 	protected void run() throws Exception {
-
 		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
 		}
-
-		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
-			getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
+		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
+			getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 		final boolean shouldWait = iterationWaitTime > 0;

-		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
+		final BlockingQueue<Either<StreamRecord<OUT>, CheckpointBarrier>> dataChannel
+			= new ArrayBlockingQueue<>(1);

 		// offer the queue for the tail
 		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
 		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);

 		// do the work
 		try {
-			@SuppressWarnings("unchecked")
-			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
+			outputs = (RecordWriterOutput<OUT>[])
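The head/tail pairing in the diff above hinges on `createBrokerIdString` and the `BlockingQueueBroker` handshake: the head publishes a bounded feedback queue under a job/iteration/subtask-unique ID, and the tail looks up the same queue to push records back. A minimal stand-alone sketch of that mechanism (`BrokerSketch` and its methods are simplified stand-ins, not Flink's actual API):

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for Flink's BlockingQueueBroker head/tail handshake. */
public class BrokerSketch {

    // The broker ID is unique per job, per iteration, per parallel subtask.
    public static String createBrokerIdString(String jobId, String iterationId, int subtaskIndex) {
        return jobId + "-" + iterationId + "-" + subtaskIndex;
    }

    private static final Map<String, BlockingQueue<Object>> BROKER = new ConcurrentHashMap<>();

    // Head side: publish the feedback queue under the broker ID.
    public static BlockingQueue<Object> handIn(String brokerId) {
        BlockingQueue<Object> q = new ArrayBlockingQueue<>(1);
        BROKER.put(brokerId, q);
        return q;
    }

    // Tail side: look up the very same queue instance to feed records back.
    public static BlockingQueue<Object> get(String brokerId) {
        return BROKER.get(brokerId);
    }
}
```

Because both sides resolve the same string, a parallel head and its corresponding tail always share one queue instance, which is what makes the back-edge a point-to-point channel.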
[GitHub] flink issue #3609: [FLINK-6073] - Support for SQL inner queries for proctime
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3609 @fhueske @twalthr @sunjincheng121 @shijinkui @stefanobortoli @hongyuhong I have made a first implementation draft for supporting inner queries mainly when operating on processing time. I would highly appreciate some feedback from you to further enhance the approach. The idea of the implementation is described in https://issues.apache.org/jira/browse/FLINK-6073?filter=-2 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967910
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 	 * Creates the identification string with which head and tail task find the shared blocking
 	 * queue for the back channel. The identification string is unique per parallel head/tail pair
 	 * per iteration per job.
-	 *
-	 * @param jid The job ID.
-	 * @param iterationID The id of the iteration in the job.
+	 *
+	 * @param jid The job ID.
+	 * @param iterationID The id of the iteration in the job.
 	 * @param subtaskIndex The parallel subtask number
 	 * @return The identification string.
 	 */
 	public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
 		return jid + "-" + iterationID + "-" + subtaskIndex;
 	}
+
+	/**
+	 * An internal operator that solely serves as a state logging facility for persisting,
+	 * partitioning and restoring output logs for dataflow cycles consistently. To support concurrency,
+	 * logs are being sliced proportionally to the number of concurrent snapshots. This allows committed
+	 * output logs to be uniquely identified and cleared after each complete checkpoint.
+	 *
+	 * The design is based on the following assumptions:
+	 *
+	 * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
+	 * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
+	 * - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that
+	 * gives a singular view of the log.
+	 *
+	 * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
+	 *
+	 * @param <OUT>
+	 */
+	public static class UpstreamLogger<OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<OUT, OUT> {
+
+		private final StreamConfig config;
+
+		private LinkedList<ListState<StreamRecord<OUT>>> slicedLog = new LinkedList<>();
+
+		private UpstreamLogger(StreamConfig config) {
+			this.config = config;
+		}
+
+		public void logRecord(StreamRecord<OUT> record) throws Exception {
+			if (!slicedLog.isEmpty()) {
+				slicedLog.getLast().add(record);
+			}
+		}
+
+		public void createSlice(String sliceID) throws Exception {
+			ListState<StreamRecord<OUT>> nextSlice =
+				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+					config.<StreamRecord<OUT>>getTypeSerializerOut(getUserCodeClassloader())));
+			slicedLog.addLast(nextSlice);
+		}
+
+		public void discardSlice() {
+			ListState<StreamRecord<OUT>> logToEvict = slicedLog.pollFirst();
+			logToEvict.clear();
+		}
+
+		public Iterable<StreamRecord<OUT>> getReplayLog() throws Exception {
+			final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+			Collections.sort(logSlices, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+				}
+			});
+
+			final List<Iterator<StreamRecord<OUT>>> wrappedIterators = new ArrayList<>();
+			for (String splitID : logSlices) {
+				wrappedIterators.add(getOperatorStateBackend()
+					.getOperatorState(new ListStateDescriptor<>(splitID,
+						config.<StreamRecord<OUT>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+			}
+
+			if (wrappedIterators.size() == 0) {
+				return new Iterable<StreamRecord<OUT>>() {
+					@Override
+					public Iterator<StreamRecord<OUT>> iterator() {
+						return Collections.emptyListIterator();
+					}
+				};
+			}
+
+			return new
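The slice bookkeeping that the `UpstreamLogger` javadoc describes — one slice per in-flight checkpoint, FIFO discard as barriers return, numerically sorted replay — can be sketched with plain collections in place of Flink's `ListState`. The `SliceLogSketch` class below is illustrative only, not part of the PR; since checkpoint IDs grow monotonically, a `TreeMap` keyed by checkpoint ID gives both the FIFO discard order and the sorted replay order for free:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** In-memory sketch of the UpstreamLogger's slice bookkeeping (no Flink state backend). */
public class SliceLogSketch<T> {

    // One slice per in-flight checkpoint, keyed by checkpoint ID.
    private final TreeMap<Long, List<T>> slices = new TreeMap<>();

    public void createSlice(long checkpointId) {
        slices.put(checkpointId, new ArrayList<>());
    }

    // Records are appended to the most recently opened slice only.
    public void logRecord(T record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    // Barriers return in FIFO order; since checkpoint IDs are monotonically
    // increasing, FIFO discard is simply "remove the smallest key".
    public void discardSlice() {
        slices.pollFirstEntry();
    }

    // Replay all remaining slices as one log, ordered by checkpoint ID.
    public List<T> getReplayLog() {
        List<T> out = new ArrayList<>();
        slices.values().forEach(out::addAll);
        return out;
    }
}
```

This mirrors the assumptions in the javadoc: committed slices are dropped front-first after a complete checkpoint, and restoration sees the surviving slices as a single ordered log.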
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968567
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967886
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
[GitHub] flink pull request #3609: Inner query implementation model
GitHub user rtudoran opened a pull request: https://github.com/apache/flink/pull/3609
Inner query implementation model
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added
- [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/huawei-flink/flink FLINK-6073
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3609.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3609
commit e2c9bafa1695a9f602fbfed272916abfacfd3cbe
Author: rtudoran
Date: 2017-03-24T18:22:55Z
Inner query implementation model
[jira] [Assigned] (FLINK-6184) Buffer metrics can cause NPE
[ https://issues.apache.org/jira/browse/FLINK-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-6184: --- Assignee: Chesnay Schepler > Buffer metrics can cause NPE > > > Key: FLINK-6184 > URL: https://issues.apache.org/jira/browse/FLINK-6184 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.0 > > > The Buffer metrics defined in the TaskIOMetricGroup are created when a Task > is created. At this time, the bufferPool in the Input gates that the metrics > make use of is still null, leading to possible NPEs. > These metrics should either be created after the required objects are fully > initialized or guard this case with null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3590: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3590
@fhueske @sunjincheng121 @hongyuhong @stefanobortoli I have run a test to compare the 3 approaches:
- window based #3550
- processfunction based with events managed in ValueState[Queue] - this PR
- processfunction based with events managed in MapState[Long,JList] #3607
The simple benchmark that I ran generates events 1 ms apart (a 5-tuple like the one we used in the tests). There are 2 scenarios in which I run a simple count over the window contents.
Scenario 1) 2 second window (~2000 events in a window) - 100K events in total generated: Window based solution: 113839 ms; Process based (with Queue): 111792 ms; Process based on MapState: 110533 ms
Scenario 2) 10 second window (~10,000 events in a window) - 200K events in total generated: Window based solution: 218399 ms; Process based (with Queue): 217343 ms; Process based on MapState: 217657 ms
I would say that the approaches are similar in performance (with some small advantage for ProcessFunctions). Regarding the 2 approaches for handling data in process windows, I would say that the price to pay for serializing/deserializing the whole list of events is matched by (serializing/deserializing the timestamp keys + independently deserializing the events that need to be removed). Considering that the performances are similar, I personally believe that the approach with Queue is preferred because we can actually gain something (i.e., the order of the events) which will be helpful in extending the implementation for full SQL
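For intuition on the benchmark's window sizes (events arrive 1 ms apart, so a 2 s range holds about 2000 records and a 10 s range about 10,000), here is a minimal sketch of the Queue-style retention the comment argues for — evict everything older than the range, then aggregate what remains. `RangeWindowSketch` is a hypothetical illustration, not the PR's code:

```java
import java.util.ArrayDeque;

/** Sketch of ValueState[Queue]-style retention: evict records outside the processing-time range. */
public class RangeWindowSketch {

    // queue holds {timestampMs, value} pairs in arrival (hence timestamp) order
    public static int countWithinRange(ArrayDeque<long[]> queue, long nowMs, long rangeMs) {
        // Because the queue is ordered, expired records are all at the front.
        while (!queue.isEmpty() && queue.peekFirst()[0] <= nowMs - rangeMs) {
            queue.pollFirst();  // expired: outside (now - range, now]
        }
        return queue.size();    // the "simple count" aggregate over the window
    }
}
```

Keeping the queue ordered is exactly the property the comment values: eviction is a cheap front-of-queue scan, and the event order is preserved for later, fuller SQL semantics.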
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940830#comment-15940830 ]
ASF GitHub Bot commented on FLINK-5654:
--- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3590
@fhueske @sunjincheng121 @hongyuhong @stefanobortoli I have run a test to compare the 3 approaches:
- window based #3550
- processfunction based with events managed in ValueState[Queue] - this PR
- processfunction based with events managed in MapState[Long,JList] #3607
The simple benchmark that I ran generates events 1 ms apart (a 5-tuple like the one we used in the tests). There are 2 scenarios in which I run a simple count over the window contents.
Scenario 1) 2 second window (~2000 events in a window) - 100K events in total generated: Window based solution: 113839 ms; Process based (with Queue): 111792 ms; Process based on MapState: 110533 ms
Scenario 2) 10 second window (~10,000 events in a window) - 200K events in total generated: Window based solution: 218399 ms; Process based (with Queue): 217343 ms; Process based on MapState: 217657 ms
I would say that the approaches are similar in performance (with some small advantage for ProcessFunctions). Regarding the 2 approaches for handling data in process windows, I would say that the price to pay for serializing/deserializing the whole list of events is matched by (serializing/deserializing the timestamp keys + independently deserializing the events that need to be removed).
Considering that the performances are similar, I personally believe that the approach with Queue is preferred because we can actually gain something (i.e., the order of the events) which will be helpful in extending the implementation for full SQL
> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
> a,
> SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
> MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).
[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940819#comment-15940819 ]
ASF GitHub Bot commented on FLINK-5715:
--- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3602
Merged in c6a80725053c49dd2064405577291bdc86c82003.
> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for
> many users in production. Their jobs cannot tolerate stopped processing for
> the time it takes to write gigabytes of data from memory to disk.
> Asynchronous snapshots would be a solution to this problem. The challenge for
> the implementation is coming up with a copy-on-write scheme for the in-memory
> hash maps that build the foundation of this backend. After taking a closer
> look, this problem is twofold. First, providing CoW semantics for the hashmap
> itself, as a mutable structure, thereby avoiding costly locking or blocking
> where possible. Second, CoW for the mutable value objects, e.g. through
> cloning via serializers.
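The "cloning via serializers" idea in the issue description can be illustrated with a toy snapshot that copies values at trigger time, so that writes made while the snapshot is being persisted do not leak into it. This eager full copy only demonstrates the isolation goal — the actual backend uses an incremental copy-on-write scheme to avoid copying everything up front — and `CowSnapshotSketch` is an invented name:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy snapshot isolation: a full copy at trigger time stands in for the backend's
 *  incremental copy-on-write hash map; value copying emulates "cloning via serializers". */
public class CowSnapshotSketch {

    private final Map<String, String> state = new HashMap<>();

    public void put(String k, String v) {
        state.put(k, v);
    }

    // Take a snapshot: later mutations of `state` must not affect the returned view.
    public Map<String, String> snapshot() {
        Map<String, String> copy = new HashMap<>();
        // In the real backend a TypeSerializer would deep-copy each mutable value;
        // for immutable Strings a plain copy suffices to show the contract.
        state.forEach((k, v) -> copy.put(k, new String(v)));
        return copy;
    }
}
```

The hard part FLINK-5715 addresses is doing this lazily and incrementally, copying a bucket or value only when the ongoing snapshot and a concurrent write actually collide.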
[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940820#comment-15940820 ]
ASF GitHub Bot commented on FLINK-5715:
--- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3602
[GitHub] flink pull request #3602: [FLINK-5715] Asynchronous snapshots for heap keyed...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3602
[GitHub] flink issue #3602: [FLINK-5715] Asynchronous snapshots for heap keyed state ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3602
Merged in c6a80725053c49dd2064405577291bdc86c82003.
[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940809#comment-15940809 ]
ASF GitHub Bot commented on FLINK-5715:
--- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3602
After a discussion with @StephanEwen, we decided to follow my proposal. Merging this now.
[GitHub] flink issue #3602: [FLINK-5715] Asynchronous snapshots for heap keyed state ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3602
After a discussion with @StephanEwen, we decided to follow my proposal. Merging this now.
[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940804#comment-15940804 ]
Fabian Hueske commented on FLINK-5498:
-- Hi [~lincoln.86xy], I thought about this problem and I think I found a memory-safe way to address it, i.e., without a {{CoGroupFunction}}. The idea is to filter out invalid {{null}} join results in a {{GroupReduceFunction}}. The overhead for this is another sort, but the operator becomes memory-safe. I think we should prefer a less-efficient memory-safe implementation if possible. I made a prototype implementation for a LEFT OUTER JOIN (see below) but haven't thought about whether it would work for FULL OUTER JOINs as well. What do you think? Best, Fabian
{code}
public class OuterJoin {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(2);

		Row[] dataOuter = new Row[]{Row.of(1, 100), Row.of(1, 100), Row.of(2, 200), Row.of(3, 300),
			Row.of(4, 400), Row.of(5, 500), Row.of(6, 600), Row.of(6, 600)};
		Row[] dataInner = new Row[]{Row.of(1, 10), Row.of(1, 110), Row.of(2, 220), Row.of(3, 30),
			Row.of(4, 40), Row.of(4, 41)};

		RowTypeInfo rowType = new RowTypeInfo(
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO
		);

		DataSet<Row> outer = env.fromCollection(Arrays.asList(dataOuter), rowType);
		DataSet<Row> inner = env.fromCollection(Arrays.asList(dataInner), rowType);

		DataSet<Row> joined = outer
			.leftOuterJoin(inner)
			.where(0).equalTo(0) // define join keys
			.with(new JoinFunc()) // join function adds flag whether join with null or not
			.groupBy(1, 2) // group by all fields of the outer table (partitioning is reused)
			.reduceGroup(new NullFilter()); // filter out all null joins if there was any matched join

		joined.print();
	}

	@FunctionAnnotation.ForwardedFieldsFirst({"f0->f1; f1->f2"})
	@FunctionAnnotation.ForwardedFieldsSecond({"f0->f3"})
	public static class JoinFunc implements JoinFunction<Row, Row, Row>, ResultTypeQueryable<Row> {

		@Override
		public Row join(Row outer, Row inner) throws Exception {
			if (inner == null) {
				return Row.of(true, outer.getField(0), outer.getField(1), null, null);
			} else {
				if (((int) outer.getField(1)) > ((int) inner.getField(1))) {
					// remains
					return Row.of(false, outer.getField(0), outer.getField(1), inner.getField(0), inner.getField(1));
				} else {
					// filtered out
					return Row.of(true, outer.getField(0), outer.getField(1), null, null);
				}
			}
		}

		@Override
		public TypeInformation<Row> getProducedType() {
			return new RowTypeInfo(
				BasicTypeInfo.BOOLEAN_TYPE_INFO, // flag to indicate null
				BasicTypeInfo.INT_TYPE_INFO, // first field of outer table
				BasicTypeInfo.INT_TYPE_INFO, // second field of outer table
				BasicTypeInfo.INT_TYPE_INFO, // first field of inner table
				BasicTypeInfo.INT_TYPE_INFO // second field of inner table
			);
		}
	}

	@FunctionAnnotation.ForwardedFields({"f1->f0; f2->f1"})
	public static class NullFilter implements GroupReduceFunction<Row, Row>, ResultTypeQueryable<Row> {

		@Override
		public void reduce(Iterable<Row> rows, Collector<Row> out) throws Exception {
			boolean needsNull = true;
			int nullCnt = 0;
			Row r = null;
			Iterator<Row> rowsIt = rows.iterator();
			while (rowsIt.hasNext()) {
				r = rowsIt.next();
				boolean isNull = (Boolean) r.getField(0);
				if (!isNull) {
					// non nulls are directly forwarded
					out.collect(Row.of(r.getField(1), r.getField(2), r.getField(3), r.getField(4)));
					needsNull = false;
				} else {
					// nulls are not forwarded but counted. Let's see if there were some join matches
					nullCnt++;
				}
			}
			if (needsNull) {
				// no join matches found. Forward null joins
				for (int i = 0; i < nullCnt; i++) {
					out.collect(Row.of(r.getField(1), r.getField(2), null, null));
				}
			}
		}

		@Override
		public TypeInformation<Row> getProducedType() {
			return new RowTypeInfo(
				BasicTypeInfo.INT_TYPE_INFO,
				BasicTypeInfo.INT_TYPE_INFO,
				BasicTypeInfo.INT_TYPE_INFO,
				BasicTypeInfo.INT_TYPE_INFO
			);
		}
	}
}
{code}
> Add support for left/right outer joins with non-equality predicates (and 1+
> equality predicates)
> ---
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: lincoln.lee
> Assignee:
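Fabian's flag-and-filter trick can be checked without a Flink runtime: emulate the left outer join, tag results where the non-equality predicate failed as "null joins", group by the outer row, and drop the tagged rows whenever the group also contains a real match. The plain-collections sketch below uses hypothetical names and a simplified row encoding (`-1` marks a null join), and keeps all surviving rows rather than replaying a null count as the prototype does:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Collections-only emulation of the NullFilter reduce step from the prototype. */
public class NullFilterSketch {

    // rows: int[]{outerKey, outerVal, innerVal}, where innerVal == -1 marks a null join
    public static List<int[]> filter(List<int[]> joined) {
        // group by all fields of the outer row, like groupBy(1, 2) in the prototype
        Map<String, List<int[]>> groups = new LinkedHashMap<>();
        for (int[] r : joined) {
            groups.computeIfAbsent(r[0] + ":" + r[1], k -> new ArrayList<>()).add(r);
        }
        List<int[]> out = new ArrayList<>();
        for (List<int[]> g : groups.values()) {
            boolean hasMatch = g.stream().anyMatch(r -> r[2] != -1);
            for (int[] r : g) {
                // null joins survive only if the outer row had no real match at all
                if (!hasMatch || r[2] != -1) {
                    out.add(r);
                }
            }
        }
        return out;
    }
}
```

This captures why the extra sort buys memory safety: the decision "emit null or not" needs only one group of same-outer-row results at a time, never the whole inner side.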
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940777#comment-15940777 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey. Any update/opinion/something anyone? Just a gentle reminder, sorry if this sounds a bit desperate :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work along > the following lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager, > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output, emit all records > in-transit in FIFO order, and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations could potentially be applied, but > this can serve as the initial implementation. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
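The three-step scheme quoted above can be modeled in miniature. The following is a self-contained sketch (plain Java with illustrative names; not Flink's actual IterationHead/IterationSink classes) showing output blocking, upstream backup of back-edge records, and FIFO replay once the barrier returns from the sink:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the ABS variant for cyclic graphs described above (names are illustrative). */
public class CyclicSnapshotSketch {

    private final Queue<String> backEdgeLog = new ArrayDeque<>(); // upstream backup of in-transit records
    private final List<String> snapshot = new ArrayList<>();
    private boolean snapshotting = false;
    public final List<String> emitted = new ArrayList<>();

    /** Step 1: barrier triggered at the iteration head -> block output, start logging back-edge records. */
    public void triggerBarrier() {
        snapshotting = true;
    }

    /** Records arriving over the back-edge from the iteration sink. */
    public void receiveFromBackEdge(String record) {
        if (snapshotting) {
            backEdgeLog.add(record); // buffered, not forwarded, while output is blocked
        } else {
            emitted.add(record);
        }
    }

    /** Step 3: barrier arrives back from the sink -> finalize snapshot, unblock, replay in FIFO order. */
    public void receiveBarrierFromSink() {
        snapshot.addAll(backEdgeLog);        // records in transit become part of the checkpoint
        while (!backEdgeLog.isEmpty()) {
            emitted.add(backEdgeLog.poll()); // emit logged records in FIFO order
        }
        snapshotting = false;
    }

    public List<String> snapshotContents() {
        return snapshot;
    }
}
```

On restore, replaying `snapshotContents()` before resuming corresponds to the "emit all records from the injected snapshot first" step in the quoted description.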
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey. Any update/opinion/something anyone? Just a gentle reminder, sorry if this sounds a bit desperate :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6183: Affects Version/s: 1.2.0 > TaskMetricGroup may not be cleaned up when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If, however, run() is never called > due to some failure between the creation and the call to run(), the metric group is > never closed. This also means that the JobMetricGroup is never closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
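The lifecycle gap described in FLINK-6183 (cleanup only happens in run()'s finally block) can be illustrated with a small stand-alone sketch. MetricGroup and failBeforeRun() here are illustrative stand-ins, not Flink's actual API; the point is that a close must also happen on the path where run() never executes:

```java
/** Toy model of the lifecycle bug: the group is only closed at the end of run(). */
public class TaskLifecycleSketch {

    /** Stand-in for TaskMetricGroup (illustrative, not Flink's class). */
    public static class MetricGroup {
        private boolean closed = false;
        public void close() { closed = true; }
        public boolean isClosed() { return closed; }
    }

    private final MetricGroup metrics = new MetricGroup(); // created together with the task

    public void run() {
        try {
            // ... task execution ...
        } finally {
            metrics.close(); // only reached if run() is actually invoked
        }
    }

    /** A fix along the lines suggested: also close the group when the task fails before run(). */
    public void failBeforeRun() {
        metrics.close();
    }

    public MetricGroup metrics() { return metrics; }
}
```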
[jira] [Updated] (FLINK-6184) Buffer metrics can cause NPE
[ https://issues.apache.org/jira/browse/FLINK-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6184: Affects Version/s: 1.2.0 > Buffer metrics can cause NPE > > > Key: FLINK-6184 > URL: https://issues.apache.org/jira/browse/FLINK-6184 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.0 > > > The Buffer metrics defined in the TaskIOMetricGroup are created when a Task > is created. At this time, the bufferPool in the Input gates that the metrics > make use of is still null, leading to possible NPEs. > These metrics should either be created after the required objects are fully > initialized or should guard this case with null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
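The null-guard option suggested in FLINK-6184 boils down to a gauge that tolerates a not-yet-initialized pool. A minimal sketch with illustrative names (not Flink's actual TaskIOMetricGroup or buffer-pool API):

```java
/** Illustrative null-guarded gauge: the pool reference may still be null when the metric is registered. */
public class GuardedGaugeSketch {

    /** Stand-in for the real buffer pool. */
    public static class BufferPool {
        public int availableBuffers() { return 7; }
    }

    private BufferPool bufferPool; // null until the input gate is initialized

    /** Returns the metric value, guarding the not-yet-initialized case instead of throwing an NPE. */
    public int inputBuffersGauge() {
        BufferPool pool = bufferPool; // read once in case initialization races with the reporter
        return pool == null ? 0 : pool.availableBuffers();
    }

    public void initialize() { bufferPool = new BufferPool(); }
}
```

The alternative mentioned in the ticket, registering the gauge only after initialization, avoids the guard entirely.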
[jira] [Updated] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6183: Component/s: Metrics > TaskMetricGroup may not be cleaned up when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If, however, run() is never called > due to some failure between the creation and the call to run(), the metric group is > never closed. This also means that the JobMetricGroup is never closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6184) Buffer metrics can cause NPE
Chesnay Schepler created FLINK-6184: --- Summary: Buffer metrics can cause NPE Key: FLINK-6184 URL: https://issues.apache.org/jira/browse/FLINK-6184 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.3.0 Reporter: Chesnay Schepler Priority: Blocker Fix For: 1.3.0 The Buffer metrics defined in the TaskIOMetricGroup are created when a Task is created. At this time, the bufferPool in the Input gates that the metrics make use of is still null, leading to possible NPEs. These metrics should either be created after the required objects are fully initialized or should guard this case with null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6183) TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early
Chesnay Schepler created FLINK-6183: --- Summary: TaskMetricGroup may not be cleaned up when Task.run() is never called or exits early Key: FLINK-6183 URL: https://issues.apache.org/jira/browse/FLINK-6183 Project: Flink Issue Type: Bug Affects Versions: 1.3.0 Reporter: Chesnay Schepler Priority: Blocker The TaskMetricGroup is created when a Task is created. It is cleaned up at the end of Task.run() in the finally block. If, however, run() is never called due to some failure between the creation and the call to run(), the metric group is never closed. This also means that the JobMetricGroup is never closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-4760) Kafka 09 Consumer failed to initialize state because of corrupted operator state and not able to recover
[ https://issues.apache.org/jira/browse/FLINK-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4760. - Resolution: Not A Problem > Kafka 09 Consumer failed to initialize state because of corrupted operator > state and not able to recover > > > Key: FLINK-4760 > URL: https://issues.apache.org/jira/browse/FLINK-4760 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, State Backends, Checkpointing >Reporter: Zhenzhong Xu >Priority: Critical > > java.io.StreamCorruptedException: invalid stream header: 0278 > at > java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806) > at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) > at > org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:79) > at > org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Razvan closed FLINK-6063. - Resolution: Not A Problem It's not an actual issue with the framework; it just isn't made clear that a DFS MUST be used for HA. > HA Configuration doesn't work with Flink 1.2 > > > Key: FLINK-6063 > URL: https://issues.apache.org/jira/browse/FLINK-6063 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0 >Reporter: Razvan >Priority: Critical > Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg > > > I have a setup with a Flink 1.2 cluster, made up of 3 JobManagers and 2 > TaskManagers. I start the Zookeeper Quorum from JobManager1, I get > confirmation Zookeeper starts on the other 2 JobManagers, then I start a Flink > job on this JobManager1. > > The flink-conf.yaml is the same on all 5 VMs (as is everything else related > to Flink, because I copied the folder across all VMs as suggested in > tutorials); this means jobmanager.rpc.address: points to JobManager1 > everywhere. > If I turn off the VM running JobManager1, I would expect Zookeeper to say one > of the remaining JobManagers is the leader and the TaskManagers should > reconnect to it. Instead a new leader is elected but the slaves keep > connecting to the old master > 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Flat Map (1/1) > 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Disassociated] > 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. 
Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:10,489 INFO > org.apache.flink.runtime.taskmanager.TaskManager - TaskManager > akka://flink/user/taskmanager disconnects from JobManager > akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its > leadership. > 2017-03-15 10:29:10,490 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Cancelling > all computations and discarding all cached data. > 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Source: Custom Source > -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223). > 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Flat Map (1/1) > (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED. > java.lang.Exception: TaskManager akka://flink/user/taskmanager > disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: > Old JobManager lost its leadership. 
> at > org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at
[GitHub] flink issue #3502: [FLINK-4565] Support for SQL IN operator
Github user DmytroShkvyra commented on the issue: https://github.com/apache/flink/pull/3502 @twalthr I have tried running `SubQueryRemoveRule` (registering it in `FlinkRulesSets`) and so on, but it was not called from Flink. I have looked through the Calcite docs, and they say that Calcite rules are invoked according to the signatures of their constructors. So we should register it in `FlinkRulesSets.scala`. But Scala has an unpleasant thing - erasure - and I suspect it is the root cause. P.S. Sometimes I'm going crazy about the travis build timeouts :-(
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940716#comment-15940716 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra commented on the issue: https://github.com/apache/flink/pull/3502 @twalthr I have tried running `SubQueryRemoveRule` (registering it in `FlinkRulesSets`) and so on, but it was not called from Flink. I have looked through the Calcite docs, and they say that Calcite rules are invoked according to the signatures of their constructors. So we should register it in `FlinkRulesSets.scala`. But Scala has an unpleasant thing - erasure - and I suspect it is the root cause. P.S. Sometimes I'm going crazy about the travis build timeouts :-( > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
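For intuition only: an uncorrelated IN predicate can be evaluated as a membership test against the materialized sub-query result (conceptually a semi-join). The following is a small stand-alone sketch of that idea in plain Java, not the Calcite/Flink implementation discussed above:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model: an uncorrelated IN predicate as a membership test against the sub-query result. */
public class InOperatorSketch {

    /** Roughly: SELECT x FROM outerValues WHERE x IN (subQueryResult). */
    public static List<Integer> filterIn(List<Integer> outerValues, List<Integer> subQueryResult) {
        Set<Integer> lookup = new HashSet<>(subQueryResult); // materialize the sub-query once
        return outerValues.stream()
                .filter(lookup::contains) // keep rows whose value appears in the sub-query result
                .collect(Collectors.toList());
    }
}
```

This is the semantics an optimizer rule such as Calcite's sub-query rewriting ultimately has to preserve; how the rewrite is registered and triggered is exactly what the thread above is debugging.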
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940707#comment-15940707 ] Razvan commented on FLINK-6063: --- Hi, it works with DFS for me, though I'd suggest the documentation underline more clearly that it is required for HA. Thank you for the help! > HA Configuration doesn't work with Flink 1.2 > > > Key: FLINK-6063 > URL: https://issues.apache.org/jira/browse/FLINK-6063 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0 >Reporter: Razvan >Priority: Critical > Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg > > > I have a setup with a Flink 1.2 cluster, made up of 3 JobManagers and 2 > TaskManagers. I start the Zookeeper Quorum from JobManager1, I get > confirmation Zookeeper starts on the other 2 JobManagers, then I start a Flink > job on this JobManager1. > > The flink-conf.yaml is the same on all 5 VMs (as is everything else related > to Flink, because I copied the folder across all VMs as suggested in > tutorials); this means jobmanager.rpc.address: points to JobManager1 > everywhere. > If I turn off the VM running JobManager1, I would expect Zookeeper to say one > of the remaining JobManagers is the leader and the TaskManagers should > reconnect to it. Instead a new leader is elected but the slaves keep > connecting to the old master > 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Flat Map (1/1) > 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Disassociated] > 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. 
Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:10,489 INFO > org.apache.flink.runtime.taskmanager.TaskManager - TaskManager > akka://flink/user/taskmanager disconnects from JobManager > akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its > leadership. > 2017-03-15 10:29:10,490 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Cancelling > all computations and discarding all cached data. > 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Source: Custom Source > -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223). > 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Flat Map (1/1) > (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED. > java.lang.Exception: TaskManager akka://flink/user/taskmanager > disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: > Old JobManager lost its leadership. 
> at > org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at
[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940678#comment-15940678 ] ASF GitHub Bot commented on FLINK-6107: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3567 I have no preference for any style of import order. I just wanted to mandate some order so that we don't have edit wars when people use different IDE settings. @greghogan Have you tried setting up import check settings that more closely match the current Flink style? If we find something that works I'm happy to change that. > Add custom checkstyle for flink-streaming-java > -- > > Key: FLINK-6107 > URL: https://issues.apache.org/jira/browse/FLINK-6107 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > There was some consensus on the ML > (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E) > that we want to have a more uniform code style. We should start > module-by-module and by introducing increasingly stricter rules. We have to > be aware of the PR situation and ensure that we have minimal breakage for > contributors. > This issue aims at adding a custom checkstyle.xml for > {{flink-streaming-java}} that is based on our current checkstyle.xml but adds > these checks for Javadocs: > {code} > > > > > > > > > > > > > > > > > > > > > > > > > {code} > This checks: > - Every type has a type-level Javadoc > - Proper use of {{}} in Javadocs > - First sentence must end with a proper punctuation mark > - Proper use (including closing) of HTML tags -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3567: [FLINK-6107] Add custom checkstyle for flink-streaming-ja...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3567 I have no preference for any style of import order. I just wanted to mandate some order so that we don't have edit wars when people use different IDE settings. @greghogan Have you tried setting up import check settings that more closely match the current Flink style? If we find something that works I'm happy to change that.
[GitHub] flink pull request #3567: [FLINK-6107] Add custom checkstyle for flink-strea...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3567#discussion_r107946611 --- Diff: tools/maven/strict-checkstyle.xml --- @@ -0,0 +1,550 @@ + (the added checkstyle XML lines were stripped by the mail archive; only the DOCTYPE reference "http://www.puppycrawl.com/dtds/configuration_1_3.dtd" survives) --- End diff -- This is the Javadoc for the method that the checks are referring to:

```
/**
 * Returns {@code true} if and only if the system property
 * named by the argument exists and is equal to the string
 * {@code "true"}. (Beginning with version 1.0.2 of the
 * Java(TM) platform, the test of
 * this string is case insensitive.) A system property is accessible
 * through {@code getProperty}, a method defined by the
 * {@code System} class.
 *
 * If there is no property with the specified name, or if the specified
 * name is empty or null, then {@code false} is returned.
 *
 * @param name the system property name.
 * @return the {@code boolean} value of the system property.
 * @throws SecurityException for the same reasons as
 *         {@link System#getProperty(String) System.getProperty}
 * @see java.lang.System#getProperty(java.lang.String)
 * @see java.lang.System#getProperty(java.lang.String, java.lang.String)
 */
public static boolean getBoolean(String name) {
```
[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940675#comment-15940675 ] ASF GitHub Bot commented on FLINK-6107: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3567#discussion_r107946611 --- Diff: tools/maven/strict-checkstyle.xml --- @@ -0,0 +1,550 @@ + (the added checkstyle XML lines were stripped by the mail archive; only the DOCTYPE reference "http://www.puppycrawl.com/dtds/configuration_1_3.dtd" survives) --- End diff -- This is the Javadoc for the method that the checks are referring to:

```
/**
 * Returns {@code true} if and only if the system property
 * named by the argument exists and is equal to the string
 * {@code "true"}. (Beginning with version 1.0.2 of the
 * Java(TM) platform, the test of
 * this string is case insensitive.) A system property is accessible
 * through {@code getProperty}, a method defined by the
 * {@code System} class.
 *
 * If there is no property with the specified name, or if the specified
 * name is empty or null, then {@code false} is returned.
 *
 * @param name the system property name.
 * @return the {@code boolean} value of the system property.
 * @throws SecurityException for the same reasons as
 *         {@link System#getProperty(String) System.getProperty}
 * @see java.lang.System#getProperty(java.lang.String)
 * @see java.lang.System#getProperty(java.lang.String, java.lang.String)
 */
public static boolean getBoolean(String name) {
```

> Add custom checkstyle for flink-streaming-java > -- > > Key: FLINK-6107 > URL: https://issues.apache.org/jira/browse/FLINK-6107 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > There was some consensus on the ML > (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E) > that we want to have a more uniform code style. We should start > module-by-module and by introducing increasingly stricter rules. We have to > be aware of the PR situation and ensure that we have minimal breakage for > contributors. > This issue aims at adding a custom checkstyle.xml for > {{flink-streaming-java}} that is based on our current checkstyle.xml but adds > these checks for Javadocs: > {code} > > > > > > > > > > > > > > > > > > > > > > > > > {code} > This checks: > - Every type has a type-level Javadoc > - Proper use of {{}} in Javadocs > - First sentence must end with a proper punctuation mark > - Proper use (including closing) of HTML tags -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6169) yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error
[ https://issues.apache.org/jira/browse/FLINK-6169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940669#comment-15940669 ] ASF GitHub Bot commented on FLINK-6169: --- GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/3608 FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error Stop yarnClient before throwing exception You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3608.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3608 commit f0528de9cc03f603b77d6adcd222ff085967b614 Author: tedyu Date: 2017-03-24T16:30:31Z FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error > yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error > -- > > Key: FLINK-6169 > URL: https://issues.apache.org/jira/browse/FLINK-6169 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Priority: Minor > > Here is one example: > {code} > if(jobManagerMemoryMb > maxRes.getMemory() ) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient should be stopped when deployment fails. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
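The pattern the PR proposes (release the client on the failure path before surfacing the error) can be sketched stand-alone. Client and deploy() below are illustrative stand-ins for YarnClient and the descriptor logic, not Flink or Hadoop APIs:

```java
/** Toy model of the deployment fix: the client is stopped before the exception propagates. */
public class DeploymentSketch {

    /** Stand-in for YarnClient (illustrative). */
    public static class Client {
        private boolean stopped = false;
        public void stop() { stopped = true; }
        public boolean isStopped() { return stopped; }
    }

    /** Fails the deployment and stops the client before throwing, so no client leaks on error. */
    public static void deploy(Client client, int requestedMb, int maxMb) {
        if (requestedMb > maxMb) {
            client.stop(); // release the client before surfacing the error
            throw new IllegalStateException(
                "The cluster does not have the requested resources for the JobManager available!");
        }
        // ... continue deployment ...
    }
}
```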
[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/3608 FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error Stop yarnClient before throwing exception You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3608.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3608 commit f0528de9cc03f603b77d6adcd222ff085967b614 Author: tedyu Date: 2017-03-24T16:30:31Z FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error