[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194056#comment-16194056 ]

ASF GitHub Bot commented on FLINK-6225:
---------------------------------------

Github user PangZhi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r143098834

    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
         }
     
         @Test
    +    public void testCassandraTableSink() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        env.setParallelism(1);
    +
    +        DataStreamSource source = env.fromCollection(rowCollection);
    +        CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
    +            @Override
    +            protected Cluster buildCluster(Cluster.Builder builder) {
    +                return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
    +            }
    +        }, injectTableName(INSERT_DATA_QUERY), new Properties());
    +        CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +
    +        newCassandrTableSink.emitDataStream(source);
    +
    +        env.execute();
    +        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
    +        Assert.assertEquals(20, rs.all().size());
    +    }
    +
    +    @Test
    +    public void testCassandraTableSinkE2E() throws Exception {
    --- End diff --

    This was added because another reviewer thought it better to also add an e2e test using the SQL API.

> Support Row Stream for CassandraSink
> ------------------------------------
>
>                 Key: FLINK-6225
>                 URL: https://issues.apache.org/jira/browse/FLINK-6225
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cassandra Connector
>    Affects Versions: 1.3.0
>            Reporter: Jing Fan
>            Assignee: Haohui Mai
>             Fix For: 1.4.0
>
> Currently in CassandraSink, specifying a query is not supported for a row stream.
> The solution should be similar to CassandraTupleSink.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193795#comment-16193795 ]

ASF GitHub Bot commented on FLINK-7076:
---------------------------------------

GitHub user suez1224 reopened a pull request:

    https://github.com/apache/flink/pull/4729

    [FLINK-7076] [ResourceManager] implement YARN stopWorker logic

    ## What is the purpose of the change

    *Implement stopWorker logic for YarnResourceManager*

    ## Brief change log

    - *Added a ConcurrentHashMap to keep the ResourceID to Yarn ContainerId mappings*
    - *Implemented the stopWorker logic for YARN*

    ## Verifying this change

    *(Please pick either of the following options)*

    This change is a trivial rework / code cleanup without any test coverage.

    *(or)*

    This change is already covered by existing tests, such as *(please describe tests)*.

    *(or)*

    This change added tests and can be verified as follows:

    *(example:)*
    - *Added integration tests for end-to-end deployment with large payloads (100MB)*
    - *Extended integration test for recovery after master (JobManager) failure*
    - *Added test that validates that TaskInfo is transferred only once across recoveries*
    - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

    ## Documentation

    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink implement-stopWorker-yarn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4729.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4729

> Implement container release to support dynamic scaling
> ------------------------------------------------------
>
>                 Key: FLINK-7076
>                 URL: https://issues.apache.org/jira/browse/FLINK-7076
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Shuyi Chen
>              Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be able to dynamically free containers. We have to implement the {{YarnResourceManager#stopWorker}} method.
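The ResourceID-to-ContainerId bookkeeping described in the change log can be sketched roughly as below. This is an illustrative stand-in, not Flink's actual `YarnResourceManager` API; the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Illustrative sketch of the stopWorker bookkeeping described in the PR:
 * a concurrent map from a worker's resource id to its YARN container id,
 * so that a stop request can release the right container. Names are
 * hypothetical, not Flink's actual API.
 */
public class WorkerRegistry {

    private final ConcurrentMap<String, String> resourceToContainer = new ConcurrentHashMap<>();

    /** Record the container backing a newly started worker. */
    public void registerWorker(String resourceId, String containerId) {
        resourceToContainer.put(resourceId, containerId);
    }

    /**
     * Stop a worker: remove the mapping and return the container id that
     * should be released, or null if the worker is unknown.
     */
    public String stopWorker(String resourceId) {
        return resourceToContainer.remove(resourceId);
    }

    public int numRegisteredWorkers() {
        return resourceToContainer.size();
    }
}
```

A `ConcurrentHashMap` fits here because registrations and stop requests may arrive from different threads; `remove` is atomic, so the same container cannot be released twice.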
[jira] [Commented] (FLINK-2961) Add support for basic type Date in Table API
[ https://issues.apache.org/jira/browse/FLINK-2961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193603#comment-16193603 ]

ASF GitHub Bot commented on FLINK-2961:
---------------------------------------

Github user coveralls commented on the issue:

    https://github.com/apache/flink/pull/1322

    [![Coverage Status](https://coveralls.io/builds/13591448/badge)](https://coveralls.io/builds/13591448)

    Changes Unknown when pulling **2544011784017fc12234fe38ebf6b3c58b84 on twalthr:TableApiDate** into ** on apache:master**.

> Add support for basic type Date in Table API
> --------------------------------------------
>
>                 Key: FLINK-2961
>                 URL: https://issues.apache.org/jira/browse/FLINK-2961
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Minor
>             Fix For: 1.0.0
>
> Currently, the basic type {{Date}} is not implemented in the Table API. In order to have a mapping of the most important ANSI SQL types for FLINK-2099, it makes sense to add support for {{Date}} to represent dates, times, and timestamps of millisecond precision.
> Only the types `LONG` and `STRING` can be cast to `DATE` and vice versa. A `LONG` cast to `DATE` must be a milliseconds timestamp. A `STRING` cast to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", "`HH:mm:ss`", or be a milliseconds timestamp. All timestamps refer to the UTC timezone, beginning from January 1, 1970, 00:00:00, in milliseconds.
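The STRING-to-DATE cast rules described in the issue can be sketched as follows. This is an illustration of the stated semantics (try the three patterns in UTC, then fall back to a milliseconds timestamp), not Flink's actual implementation; the class name is hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/**
 * Sketch of the STRING -> DATE cast semantics described in FLINK-2961:
 * a string is tried against "yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd",
 * "HH:mm:ss", or interpreted as a milliseconds timestamp, all relative
 * to UTC. Illustrative only, not Flink's implementation.
 */
public class DateCastSketch {

    private static final String[] PATTERNS = {
        "yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd", "HH:mm:ss"
    };

    public static Date castToDate(String value) {
        for (String pattern : PATTERNS) {
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
            format.setLenient(false);
            try {
                return format.parse(value);
            } catch (ParseException ignored) {
                // not this pattern; try the next one
            }
        }
        // fall back: a plain milliseconds timestamp since 1970-01-01 UTC
        return new Date(Long.parseLong(value));
    }
}
```

Note that the patterns are tried from most to least specific, so a full timestamp string is not truncated by the shorter `yyyy-MM-dd` pattern.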
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193570#comment-16193570 ]

ASF GitHub Bot commented on FLINK-7076:
---------------------------------------

Github user suez1224 closed the pull request at:

    https://github.com/apache/flink/pull/4729

> Implement container release to support dynamic scaling
> ------------------------------------------------------
>
>                 Key: FLINK-7076
>                 URL: https://issues.apache.org/jira/browse/FLINK-7076
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Shuyi Chen
>              Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be able to dynamically free containers. We have to implement the {{YarnResourceManager#stopWorker}} method.
[jira] [Commented] (FLINK-2973) Add flink-benchmark with compliant licenses again
[ https://issues.apache.org/jira/browse/FLINK-2973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193521#comment-16193521 ]

Greg Hogan commented on FLINK-2973:
-----------------------------------

[~fhueske] [~rmetzger] can we not include this as an optional (or unlisted) module in the same manner as {{flink-connector-kinesis}}? Both are restricted by dependence on a Category X license (GPL and ASL, respectively).

> Add flink-benchmark with compliant licenses again
> -------------------------------------------------
>
>                 Key: FLINK-2973
>                 URL: https://issues.apache.org/jira/browse/FLINK-2973
>             Project: Flink
>          Issue Type: Task
>          Components: Build System
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>            Assignee: Suneel Marthi
>            Priority: Minor
>             Fix For: 1.0.0
>
> We recently created the Maven module {{flink-benchmark}} for micro-benchmarks and ported most of the existing micro-benchmarks to the Java benchmarking framework JMH. However, JMH is part of OpenJDK and under the GPL license, which is not compatible with the AL2.
> Consequently, we need to remove this dependency and either revert the porting commits or port the benchmarks to another benchmarking framework. An alternative could be [Google's Caliper|https://github.com/google/caliper] library, which is under the AL2.
[GitHub] flink issue #4760: [hotfix][docs] Polish docs index page for consistency in ...
Github user ChrisChinchilla commented on the issue: https://github.com/apache/flink/pull/4760 @alpinegizmo No worries, done! ---
[jira] [Created] (FLINK-7768) Load File Systems via Java Service abstraction
Stephan Ewen created FLINK-7768:
-----------------------------------

             Summary: Load File Systems via Java Service abstraction
                 Key: FLINK-7768
                 URL: https://issues.apache.org/jira/browse/FLINK-7768
             Project: Flink
          Issue Type: Improvement
          Components: Core
            Reporter: Stephan Ewen
             Fix For: 1.4.0
[jira] [Created] (FLINK-7767) Avoid loading Hadoop conf dynamically at runtime
Stephan Ewen created FLINK-7767:
-----------------------------------

             Summary: Avoid loading Hadoop conf dynamically at runtime
                 Key: FLINK-7767
                 URL: https://issues.apache.org/jira/browse/FLINK-7767
             Project: Flink
          Issue Type: Improvement
          Components: Streaming Connectors
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.4.0

The bucketing sink dynamically loads the Hadoop configuration in various places. The result of that configuration is not always predictable, as it tries to automagically discover the Hadoop config files.

A better approach is to rely on the Flink configuration to find the Hadoop configuration, or to directly use the Hadoop configuration used by the Hadoop file systems.
[jira] [Created] (FLINK-7766) Remove obsolete reflection for hflush on HDFS
Stephan Ewen created FLINK-7766:
-----------------------------------

             Summary: Remove obsolete reflection for hflush on HDFS
                 Key: FLINK-7766
                 URL: https://issues.apache.org/jira/browse/FLINK-7766
             Project: Flink
          Issue Type: Improvement
          Components: Streaming Connectors
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.4.0

This code originally existed for compatibility with Hadoop 1. Since Hadoop 1 support is dropped, this is no longer necessary.
[jira] [Commented] (FLINK-7763) TableSinkITCase fails with "object reuse" enabled
[ https://issues.apache.org/jira/browse/FLINK-7763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193234#comment-16193234 ]

Stephan Ewen commented on FLINK-7763:
-------------------------------------

It would be great to change that. We very much want to avoid additional copies when passing records between chained functions, because such copies cost unnecessary performance. But to make that possible, all operators need to be written so that they do not mutate incoming objects.

> TableSinkITCase fails with "object reuse" enabled
> -------------------------------------------------
>
>                 Key: FLINK-7763
>                 URL: https://issues.apache.org/jira/browse/FLINK-7763
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
> Set {{objectReuse}} to {{true}} in {{ExecutionConfig}} to reproduce the failure.
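The hazard behind this discussion can be shown in a few lines of plain Java. This is a deliberately simplified stand-in (not Flink's operator API): when the runtime reuses one record object between chained functions, an operator that buffers the incoming object instead of copying it later observes only the last written value.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal illustration of the object-reuse hazard: the "runtime" reuses a
 * single Record instance, so an operator that buffers the incoming object
 * (instead of a defensive copy) sees all buffered entries mutated to the
 * last value. Plain Java stand-in, not Flink's operator API.
 */
public class ObjectReuseHazard {

    static class Record {
        long value;
    }

    /** Buggy: buffers the reused object itself. */
    public static List<Long> collectUnsafe(long[] inputs) {
        List<Record> buffered = new ArrayList<>();
        Record reused = new Record();       // the runtime's single reused instance
        for (long v : inputs) {
            reused.value = v;               // runtime overwrites the same object
            buffered.add(reused);           // BUG: no copy before buffering
        }
        List<Long> out = new ArrayList<>();
        for (Record r : buffered) {
            out.add(r.value);               // every element shows the last value
        }
        return out;
    }

    /** Correct: makes a defensive copy before buffering. */
    public static List<Long> collectSafe(long[] inputs) {
        List<Record> buffered = new ArrayList<>();
        Record reused = new Record();
        for (long v : inputs) {
            reused.value = v;
            Record copy = new Record();     // copy decouples buffer from reuse
            copy.value = reused.value;
            buffered.add(copy);
        }
        List<Long> out = new ArrayList<>();
        for (Record r : buffered) {
            out.add(r.value);
        }
        return out;
    }
}
```

This is exactly why operators must not hold on to (or mutate) incoming objects if the runtime is to skip the per-record copies between chained functions.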
[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193229#comment-16193229 ]

Stephan Ewen commented on FLINK-5372:
-------------------------------------

Could it be related to FLINK-7757 ?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --------------------------------------------------------------
>
>                 Key: FLINK-5372
>                 URL: https://issues.apache.org/jira/browse/FLINK-5372
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
> The test is currently {{@Ignored}}. We have to change {{AsyncCheckpointOperator}} to make sure that we can run fully asynchronously. Then, the test will still fail because the canceling behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193225#comment-16193225 ]

Stephan Ewen commented on FLINK-7608:
-------------------------------------

One other aspect to consider is the cost of histograms. The latency sample rate cannot be too high when using complex histograms, otherwise the sampling interferes with the execution.

> LatencyGauge change to histogram metric
> ---------------------------------------
>
>                 Key: FLINK-7608
>                 URL: https://issues.apache.org/jira/browse/FLINK-7608
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Hai Zhou UTC+8
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
> I used slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to export metrics to the log file.
> I found:
> {noformat}
> -- Gauges -------------------------------------------------------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>   value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>   value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
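The cost concern raised in the comment can be sketched with a small, self-contained example: record latencies into a bounded sample only for every Nth record, so the per-record hot path stays cheap while still supporting quantile queries. This is an illustrative sketch, not Flink's metrics API; the class name and parameters are hypothetical.

```java
import java.util.Arrays;

/**
 * Sketch of a sampled latency histogram: only every Nth reported latency
 * is stored (in a fixed-size ring buffer), keeping the per-record cost low.
 * Quantiles are computed over the currently held samples. Illustrative
 * stand-in, not Flink's metrics API.
 */
public class SampledLatencyHistogram {

    private final long[] samples;
    private final int sampleEvery;
    private long seen;
    private int next;

    public SampledLatencyHistogram(int capacity, int sampleEvery) {
        this.samples = new long[capacity];
        this.sampleEvery = sampleEvery;
        Arrays.fill(samples, -1L); // -1 marks an empty slot
    }

    /** Called per record; stores the latency only for every Nth record. */
    public void report(long latencyMillis) {
        if (seen++ % sampleEvery == 0) {
            samples[next] = latencyMillis;
            next = (next + 1) % samples.length; // ring buffer: overwrite oldest
        }
    }

    /** Approximate quantile (0.0 .. 1.0) over the held samples, or -1 if empty. */
    public long quantile(double q) {
        long[] held = Arrays.stream(samples).filter(v -> v >= 0).sorted().toArray();
        if (held.length == 0) {
            return -1L;
        }
        int idx = (int) Math.min(held.length - 1, Math.round(q * (held.length - 1)));
        return held[idx];
    }
}
```

The trade-off is explicit: a higher `sampleEvery` lowers overhead but makes the quantiles coarser, which is exactly why the sample rate "cannot be too high" for complex histograms.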
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193174#comment-16193174 ]

ASF GitHub Bot commented on FLINK-7643:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4776#discussion_r142989725

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs.factories;
    +
    +import org.apache.flink.core.fs.FileSystemFactory;
    +
    +/**
    + * A
    + */
    +public class HadoopFileSystemFactoryLoader {
    +
    +    private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
    +
    +    private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
    +
    +    private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
    +
    +
    +    public static FileSystemFactory loadFactory() {
    +        final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
    +
    +        // first, see if the Flink runtime classes are available
    +        final Class factoryClass;
    +        try {
    +            factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
    +        }
    +        catch (ClassNotFoundException e) {
    +            return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
    +        }
    +        catch (Exception | LinkageError e) {
    +            return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
    +        }
    +
    +        // check (for eager and better exception messages) if the Hadoop classes are available here
    +        try {
    +            Class.forName(HADOOP_CONFIG_CLASS, false, cl);
    +            Class.forName(HADOOP_FS_CLASS, false, cl);
    +        }
    +        catch (ClassNotFoundException e) {
    --- End diff --

    ditto

> Configure FileSystems only once
> -------------------------------
>
>                 Key: FLINK-7643
>                 URL: https://issues.apache.org/jira/browse/FLINK-7643
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Ufuk Celebi
>            Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant parsing for each data chunk.
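The loader in the diff above probes for classes with `Class.forName(name, false, cl)`, i.e. with `initialize=false`, so it only checks that the class can be resolved on the given class loader without running its static initializers. The probe idiom can be distilled into a sketch like this (the class name here is an example, not one of Flink's constants):

```java
/**
 * Sketch of the availability probe used in the loader above:
 * Class.forName with initialize=false checks that a class resolves on the
 * given class loader without triggering static initialization. Catching
 * LinkageError as well covers classes that are present but broken.
 */
public class ClassProbe {

    public static boolean isPresent(String className, ClassLoader cl) {
        try {
            Class.forName(className, false, cl);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // missing or unloadable on this classpath; report unavailable
            return false;
        }
    }
}
```

This is what lets the loader return a fallback factory with a clear message instead of failing much later with an obscure `NoClassDefFoundError` at first use.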
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193176#comment-16193176 ]

ASF GitHub Bot commented on FLINK-7643:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4776#discussion_r142989384

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs.factories;
    +
    +import org.apache.flink.core.fs.FileSystemFactory;
    +
    +/**
    + * A
    + */
    +public class HadoopFileSystemFactoryLoader {
    +
    +    private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
    +
    +    private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
    +
    +    private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
    +
    +
    +    public static FileSystemFactory loadFactory() {
    +        final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
    +
    +        // first, see if the Flink runtime classes are available
    +        final Class factoryClass;
    +        try {
    +            factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
    +        }
    +        catch (ClassNotFoundException e) {
    --- End diff --

    Shall we log a warning for this exception? I'd prefer to have a log entry at the very top of the log file so it is easily discovered.

> Configure FileSystems only once
> -------------------------------
>
>                 Key: FLINK-7643
>                 URL: https://issues.apache.org/jira/browse/FLINK-7643
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Ufuk Celebi
>            Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant parsing for each data chunk.
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193177#comment-16193177 ]

ASF GitHub Bot commented on FLINK-7643:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4776#discussion_r142989807

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs.factories;
    +
    +import org.apache.flink.core.fs.FileSystemFactory;
    +
    +/**
    + * A
    + */
    +public class HadoopFileSystemFactoryLoader {
    +
    +    private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
    +
    +    private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
    +
    +    private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
    +
    +
    +    public static FileSystemFactory loadFactory() {
    +        final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
    +
    +        // first, see if the Flink runtime classes are available
    +        final Class factoryClass;
    +        try {
    +            factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
    +        }
    +        catch (ClassNotFoundException e) {
    +            return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
    +        }
    +        catch (Exception | LinkageError e) {
    +            return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
    +        }
    +
    +        // check (for eager and better exception messages) if the Hadoop classes are available here
    +        try {
    +            Class.forName(HADOOP_CONFIG_CLASS, false, cl);
    +            Class.forName(HADOOP_FS_CLASS, false, cl);
    +        }
    +        catch (ClassNotFoundException e) {
    +            return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
    +        }
    +
    +        // Create the factory.
    +        try {
    +            return factoryClass.newInstance();
    +        }
    +        catch (Exception | LinkageError e) {
    --- End diff --

    ditto

> Configure FileSystems only once
> -------------------------------
>
>                 Key: FLINK-7643
>                 URL: https://issues.apache.org/jira/browse/FLINK-7643
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Ufuk Celebi
>            Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant parsing for each data chunk.
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193175#comment-16193175 ]

ASF GitHub Bot commented on FLINK-7643:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4776#discussion_r142989415

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs.factories;
    +
    +import org.apache.flink.core.fs.FileSystemFactory;
    +
    +/**
    + * A
    + */
    +public class HadoopFileSystemFactoryLoader {
    +
    +    private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
    +
    +    private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
    +
    +    private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
    +
    +
    +    public static FileSystemFactory loadFactory() {
    +        final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
    +
    +        // first, see if the Flink runtime classes are available
    +        final Class factoryClass;
    +        try {
    +            factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
    +        }
    +        catch (ClassNotFoundException e) {
    +            return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
    +        }
    +        catch (Exception | LinkageError e) {
    --- End diff --

    ditto

> Configure FileSystems only once
> -------------------------------
>
>                 Key: FLINK-7643
>                 URL: https://issues.apache.org/jira/browse/FLINK-7643
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Ufuk Celebi
>            Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant parsing for each data chunk.
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142989725 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; + + private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration"; + + private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem"; + + + public static FileSystemFactory loadFactory() { + final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader(); + + // first, see if the Flink runtime classes are available + final Class factoryClass; + try { + factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class); + } + catch (ClassNotFoundException e) { + return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies."); + } + catch (Exception | LinkageError e) { + return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e); + } + + // check (for eager and better exception messages) if the Hadoop classes are available here + try { + Class.forName(HADOOP_CONFIG_CLASS, false, cl); + Class.forName(HADOOP_FS_CLASS, false, cl); + } + catch (ClassNotFoundException e) { --- End diff -- ditto ---
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142989807 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; + + private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration"; + + private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem"; + + + public static FileSystemFactory loadFactory() { + final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader(); + + // first, see if the Flink runtime classes are available + final Class factoryClass; + try { + factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class); + } + catch (ClassNotFoundException e) { + return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies."); + } + catch (Exception | LinkageError e) { + return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e); + } + + // check (for eager and better exception messages) if the Hadoop classes are available here + try { + Class.forName(HADOOP_CONFIG_CLASS, false, cl); + Class.forName(HADOOP_FS_CLASS, false, cl); + } + catch (ClassNotFoundException e) { + return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies."); + } + + // Create the factory. + try { + return factoryClass.newInstance(); + } + catch (Exception | LinkageError e) { --- End diff -- ditto ---
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142989384 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; + + private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration"; + + private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem"; + + + public static FileSystemFactory loadFactory() { + final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader(); + + // first, see if the Flink runtime classes are available + final Class factoryClass; + try { + factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class); + } + catch (ClassNotFoundException e) { --- End diff -- Shall we log a warning for this exception? 
I'd prefer to have a log entry at the very top of the log file so it can be easily discovered ---
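The suggestion above — log a warning when the Hadoop factory classes cannot be loaded, so the fallback is discoverable at the top of the log rather than surfacing only when an `hdfs://` URI is first used — can be sketched standalone. This is a hypothetical illustration of the reflective-loading-with-fallback pattern under discussion, not Flink's actual loader; the class name and messages are assumptions.

```java
import java.util.logging.Logger;

public class FactoryLoaderSketch {

    private static final Logger LOG = Logger.getLogger(FactoryLoaderSketch.class.getName());

    // Same class name the quoted diff probes for; resolved reflectively so
    // flink-core need not depend on flink-runtime at compile time.
    private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";

    static String loadFactoryDescription() {
        try {
            // initialize=false: only check presence, do not run static initializers
            Class.forName(FACTORY_CLASS, false, FactoryLoaderSketch.class.getClassLoader());
            return "hadoop-factory";
        } catch (ClassNotFoundException e) {
            // Log once, at load time, so the degraded mode shows up early in the log.
            LOG.warning("Flink runtime classes missing; 'hdfs://' URIs will not be supported: " + e);
            return "unsupported-scheme-fallback";
        }
    }

    public static void main(String[] args) {
        // Without flink-runtime on the classpath this takes the fallback branch.
        System.out.println(loadFactoryDescription());
    }
}
```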
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193138#comment-16193138 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4776 ok, sounds good. That also explains some questions I commented, so I removed them. > Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. > This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7292) Fix EMPTY MATCH bug in CEP.
[ https://issues.apache.org/jira/browse/FLINK-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193122#comment-16193122 ] Kostas Kloudas commented on FLINK-7292: --- Hi [~litrain_1], I believe that for infinite streams, an empty result does not make much sense. Conceptually, the empty result means that "there is no match in your input". If the input in infinite, this becomes "there is no match in your input *so far* " and given that this "so far" is not defined, it would mean that we have infinite "no matches" between potential matches. Even if we integrated it, this would imply a waste of resources, as we would have to iterate over all keys, and for each one emit an "empty match" element. Given the above, for the sake of having a clean JIRA and given that this discussion seems stale, I would recommend closing this issue. If nobody objects till Monday, I will close it. Please let me know if you disagree. > Fix EMPTY MATCH bug in CEP. > --- > > Key: FLINK-7292 > URL: https://issues.apache.org/jira/browse/FLINK-7292 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: zhangxiaoyu > > Currently, with the pattern {quote}a? {quote}and the event{quote} a1{quote}, > the result pattern is only{quote} a1{quote}without the empty match. > We wish the empty matched is also returned. And I am working on this issue > now. > My method is checking if there exists empty match only when the the first > event comes(at the StartState) ——try to traverse the PROCEED edges with the > trueFunction condition from the StartState, see if it can arrive FinalState, > if so, add an empty list to the result. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
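The method the reporter describes — from the start state, traverse the PROCEED edges under an always-true condition and check whether the final state is reachable — is essentially an epsilon-reachability check over the NFA's state graph. A minimal standalone sketch of that check (hypothetical string-keyed state graph, not Flink's NFA types):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class EmptyMatchCheck {

    // The pattern admits an empty match iff the final state is reachable from
    // the start state via PROCEED (epsilon) edges alone.
    static boolean admitsEmptyMatch(Map<String, List<String>> proceedEdges, String start, String fin) {
        Deque<String> stack = new ArrayDeque<>();
        Set<String> seen = new HashSet<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String s = stack.pop();
            if (s.equals(fin)) {
                return true;
            }
            if (seen.add(s)) {
                for (String next : proceedEdges.getOrDefault(s, List.of())) {
                    stack.push(next);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Pattern "a?": start can PROCEED straight to final, so an empty match exists.
        System.out.println(admitsEmptyMatch(Map.of("start", List.of("final")), "start", "final"));
        // No PROCEED edge from start: no empty match.
        System.out.println(admitsEmptyMatch(Map.of(), "start", "final"));
    }
}
```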
[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026 ] Paolo Rendano edited comment on FLINK-7606 at 10/5/17 3:39 PM: --- IMO the timeout should be triggered after the expiration of the time window (in the example 5 minutes) without any new elements (i.e. if the last received element is out of that time window). With this kind of strategy you would get at most a latency of the time window interval. Would it solve? was (Author: i...@paolorendano.it): IMO the timeout should be triggered after the expiration of the time window (in the example 10 minutes) without any new elements (i.e. if the last received element is out of that time window). With this kind of strategy you would get at most a latency of the time window interval. Would it solve? > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. 
> Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
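The cleanup strategy discussed in this thread — drop a key's NFA state once the "within" window has elapsed without any new element for that key — can be illustrated with a standalone sketch. The structures here are hypothetical stand-ins, not the CEP operator's actual state backend:

```java
import java.util.HashMap;
import java.util.Map;

public class StatePruneSketch {

    private final long windowMillis;
    // Stand-in for per-key NFA state: key -> timestamp of last element seen.
    private final Map<String, Long> lastSeenPerKey = new HashMap<>();

    StatePruneSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onElement(String key, long timestamp) {
        lastSeenPerKey.put(key, timestamp);
    }

    // Called as time advances: evict every key whose last element fell outside
    // the pattern's time window, releasing that key's state. Returns how many
    // keys were evicted.
    int pruneExpired(long now) {
        int before = lastSeenPerKey.size();
        lastSeenPerKey.values().removeIf(last -> now - last > windowMillis);
        return before - lastSeenPerKey.size();
    }

    public static void main(String[] args) {
        StatePruneSketch s = new StatePruneSketch(5 * 60_000); // 5-minute window, as in the issue
        s.onElement("device-1", 0);
        s.onElement("device-2", 4 * 60_000);
        // At minute 6, device-1's state is past the window and gets evicted.
        System.out.println(s.pruneExpired(6 * 60_000));
    }
}
```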
[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193005#comment-16193005 ] Kostas Kloudas commented on FLINK-7549: --- Thanks for closing the issue [~i...@paolorendano.it] ! > CEP - Pattern not discovered if source streaming is very fast > - > > Key: FLINK-7549 > URL: https://issues.apache.org/jira/browse/FLINK-7549 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1, 1.3.2 >Reporter: Paolo Rendano > > Hi all, > I'm doing some stress test on my pattern using JMeter to populate source data > on a rabbitmq queue. This queue contains status generated by different > devices . In my test case I set to loop on a base of 1000 cycles, each one > sending respectively the first and the second status that generate the event > using flink CEP (status keyed by device). I expect to get an output of 1000 > events. > In my early tests I launched that but I noticed that I get only partial > results in output (70/80% of the expected ones). Introducing a delay in > jmeter plan between the sending of the two status solved the problem. The > minimum delay (of course this is on my local machine, on other machines may > vary) that make things work is 20/25 ms. 
> My code is structured this way (the following is a semplification): > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setAutoWatermarkInterval(100L); > // source definition > DataStream dataStreamSource = > env.addSource(new > MYRMQAutoboundQueueSource<>(connectionConfig, > conf.getSourceExchange(), > conf.getSourceRoutingKey(), > conf.getSourceQueueName(), > true, > new MyMessageWrapperSchema())) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) { > private static final long serialVersionUID = > -1L; > @Override > public long extractTimestamp(MyMessageWrapper > element) { > if > (element.getData().get("stateTimestamp")==null) { > throw new RuntimeException("Status > Timestamp is null during time ordering for device [" + > element.getData().get("deviceCode") + "]"); > } > return > FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime(); > } > }) > .name("MyIncomingStatus"); > // PATTERN DEFINITION > PatternmyPattern = Pattern > .begin("start") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st", "none")) > .next("end") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st","started")) > .within(Time.minutes(3)); > // CEP DEFINITION > PatternStream myPatternStream = > CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern); > DataStream > outputStream = > myPatternStream.flatSelect(patternFlatTimeoutFunction, > patternFlatSelectFunction); > // SINK DEFINITION > outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, > outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent"); > {code} > digging and logging messages received by flink in "extractTimestamp", what > happens is that with that so high rate of messages, source may receive > messages with the same timestamp but with different deviceCode. > Any idea? 
> Thanks, regards > Paolo
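For context on the extractor used in the job above: a bounded-out-of-orderness extractor emits a watermark that trails the maximum timestamp seen so far by a fixed bound, and elements at or below the current watermark are treated as late. A minimal standalone model of that semantics (an illustration, not Flink's `BoundedOutOfOrdernessTimestampExtractor` itself):

```java
public class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // The watermark trails the highest timestamp seen by the allowed lateness.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel m = new BoundedOutOfOrdernessModel(60_000); // 1 minute, as in the job
        m.extractTimestamp(100_000);
        m.extractTimestamp(100_000); // equal timestamps from different devices are fine
        System.out.println(m.currentWatermark());
        // An element with timestamp 30_000 would now be late:
        System.out.println(30_000 <= m.currentWatermark());
    }
}
```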
[GitHub] flink pull request #4777: Convergence
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4777 Convergence ## What is the purpose of the change This pull request enables dependency convergence for couple of modules. Enabling it for all of them at once is unfortunately too complicated. ## Brief change log Check commit messages ## Verifying this change This change **SHOULD** be already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink convergence Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4777.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4777 commit c7cc24d062aa233d86b68b7438c9a4e717003393 Author: Piotr NowojskiDate: 2017-09-29T16:23:29Z [FLINK-7739][kafka-tests] Set shorter heartbeats intervals Default pause value of 60seconds is too large (tests would timeout before akka react) commit 1677791f10153b9f7ecd552eac148d6ae3d056f1 Author: Piotr Nowojski Date: 2017-10-04T11:48:11Z [FLINK-7739][kafka-tests] Set restart delay to non zero Give TaskManagers some time to clean up before restaring a job. commit 937c3fb388d9d7104b6336f59c3674bb70bfbf50 Author: Piotr Nowojski Date: 2017-10-04T14:50:57Z [FLINK-7739] Exclude netty dependency from zookeeper Zookeeper was pulling in conflicting Netty version. Conflict was extremly subtle - TaskManager in kafka tests was deadlocking in some rare corner cases. 
commit 47a9d3801de899b74adfed5338df8935b2509b05 Author: Piotr Nowojski Date: 2017-10-05T13:17:13Z [hotfix][build] Add maven-enforcer version property commit 4a9c6fa8957d4725ace40ac92a9a9d8d5cd6523f Author: Piotr Nowojski Date: 2017-10-04T15:45:58Z [FLINK-7765][annotations] Enable dependency convergence in flink-annotations commit 07cceb6fdefe27ec9d474e563d653fbac27988b1 Author: Piotr Nowojski Date: 2017-10-05T13:26:48Z [FLINK-7765][hadoop2] Enable dependency convergence in flink-shaded-hadoop2 commit 0f988b773b64cd57140e3d56b12802161d1f635a Author: Piotr Nowojski Date: 2017-10-05T14:36:02Z [FLINK-7765][curator] Enable dependency convergence in flink-shaded-curator ---
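Dependency convergence as enabled by this PR is typically configured through the Maven Enforcer plugin's `dependencyConvergence` rule, which fails the build when two transitive paths pull different versions of the same artifact. A sketch of the relevant `pom.xml` fragment — the execution id and the version property are assumptions for illustration, not the exact configuration from these commits:

```xml
<!-- Sketch only: execution id and version property are assumed, not taken from the PR. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>${maven-enforcer.version}</version>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Conflicts the rule reports (such as the Netty version pulled in by Zookeeper, mentioned in the commit messages) are then resolved with explicit exclusions or dependency management.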
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192994#comment-16192994 ] Kostas Kloudas commented on FLINK-7606: --- You are right! There is already a discussion about introducing something like an "idle" watermark, that simply advances time for inactive partitions. This will help in some similar usecases. Now for your case specifically, it could work only if you know that no more elements belonging to a specific period (in event time) will come. Because if such elements arrive, then your previous results will be incorrect, right? This is the reason why we have not introduced such a mechanism. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. 
> If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak.
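The "idle" watermark idea mentioned in this comment — advancing time on behalf of partitions that have been silent for some period — can be sketched standalone. The API here is hypothetical (no such mechanism existed at the time of this discussion), and the sketch carries the caveat Kostas raises: if an element for the skipped period arrives later, earlier results become incorrect.

```java
public class IdleAwareWatermarkSketch {

    private final long idleTimeoutMillis;
    private long lastActivity;                  // processing time of the last element
    private long maxEventTime = Long.MIN_VALUE; // event time the partition would otherwise hold back

    IdleAwareWatermarkSketch(long idleTimeoutMillis, long now) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivity = now;
    }

    void onElement(long eventTime, long now) {
        maxEventTime = Math.max(maxEventTime, eventTime);
        lastActivity = now;
    }

    // Once a partition has been silent past the timeout, downstream operators
    // may stop waiting for its watermark. Caveat: a late element for this
    // partition would invalidate results emitted during the idle period.
    boolean isIdle(long now) {
        return now - lastActivity >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch p = new IdleAwareWatermarkSketch(10_000, 0);
        p.onElement(1_000, 2_000);
        System.out.println(p.isIdle(5_000));   // recent activity
        System.out.println(p.isIdle(20_000));  // silent past the timeout
    }
}
```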
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192961#comment-16192961 ] Chesnay Schepler commented on FLINK-7765: - This is a duplicate of FLINK-4034. > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192953#comment-16192953 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4742 @tillrohrmann I've addressed the second round of comments. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
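The four operations listed in the issue map naturally onto HTTP method/path pairs. A hypothetical sketch of that mapping (the paths are illustrative, not the endpoint's actual URL scheme):

```java
public class RestEndpointSketch {

    // One entry per operation from the issue description; ":jobid" marks a path parameter.
    enum Op {
        LIST_JOBS("GET", "/jobs"),
        SUBMIT_JOB("POST", "/jobs"),               // session mode only
        GET_JOB_STATUS("GET", "/jobs/:jobid"),
        LOOKUP_JOB_LEADER("GET", "/jobs/:jobid/leader");

        final String method;
        final String path;

        Op(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op.method + " " + op.path);
        }
    }
}
```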
[jira] [Commented] (FLINK-7707) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192928#comment-16192928 ] ASF GitHub Bot commented on FLINK-7707: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4772#discussion_r142949904 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; +import org.apache.flink.runtime.checkpoint.SubtaskStateStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * REST handler which serves checkpoint statistics for subtasks. 
+ */ +public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHandler{ + + public TaskCheckpointStatisticDetailsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + } + + @Override + protected TaskCheckpointStatisticsWithSubtaskDetails handleCheckpointRequest( + HandlerRequest request, + AbstractCheckpointStats checkpointStats) throws RestHandlerException { + + final JobVertexID jobVertexId = request.getPathParameter(JobVertexIdPathParameter.class); + + final TaskStateStats taskStatistics = checkpointStats.getTaskStateStats(jobVertexId); + + if (taskStatistics != null) { + + final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary( + taskStatistics.getSummaryStats(), + checkpointStats.getTriggerTimestamp()); + +
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192920#comment-16192920 ] ASF GitHub Bot commented on FLINK-7068: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4358 > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should cover use cases for BLOBs that are > stored permanently for a job's lifetime (HA and non-HA). > A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., > which does not even have to be backed by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192903#comment-16192903 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142943570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java --- @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); --- End diff -- we could also add a dedicated resolve method for each parameter that a particular class defines, but at that point we're duplicating the individual parameter methods. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. 
The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
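The JobTerminationMessageParameters review comment above (public parameter fields versus one dedicated resolve method per parameter) can be illustrated with a self-contained sketch. All class and method names below are simplified analogues invented for illustration, not Flink's actual REST API:

```java
import java.util.Optional;

// Hypothetical analogue of a typed message parameter: a holder that a caller
// resolves directly. Making the parameter fields public (as in the diff)
// avoids duplicating one resolve method per parameter on the container.
abstract class MessageParameterSketch<X> {
    final String key;
    private X value;

    MessageParameterSketch(String key) { this.key = key; }

    void resolve(X value) { this.value = value; }

    Optional<X> getValue() { return Optional.ofNullable(value); }
}

class JobIdPathParameterSketch extends MessageParameterSketch<String> {
    JobIdPathParameterSketch() { super("jobid"); }
}

class TerminationModeQueryParameterSketch extends MessageParameterSketch<String> {
    TerminationModeQueryParameterSketch() { super("mode"); }
}

class JobTerminationParamsSketch {
    // public so callers can resolve each parameter directly,
    // instead of the container exposing resolveJobId(), resolveMode(), ...
    public final JobIdPathParameterSketch jobPathParameter = new JobIdPathParameterSketch();
    public final TerminationModeQueryParameterSketch terminationModeQueryParameter =
        new TerminationModeQueryParameterSketch();
}

public class MessageParametersDemo {
    public static void main(String[] args) {
        JobTerminationParamsSketch params = new JobTerminationParamsSketch();
        params.jobPathParameter.resolve("a1b2c3");
        params.terminationModeQueryParameter.resolve("cancel");
        System.out.println(params.jobPathParameter.getValue().get());
        System.out.println(params.terminationModeQueryParameter.getValue().get());
    }
}
```

The trade-off named in the comment is visible here: dedicated per-parameter resolve methods on the container would just restate the individual parameters' `resolve` methods.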
[jira] [Resolved] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paolo Rendano resolved FLINK-7549. -- Resolution: Fixed > CEP - Pattern not discovered if source streaming is very fast > - > > Key: FLINK-7549 > URL: https://issues.apache.org/jira/browse/FLINK-7549 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1, 1.3.2 >Reporter: Paolo Rendano > > Hi all, > I'm doing some stress tests on my pattern, using JMeter to populate source data > on a RabbitMQ queue. This queue contains statuses generated by different > devices. In my test case I loop for 1000 cycles, each one > sending the first and the second status that together generate the event > using Flink CEP (statuses keyed by device). I expect an output of 1000 > events. > In my early tests I noticed that I get only partial > results in output (70/80% of the expected ones). Introducing a delay in the > JMeter plan between the sending of the two statuses solved the problem. The > minimum delay that makes things work is 20/25 ms (of course this is on my local machine; on other > machines it may vary). 
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> // source definition
> DataStream dataStreamSource = env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>         conf.getSourceExchange(),
>         conf.getSourceRoutingKey(),
>         conf.getSourceQueueName(),
>         true,
>         new MyMessageWrapperSchema()))
>     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
>         private static final long serialVersionUID = -1L;
>         @Override
>         public long extractTimestamp(MyMessageWrapper element) {
>             if (element.getData().get("stateTimestamp") == null) {
>                 throw new RuntimeException("Status Timestamp is null during time ordering for device [" + element.getData().get("deviceCode") + "]");
>             }
>             return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>         }
>     })
>     .name("MyIncomingStatus");
> // PATTERN DEFINITION
> Pattern myPattern = Pattern
>     .begin("start")
>     .subtype(MyMessageWrapper.class)
>     .where(whereEquals("st", "none"))
>     .next("end")
>     .subtype(MyMessageWrapper.class)
>     .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> // CEP DEFINITION
> PatternStream myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging into and logging the messages received by Flink in "extractTimestamp", what happens is that at such a high message rate the source may receive messages with the same timestamp but different deviceCode values.
> Any idea? 
> Thanks, regards > Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192891#comment-16192891 ] Paolo Rendano commented on FLINK-7606: -- Hi [~kkl0u], 1) exactly 2) exactly, but my stream is a never-ending stream, so I cannot close it. The idea could be: why not flush the buffer automatically, not only based on memory use but also after a configurable timeout? That could be a good enhancement for different use cases. Paolo > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows continuously without freeing heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set to 5 minutes), indicating that two > incoming messages match the pattern only if they come within a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and never deallocated. > The "within" clause didn't help either: we've seen that if we send messages > that don't match the pattern, the memory grows (I suppose the state > of the NFA is updated) and is not cleaned up even after the 5-minute time > window defined in the "within" clause. > If you need, I can provide more details about the job we've implemented and > also screenshots of the memory leak.
[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192886#comment-16192886 ] Paolo Rendano commented on FLINK-7549: -- Hi [~kkl0u], yes, I solved it by setting: {code:java} env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); {code} Thanks
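The fix above (switching to event time) works because the pattern's "within" window is then driven by watermarks derived from the extracted timestamps rather than by arrival speed. The following self-contained sketch mimics the core bounded-out-of-orderness logic that `BoundedOutOfOrdernessTimestampExtractor` applies; the class and method names are illustrative analogues, not Flink's implementation:

```java
// Illustrative re-implementation of bounded-out-of-orderness watermarking:
// the watermark trails the maximum event timestamp seen so far by a fixed
// bound, so elements may arrive out of order by up to that bound before
// they count as late.
class BoundedOutOfOrdernessWatermarks {
    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    /** Record an element's extracted event timestamp. */
    void onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    /** Current watermark: the point up to which event time has progressed. */
    long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    /** An element is late if its timestamp is at or before the watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}

public class WatermarkDemo {
    public static void main(String[] args) {
        // 1-minute bound, as in the assignTimestampsAndWatermarks call above
        BoundedOutOfOrdernessWatermarks wm = new BoundedOutOfOrdernessWatermarks(60_000);
        wm.onEvent(100_000);
        wm.onEvent(160_000);
        System.out.println(wm.currentWatermark()); // 160000 - 60000 = 100000
        System.out.println(wm.isLate(99_000));     // true: below the watermark
    }
}
```

With processing time, none of this applies and two statuses arriving within a few milliseconds can be evaluated in arrival order, which matches the symptom that inserting a 20-25 ms delay "fixed" the test.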
[jira] [Commented] (FLINK-7262) remove unused FallbackLibraryCacheManager
[ https://issues.apache.org/jira/browse/FLINK-7262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192882#comment-16192882 ] ASF GitHub Bot commented on FLINK-7262: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4403 rebased onto the latest #4381 > remove unused FallbackLibraryCacheManager > - > > Key: FLINK-7262 > URL: https://issues.apache.org/jira/browse/FLINK-7262 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{FallbackLibraryCacheManager}} is basically only used in unit tests nowadays > and should probably be removed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192866#comment-16192866 ] ASF GitHub Bot commented on FLINK-7410: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4624#discussion_r142933578 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala --- @@ -183,17 +183,25 @@ trait CommonCorrelate { } private[flink] def correlateOpName( + inputType: RelDataType, rexCall: RexCall, sqlFunction: TableSqlFunction, - rowType: RelDataType) + rowType: RelDataType, + expression: (RexNode, List[String], Option[List[RexNode]]) => String) : String = { -s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}" +s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," + + s" select: ${selectToString(rowType)}" } - private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = { -val udtfName = sqlFunction.getName -val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",") + private[flink] def correlateToString( + inputType: RelDataType, + rexCall: RexCall, + sqlFunction: TableSqlFunction, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { +val inFields = inputType.getFieldNames.asScala.toList +val udtfName = sqlFunction.toString +val operands = rexCall.getOperands.asScala.map(expression(_, inFields, None)).mkString(",") --- End diff -- please add a space: `mkString(",")` -> `mkString(", ")` > Use toString method to display operator names for UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names setted in table-api are used by visualization and 
logging, it > is important to make these names simple and readable. Currently, > UserDefinedFunction's name contains the class CanonicalName and an md5 value, making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Use the {{toString}} method to display operator names for UserDefinedFunction. > The method will return the class name by default. Users can also override the > method to return whatever they want. > What do you think [~fhueske] ?
[jira] [Commented] (FLINK-7196) add a TTL to transient BLOB files
[ https://issues.apache.org/jira/browse/FLINK-7196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192860#comment-16192860 ] ASF GitHub Bot commented on FLINK-7196: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4381 rebased onto the latest #4359 > add a TTL to transient BLOB files > - > > Key: FLINK-7196 > URL: https://issues.apache.org/jira/browse/FLINK-7196 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Transient BLOB files are not automatically cleaned up unless the > {{BlobCache}}/{{BlobServer}} are shut down or the files are deleted via the > {{delete}} methods. Additionally, they should have a default time-to-live > (TTL) so that they may be cleaned up in failure cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
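The TTL idea described in FLINK-7196 can be sketched independently of Flink. The class below is a hypothetical illustration of last-access tracking with a periodic sweep (all names are made up; this is not `BlobServer`/`BlobCache` code): each transient entry records when it was last touched, and a sweep removes entries whose TTL has elapsed, so entries leaked by failure cases disappear even if no explicit delete ever runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical TTL tracker for transient entries. A deterministic clock value
// is passed in explicitly so the expiry logic is easy to test.
class TransientEntryTracker {
    private final long ttlMillis;
    private final Map<String, Long> lastAccess = new ConcurrentHashMap<>();

    TransientEntryTracker(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Record an access, refreshing the entry's time-to-live. */
    void touch(String key, long nowMillis) {
        lastAccess.put(key, nowMillis);
    }

    /** Remove all entries whose TTL has elapsed; returns the number removed. */
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Long> e : lastAccess.entrySet()) {
            if (nowMillis - e.getValue() >= ttlMillis) {
                // remove(key, value) only removes if the timestamp is unchanged,
                // so a concurrent touch() wins over an expiring sweep
                if (lastAccess.remove(e.getKey(), e.getValue())) {
                    removed++;
                }
            }
        }
        return removed;
    }

    int size() {
        return lastAccess.size();
    }
}

public class TtlSweepDemo {
    public static void main(String[] args) {
        TransientEntryTracker tracker = new TransientEntryTracker(1_000);
        tracker.touch("blob-a", 0);
        tracker.touch("blob-b", 500);
        System.out.println(tracker.sweep(1_000)); // only blob-a has expired
        System.out.println(tracker.size());
    }
}
```

In a real system the sweep would run on a background timer; here it is called explicitly so the expiry decision stays deterministic.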
[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192867#comment-16192867 ] ASF GitHub Bot commented on FLINK-7410: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4624#discussion_r142933701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala --- @@ -183,17 +183,25 @@ trait CommonCorrelate { } private[flink] def correlateOpName( + inputType: RelDataType, rexCall: RexCall, sqlFunction: TableSqlFunction, - rowType: RelDataType) + rowType: RelDataType, + expression: (RexNode, List[String], Option[List[RexNode]]) => String) : String = { -s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}" +s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," + + s" select: ${selectToString(rowType)}" } - private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = { -val udtfName = sqlFunction.getName -val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",") + private[flink] def correlateToString( + inputType: RelDataType, + rexCall: RexCall, + sqlFunction: TableSqlFunction, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { +val inFields = inputType.getFieldNames.asScala.toList +val udtfName = sqlFunction.toString +val operands = rexCall.getOperands.asScala.map(expression(_, inFields, None)).mkString(",") --- End diff -- Please add a space to the `mkString` call in `selectToString()` as well. 
Thanks
[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192865#comment-16192865 ] ASF GitHub Bot commented on FLINK-7410: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4624#discussion_r142933355 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala --- @@ -61,7 +62,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1($cor0.c, '$')"), -term("function", func1.getClass.getCanonicalName), +term("correlate", s"table(func1($$cor0.c,'$$'))"), --- End diff -- change to `term("correlate", s"table(func1($$cor0.c, '$$'))"),` for consistency
[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192864#comment-16192864 ] ASF GitHub Bot commented on FLINK-7410: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4624#discussion_r142933262 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala --- @@ -41,7 +41,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1($cor0.c)"), -term("function", func1.getClass.getCanonicalName), +term("correlate", s"table(func1($$cor0.c))"), +term("select", "a,b,c,f0"), --- End diff -- please change to `term("select", "a", "b", "c", "f0"),` (i.e., use separate strings for the field names) for consistency > Use toString method to display operator names for UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names setted in table-api are used by visualization and logging, it > is import to make these names simple and readable. Currently, > UserDefinedFunction’s name contains class CanonicalName and md5 value making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Use {{toString}} method to display operator names for UserDefinedFunction. > The method will return class name by default. Users can also override the > method to return whatever he wants. > What do you think [~fhueske] ? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
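Since the JIRA description above proposes deriving operator names from a user-overridable `toString()` (defaulting to the class name) instead of the canonical name plus an MD5 suffix, a minimal standalone sketch of that naming scheme may help. The classes `RichFunc1`/`RichFunc2` and the `operatorName` helper are illustrative stand-ins, not Flink's actual API:

```java
// Sketch of toString()-based operator naming: the default falls back to the
// simple class name; users may override toString() for a custom readable name.
public class OperatorNaming {

    // Mimics the proposed default: toString() returns the simple class name.
    public static class RichFunc1 {
        @Override
        public String toString() {
            return getClass().getSimpleName();
        }
    }

    // A user override producing a short, readable operator name.
    public static class RichFunc2 extends RichFunc1 {
        @Override
        public String toString() {
            return "myUdf";
        }
    }

    // Builds the display name for a UDF invocation, e.g. "myUdf(c)".
    public static String operatorName(Object udf, String arg) {
        return udf + "(" + arg + ")";
    }

    public static void main(String[] args) {
        System.out.println(operatorName(new RichFunc1(), "a")); // RichFunc1(a)
        System.out.println(operatorName(new RichFunc2(), "c")); // myUdf(c)
    }
}
```

With this scheme the long `org$apache$...$RichFunc1$281f...` names from the motivation section collapse to `RichFunc1(a)`.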
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192852#comment-16192852 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142933167 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.messages.RequestBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; + +/** + * Request for submitting a job. + * + * We currently require the job-jars to be uploaded through the blob-server. + */ +public final class JobSubmitRequestBody implements RequestBody { + + private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + + /** +* The serialized job graph. 
+*/ + @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) + public final byte[] serializedJobGraph; --- End diff -- I know that the HttpObjectAggregator will throw an exception, and I _think_ it will just be logged server-side. We can't really change that except by modifying the aggregator (but I have some WIP to replace it anyway). I'll add the client-side size check. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
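The client-side size check agreed on above can be sketched standalone: serialize the payload, then fail fast on the client if it exceeds the server's configured maximum content length, instead of letting the server-side HttpObjectAggregator reject the request with only a server log entry. The class name, the unchecked-exception wrapping, and the configurable limit are assumptions for illustration, not taken from the actual PR:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch of a client-side payload size check before submission.
public class SizeCheckedSubmit {

    // Serializes a payload to bytes with plain Java serialization.
    public static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Serializes and rejects payloads larger than maxLength, so the failure
    // surfaces on the client rather than only in the server log.
    public static byte[] checkedSerialize(Serializable payload, int maxLength) {
        byte[] bytes = serialize(payload);
        if (bytes.length > maxLength) {
            throw new IllegalArgumentException(
                "Serialized payload is " + bytes.length
                    + " bytes, exceeding the limit of " + maxLength);
        }
        return bytes;
    }
}
```

A caller would invoke `checkedSerialize(jobGraphLikeObject, maxContentLength)` before issuing the HTTP request, turning a silent server-side drop into an immediate client-side error.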
[jira] [Updated] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7765: -- Description: For motivation check https://issues.apache.org/jira/browse/FLINK-7765 (was: For motivation check [#7739]) > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7765 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7765) Enable dependency convergence
Piotr Nowojski created FLINK-7765: - Summary: Enable dependency convergence Key: FLINK-7765 URL: https://issues.apache.org/jira/browse/FLINK-7765 Project: Flink Issue Type: Improvement Components: Build System Reporter: Piotr Nowojski Assignee: Piotr Nowojski For motivation check [#7739] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7765: -- Description: For motivation check https://issues.apache.org/jira/browse/FLINK-7739 (was: For motivation check https://issues.apache.org/jira/browse/FLINK-7765) > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
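Dependency convergence in a Maven build is typically enabled through the Maven Enforcer plugin's `dependencyConvergence` rule; a hedged sketch of such a configuration follows (placement, version, and exclusions are assumptions, not taken from the Flink POM):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <!-- Fails the build when two dependency paths resolve
               different versions of the same artifact. -->
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```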
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192841#comment-16192841 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142930472 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java --- @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); --- End diff -- well they aren't really internal fields, at least i didn't intend them to be. The client has to resolve the parameters somehow, so we either have to add a custom resolve method to every `MessageParameters` class (which will make for an odd API when creating sub-classes), or provide access to each parameter (either directly or through a getter). I opted for the direct approach since it makes it obvious that we are in fact modifying the `MessageParameters` object. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. 
The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
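The design described above — public final parameter fields that the client resolves in place, instead of a per-subclass resolve method — can be sketched standalone as follows. All class and field names are illustrative, not the actual Flink REST classes:

```java
// Sketch of message parameters exposed as public final fields that the
// client resolves directly, then substitutes into a URL template.
public class ParameterResolution {

    // A single path parameter: a template key plus a resolved value.
    public static class PathParameter {
        public final String key;
        public String value;
        public PathParameter(String key) { this.key = key; }
        public void resolve(String value) { this.value = value; }
    }

    // Parameter container: public fields make it obvious the caller is
    // mutating this object, with no per-subclass resolve method needed.
    public static class JobTerminationParameters {
        public final PathParameter jobId = new PathParameter(":jobid");
    }

    // The client substitutes each resolved parameter into the URL template.
    public static String resolveUrl(String template, JobTerminationParameters params) {
        return template.replace(params.jobId.key, params.jobId.value);
    }

    public static void main(String[] args) {
        JobTerminationParameters params = new JobTerminationParameters();
        params.jobId.resolve("abc123");
        System.out.println(resolveUrl("/jobs/:jobid/terminate", params));
    }
}
```

The alternative rejected in the comment — a custom resolve method on every parameters class — would hide which fields are mutated; direct field access keeps the resolution explicit at the call site.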
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192837#comment-16192837 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142929069 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + 
+import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest extends TestLogger { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { --- End diff -- Renamed to testJobSubmitCancelStop. It verifies that the client properly sends out requests to the corresponding handlers. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task >
[jira] [Commented] (FLINK-7754) Complete termination future after actor has been stopped.
[ https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192828#comment-16192828 ] ASF GitHub Bot commented on FLINK-7754: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4770 > Complete termination future after actor has been stopped. > - > > Key: FLINK-7754 > URL: https://issues.apache.org/jira/browse/FLINK-7754 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > At the moment, we complete the termination future when the {{postStop}} > method of the {{RpcActor}} has been executed. This, however, does not mean > that the underlying actor has been stopped. We should rather complete the > future in the {{AkkaRpcService#stopServer}} method where we close the actor > with a graceful shutdown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
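The fix described in the issue — completing the termination future only after the actor has actually been stopped (in `stopServer`), rather than when its `postStop` hook runs — can be illustrated with a plain `CompletableFuture`. The actor below is a simplified stand-in, not Akka:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of completing a termination future only after the actor is stopped.
public class TerminationFutureDemo {

    public static class RpcActor {
        public boolean stopped = false;
        // Cleanup runs here, but the actor itself is not yet gone —
        // completing the termination future at this point would be premature.
        void postStop() { /* release resources */ }
        void stop() {
            postStop();
            stopped = true; // only now is the actor really stopped
        }
    }

    public static class RpcService {
        public final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // Mirrors the described stopServer: graceful shutdown first,
        // then complete the termination future.
        public void stopServer(RpcActor actor) {
            actor.stop();
            terminationFuture.complete(null);
        }
    }

    public static void main(String[] args) {
        RpcService service = new RpcService();
        RpcActor actor = new RpcActor();
        service.stopServer(actor);
        System.out.println(service.terminationFuture.isDone() && actor.stopped);
    }
}
```

Callers waiting on `terminationFuture` are thus guaranteed the actor is fully stopped, not merely that its cleanup hook has run.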
[jira] [Comment Edited] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192823#comment-16192823 ] Kostas Kloudas edited comment on FLINK-7549 at 10/5/17 12:53 PM: - Hi [~i...@paolorendano.it], Is this issue still valid, or is it resolved by setting the {{timeCharacteristic}} to event time? was (Author: kkl0u): Hi [~i...@paolorendano.it], Is this issue still valid, or is it resolved by setting the [[timeCharacteristic]] to event time? > CEP - Pattern not discovered if source streaming is very fast > - > > Key: FLINK-7549 > URL: https://issues.apache.org/jira/browse/FLINK-7549 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1, 1.3.2 >Reporter: Paolo Rendano > > Hi all, > I'm doing some stress test on my pattern using JMeter to populate source data > on a rabbitmq queue. This queue contains status generated by different > devices . In my test case I set to loop on a base of 1000 cycles, each one > sending respectively the first and the second status that generate the event > using flink CEP (status keyed by device). I expect to get an output of 1000 > events. > In my early tests I launched that but I noticed that I get only partial > results in output (70/80% of the expected ones). Introducing a delay in > jmeter plan between the sending of the two status solved the problem. The > minimum delay (of course this is on my local machine, on other machines may > vary) that make things work is 20/25 ms. 
> My code is structured this way (the following is a semplification): > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setAutoWatermarkInterval(100L); > // source definition > DataStream dataStreamSource = > env.addSource(new > MYRMQAutoboundQueueSource<>(connectionConfig, > conf.getSourceExchange(), > conf.getSourceRoutingKey(), > conf.getSourceQueueName(), > true, > new MyMessageWrapperSchema())) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) { > private static final long serialVersionUID = > -1L; > @Override > public long extractTimestamp(MyMessageWrapper > element) { > if > (element.getData().get("stateTimestamp")==null) { > throw new RuntimeException("Status > Timestamp is null during time ordering for device [" + > element.getData().get("deviceCode") + "]"); > } > return > FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime(); > } > }) > .name("MyIncomingStatus"); > // PATTERN DEFINITION > PatternmyPattern = Pattern > .begin("start") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st", "none")) > .next("end") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st","started")) > .within(Time.minutes(3)); > // CEP DEFINITION > PatternStream myPatternStream = > CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern); > DataStream > outputStream = > myPatternStream.flatSelect(patternFlatTimeoutFunction, > patternFlatSelectFunction); > // SINK DEFINITION > outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, > outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent"); > {code} > digging and logging messages received by flink in "extractTimestamp", what > happens is that with that so high rate of messages, source may receive > messages with the same timestamp but with different deviceCode. > Any idea? 
> Thanks, regards > Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7754) Complete termination future after actor has been stopped.
[ https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7754. Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 4947ee669fe267e3f71853cd14228b350cf10ac8 > Complete termination future after actor has been stopped. > - > > Key: FLINK-7754 > URL: https://issues.apache.org/jira/browse/FLINK-7754 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > At the moment, we complete the termination future when the {{postStop}} > method of the {{RpcActor}} has been executed. This, however, does not mean > that the underlying actor has been stopped. We should rather complete the > future in the {{AkkaRpcService#stopServer}} method where we close the actor > with a graceful shutdown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7754) Complete termination future after actor has been stopped.
[ https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192824#comment-16192824 ] ASF GitHub Bot commented on FLINK-7754: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4770 Merging this PR since Travis passed. > Complete termination future after actor has been stopped. > - > > Key: FLINK-7754 > URL: https://issues.apache.org/jira/browse/FLINK-7754 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > At the moment, we complete the termination future when the {{postStop}} > method of the {{RpcActor}} has been executed. This, however, does not mean > that the underlying actor has been stopped. We should rather complete the > future in the {{AkkaRpcService#stopServer}} method where we close the actor > with a graceful shutdown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast
[ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192823#comment-16192823 ] Kostas Kloudas commented on FLINK-7549: --- Hi [~i...@paolorendano.it], Is this issue still valid, or is it resolved by setting the [[timeCharacteristic]] to event time? > CEP - Pattern not discovered if source streaming is very fast > - > > Key: FLINK-7549 > URL: https://issues.apache.org/jira/browse/FLINK-7549 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1, 1.3.2 >Reporter: Paolo Rendano > > Hi all, > I'm doing some stress test on my pattern using JMeter to populate source data > on a rabbitmq queue. This queue contains status generated by different > devices . In my test case I set to loop on a base of 1000 cycles, each one > sending respectively the first and the second status that generate the event > using flink CEP (status keyed by device). I expect to get an output of 1000 > events. > In my early tests I launched that but I noticed that I get only partial > results in output (70/80% of the expected ones). Introducing a delay in > jmeter plan between the sending of the two status solved the problem. The > minimum delay (of course this is on my local machine, on other machines may > vary) that make things work is 20/25 ms. 
> My code is structured this way (the following is a semplification): > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setAutoWatermarkInterval(100L); > // source definition > DataStream dataStreamSource = > env.addSource(new > MYRMQAutoboundQueueSource<>(connectionConfig, > conf.getSourceExchange(), > conf.getSourceRoutingKey(), > conf.getSourceQueueName(), > true, > new MyMessageWrapperSchema())) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) { > private static final long serialVersionUID = > -1L; > @Override > public long extractTimestamp(MyMessageWrapper > element) { > if > (element.getData().get("stateTimestamp")==null) { > throw new RuntimeException("Status > Timestamp is null during time ordering for device [" + > element.getData().get("deviceCode") + "]"); > } > return > FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime(); > } > }) > .name("MyIncomingStatus"); > // PATTERN DEFINITION > PatternmyPattern = Pattern > .begin("start") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st", "none")) > .next("end") > .subtype(MyMessageWrapper.class) > .where(whereEquals("st","started")) > .within(Time.minutes(3)); > // CEP DEFINITION > PatternStream myPatternStream = > CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern); > DataStream > outputStream = > myPatternStream.flatSelect(patternFlatTimeoutFunction, > patternFlatSelectFunction); > // SINK DEFINITION > outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, > outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent"); > {code} > digging and logging messages received by flink in "extractTimestamp", what > happens is that with that so high rate of messages, source may receive > messages with the same timestamp but with different deviceCode. > Any idea? 
> Thanks, regards > Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
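Kostas's question above hinges on the stream time characteristic. A minimal sketch of the event-time setup he refers to (Flink 1.3-era API; a sketch only, not taken from the reporter's job):

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without this, Flink 1.3 defaults to processing time, so CEP orders
        // elements by arrival; at high rates, statuses arriving within the
        // same millisecond can be matched out of order and patterns missed.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Controls how often periodic extractors such as
        // BoundedOutOfOrdernessTimestampExtractor emit watermarks.
        env.getConfig().setAutoWatermarkInterval(100L);
    }
}
```

With event time enabled, elements carrying the same timestamp but different keys are ordered by watermark progression rather than arrival order.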
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192819#comment-16192819 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4358 Thanks a lot for your work and patience with me @NicoK. Changes look good to me. I've rebased it onto the latest master and once Travis gives the green light, I'll merge it :-) > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should cover use cases for BLOBs that are > permanently stored for a job's lifetime (HA and non-HA). > A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., > which does not even have to be backed by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192818#comment-16192818 ] Kostas Kloudas commented on FLINK-7606: --- Hi [~i...@paolorendano.it], Sorry for the late reply. So, if I understand correctly, in a nutshell: 1) for event time, there is no memory leak problem (there is also a pending PR that likely fixes the problem for processing time as well, by unifying the code paths for both notions of time), but 2) at the end of your input, the watermark does not advance, and the last batch of events is not processed because it is waiting for the watermark +10 sec to trigger the computation, right? In this case, if you know that your stream is finite, then you can close your source (call close() on your source) and this will emit a Long.MAX_VALUE watermark that will flush the buffered elements. Kostas > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, > Schermata 2017-09-27 alle 00.35.53.png > > > The NestedMapsStateTable grows continuously without freeing heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set to 5 minutes), indicating that two > incoming messages match the pattern only if they arrive within a certain time > window. > What we've seen is that for every key present in the messages, an NFA object > is instantiated in the NestedMapsStateTable and is never deallocated. 
> Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows (I suppose the state > of the NFA is updated), but it is not cleaned even after the 5-minute time > window defined in the "within" clause has passed. > If you need, I can provide more details about the job we've implemented and > also screenshots of the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
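Kostas's suggestion above — letting a finite stream end so that Flink emits a final Long.MAX_VALUE watermark — can be sketched with a hypothetical finite source (names are illustrative; whether this applies depends on the job being genuinely finite):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A finite source: when run() returns, Flink closes the source and sends a
// Long.MAX_VALUE watermark downstream, which flushes any elements the CEP
// operator is still buffering while waiting for event time to advance.
public class FiniteStatusSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) {
        for (int i = 0; i < 1000 && running; i++) {
            ctx.collectWithTimestamp("status-" + i, System.currentTimeMillis());
        }
        // Returning here ends the stream and triggers the final watermark.
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

For unbounded streams this does not help; there the watermark must keep advancing (or idle sources must be handled) for buffered CEP state to be evaluated and cleaned.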
[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4358 Thanks a lot for your work and patience with me @NicoK. Changes look good to me. I've rebased it onto the latest master and once Travis gives green light, I'll merge it :-) ---
[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192805#comment-16192805 ] ASF GitHub Bot commented on FLINK-5005: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3703 @DieBauer Do you still want to work on this? I also started trying to make Flink ready for 2.12 before I noticed this older branch. I'd be very happy to stop, though, if you're interested in bringing this to an end. It should be easier now that we dropped Java 8 support and also agreed to drop Scala 2.10 support. > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts >Assignee: Aljoscha Krettek > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on Maven Central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3703 @DieBauer Do you still want to work on this? I also started trying to make Flink ready for 2.12 before I noticed this older branch. I'd be very happy to stop, though, if you're interested in bringing this to an end. It should be easier now that we dropped Java 8 support and also agreed to drop Scala 2.10 support. ---
[jira] [Created] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism
Fabian Hueske created FLINK-7764: Summary: FlinkKafkaProducer010 does not accept name, uid, or parallelism Key: FLINK-7764 URL: https://issues.apache.org/jira/browse/FLINK-7764 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.3.2, 1.4.0 Reporter: Fabian Hueske As [reported on the user list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]: When I try to use KafkaProducer with timestamps it fails to set name, uid or parallelism. It uses default values. {code} FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer = FlinkKafkaProducer010 .writeToKafkaWithTimestamps(stream, topicName, schema, props, partitioner); producer.setFlushOnCheckpoint(flushOnCheckpoint); producer.name("foo") .uid("bar") .setParallelism(5); return producer; {code} As operator name it shows "FlinKafkaProducer 0.10.x” with the typo. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
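The report above can be worked around when Kafka 0.10 record timestamps are not required: attach the producer as a regular sink instead of going through writeToKafkaWithTimestamps. This is a sketch only — the topic, schema, and properties stand in for the (elided) values from the snippet above:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class KafkaSinkWorkaround {

    // Attaching the producer via addSink() returns a regular DataStreamSink,
    // on which name(), uid() and setParallelism() take effect as usual.
    // Note: on this path, record timestamps are not written to Kafka.
    static <T> DataStreamSink<T> attach(DataStream<T> stream, FlinkKafkaProducer010<T> producer) {
        return stream.addSink(producer)
            .name("foo")
            .uid("bar")
            .setParallelism(5);
    }
}
```

The bug itself is that FlinkKafkaProducer010Configuration (the object returned by writeToKafkaWithTimestamps) silently ignores these setters, so the operator keeps its default name and parallelism.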
[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink
[ https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192793#comment-16192793 ] ASF GitHub Bot commented on FLINK-7689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4725#discussion_r142918146 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -41,6 +46,11 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; + static final String FLUSH_SCOPE = "flush"; + static final String FLUSH_RATE_METER_NAME = "rate"; + static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval"; --- End diff -- rename to `batchLimitReachedRate`? > Instrument the Flink JDBC sink > -- > > Key: FLINK-7689 > URL: https://issues.apache.org/jira/browse/FLINK-7689 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.4.0 >Reporter: Martin Eden >Priority: Minor > Labels: jdbc, metrics > Fix For: 1.4.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > As confirmed by the Flink community in the following mailing list > [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html] > using off the shelf Flink sinks like the JDBC sink, Redis sink or Cassandra > sink etc does not expose any sink specific metrics. > The purpose of this ticket is to add some relevant metrics to the > JDBCOutputFormat: > - Meters for when a flush is made. > - Histograms for the jdbc batch count and batch execution latency. > These would allow deeper understanding of the runtime behaviour of > performance critical jobs writing to external databases using this generic > interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink
[ https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192792#comment-16192792 ] ASF GitHub Bot commented on FLINK-7689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4725#discussion_r142919779 --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java --- @@ -233,6 +255,30 @@ public void testFlush() throws SQLException, IOException { } } + @Test + public void testMetricsSetup() throws IOException { --- End diff -- Can you extend this test to check that the metrics are correctly set (except for the `durationMs` histogram)? > Instrument the Flink JDBC sink > -- > > Key: FLINK-7689 > URL: https://issues.apache.org/jira/browse/FLINK-7689 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.4.0 >Reporter: Martin Eden >Priority: Minor > Labels: jdbc, metrics > Fix For: 1.4.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > As confirmed by the Flink community in the following mailing list > [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html] > using off the shelf Flink sinks like the JDBC sink, Redis sink or Cassandra > sink etc does not expose any sink specific metrics. > The purpose of this ticket is to add some relevant metrics to the > JDBCOutputFormat: > - Meters for when a flush is made. > - Histograms for the jdbc batch count and batch execution latency. > These would allow deeper understanding of the runtime behaviour of > performance critical jobs writing to external databases using this generic > interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4725#discussion_r142918146 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -41,6 +46,11 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; + static final String FLUSH_SCOPE = "flush"; + static final String FLUSH_RATE_METER_NAME = "rate"; + static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval"; --- End diff -- rename to `batchLimitReachedRate`? ---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4725#discussion_r142919779 --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java --- @@ -233,6 +255,30 @@ public void testFlush() throws SQLException, IOException { } } + @Test + public void testMetricsSetup() throws IOException { --- End diff -- Can you extend this test to check that the metrics are correctly set (except for the `durationMs` histogram)? ---
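For context on the metrics under review, this is a rough sketch of how meters of this kind are typically registered on a Flink MetricGroup. The scope and metric names mirror the constants in the diff (using the reviewer's suggested `batchLimitReachedRate` rename); the actual wiring in the PR may differ:

```java
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

public class FlushMetricsSketch {

    private transient Meter flushRate;
    private transient Meter batchLimitReachedRate;

    // Called from RichOutputFormat#open(...), where the RuntimeContext
    // (and thus getMetricGroup()) is available.
    void registerMetrics(MetricGroup parent) {
        MetricGroup flushGroup = parent.addGroup("flush");        // FLUSH_SCOPE
        // MeterView computes a per-second rate over a sliding window (60s here).
        flushRate = flushGroup.meter("rate", new MeterView(60));
        batchLimitReachedRate = flushGroup.meter("batchLimitReachedRate", new MeterView(60));
    }

    void onFlush(boolean batchLimitReached) {
        flushRate.markEvent();
        if (batchLimitReached) {
            batchLimitReachedRate.markEvent();
        }
    }
}
```

Registering through the operator's MetricGroup is what makes the meters visible to whatever metrics reporter the job is configured with, which is the gap the ticket describes for off-the-shelf sinks.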
[GitHub] flink issue #4774: [FLINK-6495] Fix Akka's default value for heartbeat pause
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4774 I have added runtime check for that. To be clear, this was not the reason for Kafka tests instabilities and I'm not aware if this was causing any issues. But it definitely could and should be fixed anyway (IMO that should be a release blocker) ---
[jira] [Commented] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192786#comment-16192786 ] ASF GitHub Bot commented on FLINK-6495: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4774 I have added runtime check for that. To be clear, this was not the reason for Kafka tests instabilities and I'm not aware if this was causing any issues. But it definitely could and should be fixed anyway (IMO that should be a release blocker) > Migrate Akka configuration options > -- > > Key: FLINK-6495 > URL: https://issues.apache.org/jira/browse/FLINK-6495 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Reporter: Chesnay Schepler >Assignee: Fang Yong > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7729) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-7729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7729. --- Resolution: Duplicate > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-7729 > URL: https://issues.apache.org/jira/browse/FLINK-7729 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > As per the discussion on the ML: > - > https://lists.apache.org/thread.html/d7b03b4697f91efc162d5febb772d19d324d3d6daa7e38f6fa811e30@%3Cdev.flink.apache.org%3E > - > https://lists.apache.org/thread.html/67d7ea9964190b1e1c472a53903c55a3c5cf070bde82a27997226b8c@%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-5005: --- Assignee: Aljoscha Krettek > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts >Assignee: Aljoscha Krettek > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5005: Summary: Remove Scala 2.10 support; add Scala 2.12 support (was: Publish Scala 2.12 artifacts) > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192776#comment-16192776 ] Aljoscha Krettek commented on FLINK-5005: - The ML discussions on this concluded on removing 2.10 support and adding 2.12 support: - https://lists.apache.org/thread.html/d7b03b4697f91efc162d5febb772d19d324d3d6daa7e38f6fa811e30@%3Cdev.flink.apache.org%3E - https://lists.apache.org/thread.html/67d7ea9964190b1e1c472a53903c55a3c5cf070bde82a27997226b8c@%3Cdev.flink.apache.org%3E > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6045) FLINK_CONF_DIR has to be set even though specifying --configDir
[ https://issues.apache.org/jira/browse/FLINK-6045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192773#comment-16192773 ] Aljoscha Krettek commented on FLINK-6045: - We should probably remove use of {{FLINK_CONF_DIR}} anywhere in the actual code and only use it in scripts that use it to set {{--configDir}}. This also entails removing {{GlobalConfiguration.loadConfiguration()}} (the call without arguments). > FLINK_CONF_DIR has to be set even though specifying --configDir > --- > > Key: FLINK-6045 > URL: https://issues.apache.org/jira/browse/FLINK-6045 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Till Rohrmann >Priority: Minor > > A user reported that {{FLINK_CONF_DIR}} has to be set in addition to > specifying --configDir. Otherwise the {{JobManager}} and the {{TaskManagers}} > fail silently trying to read from {{fs.hdfs.hadoopconf}}. Specifying one of > the two configuration options should be enough to successfully run Flink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
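A sketch of the direction Aljoscha describes — resolving the configuration from an explicit `--configDir` argument rather than the environment variable (assuming the `GlobalConfiguration.loadConfiguration(String)` overload):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class ConfigLoading {

    // Reads flink-conf.yaml from the explicitly given directory.
    // The no-argument loadConfiguration() variant, which falls back to the
    // FLINK_CONF_DIR environment variable, is the call the comment proposes
    // to remove from the actual code paths.
    static Configuration load(String configDir) {
        return GlobalConfiguration.loadConfiguration(configDir);
    }
}
```

With this shape, only the startup scripts would translate FLINK_CONF_DIR into `--configDir`, so specifying either one is sufficient and the silent fallback the ticket describes disappears.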
[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4763 Alright @zentol. I guess it would work if I signed up for reviewable. ---
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192763#comment-16192763 ] ASF GitHub Bot commented on FLINK-7709: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4763 Alright @zentol. I guess it would work if I signed up for reviewable. > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4763 @tillrohrmann I wanted to try it out, primarily since i can mark individual files as reviewed. For the remaining files I will once again write the comments on github. ---
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192755#comment-16192755 ] ASF GitHub Bot commented on FLINK-7709: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4763 @tillrohrmann I wanted to try it out, primarily since i can mark individual files as reviewed. For the remaining files I will once again write the comments on github. > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192742#comment-16192742 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142897388 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private final RestClusterClientConfiguration restClusterClientConfiguration; + private final RestClient restClient; + private final ExecutorService executorService = Executors.newFixedThreadPool(4); + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.restClusterClientConfiguration = configuration; + this.restClient = new RestClient(configuration.getRestEndpointConfiguration(), executorService); + } + + @Override + public void shutdown() { + try { + // we only call this for legacy reasons to shutdown components that are started in the ClusterClient constructor + super.shutdown(); + } catch (Exception e) { + log.error("An error occurred during the client shutdown.", e); + } + this.restClient.shutdown(Time.seconds(5)); + this.executorService.shutdown(); --- End diff -- Better to use `Executors.gracefulShutdown` here. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink >
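The graceful shutdown Till asks for can be sketched with plain JDK executors; the `Executors.gracefulShutdown` helper he mentions wraps essentially this pattern (this standalone version is only an illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {

    // Stop accepting new tasks, wait up to the given timeout for in-flight
    // work to finish, then force-terminate anything still running.
    public static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // no new tasks are accepted from here on
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // interrupt stragglers
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        pool.submit(() -> { });
        System.out.println(shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```

Compared with the bare `executorService.shutdown()` in the diff, this avoids leaking threads when a task hangs, at the cost of a bounded wait.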
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192739#comment-16192739 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142899909 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + 
+import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest extends TestLogger { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { --- End diff -- What does test `ABC` test? > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192741#comment-16192741 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142907629 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java --- @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); --- End diff -- Hmm what about giving these values to the `JobTerminationMessageParameters` constructor? It feels not entirely right that we access internal fields to resolve them? > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. 
The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192740#comment-16192740 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142900393 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + 
+import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest extends TestLogger { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192743#comment-16192743 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142897184 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -610,25 +633,15 @@ public void cancel(JobID jobId) throws Exception { * failed. That might be due to an I/O problem, ie, the job-manager is unreachable. */ public void stop(final JobID jobId) throws Exception { - final ActorGateway jobManagerGateway = getJobManagerGateway(); + final ActorGateway jobManager = getJobManagerGateway(); - final Future response; - try { - response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout); - } catch (final Exception e) { - throw new ProgramInvocationException("Failed to query the job manager gateway.", e); - } + Future response = jobManager.ask(new JobManagerMessages.StopJob(jobId), timeout); - final Object result = Await.result(response, timeout); + final Object rc = Await.result(response, timeout); - if (result instanceof JobManagerMessages.StoppingSuccess) { - log.info("Job stopping with ID " + jobId + " succeeded."); - } else if (result instanceof JobManagerMessages.StoppingFailure) { - final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause(); - log.info("Job stopping with ID " + jobId + " failed.", t); - throw new Exception("Failed to stop the job because of \n" + t.getMessage()); - } else { - throw new Exception("Unknown message received while stopping: " + result.getClass().getName()); + if (rc instanceof JobManagerMessages.StoppingFailure) { + throw new Exception("Stopping the job with ID " + jobId + " failed.", + ((JobManagerMessages.StoppingFailure) rc).cause()); --- End diff -- The unknown response type exception was lost
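The dropped branch the review points at matters because an unrecognized response type would otherwise be treated as success. A minimal sketch of response handling that keeps all three branches is below; the message classes are simplified stand-ins, and only the `StoppingSuccess`/`StoppingFailure` names come from the diff:

```java
// Simplified stand-ins for the JobManager response messages in the diff.
class StoppingSuccess { }
class StoppingFailure {
    private final Throwable cause;
    StoppingFailure(Throwable cause) { this.cause = cause; }
    Throwable cause() { return cause; }
}

public class StopResponseHandling {
    // Inspect the response and, crucially, keep the final else branch:
    // an unexpected message type must surface as an error rather than
    // fall through as an implicit success.
    static void handleStopResponse(Object rc, String jobId) throws Exception {
        if (rc instanceof StoppingSuccess) {
            // stop succeeded, nothing to do
        } else if (rc instanceof StoppingFailure) {
            throw new Exception("Stopping the job with ID " + jobId + " failed.",
                    ((StoppingFailure) rc).cause());
        } else {
            throw new Exception(
                    "Unknown message received while stopping: " + rc.getClass().getName());
        }
    }

    public static void main(String[] args) {
        try {
            handleStopResponse("not a stop message", "job-42");
        } catch (Exception e) {
            System.out.println(e.getMessage());
            // prints "Unknown message received while stopping: java.lang.String"
        }
    }
}
```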
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142899120 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private final RestClusterClientConfiguration restClusterClientConfiguration; + private final RestClient restClient; + private final ExecutorService executorService = Executors.newFixedThreadPool(4); --- End diff -- Can we give these threads a proper name? Something like "RestClusterClientIOThread". ---
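A common way to satisfy the thread-naming request is to pass a custom `ThreadFactory` to `Executors.newFixedThreadPool`. The sketch below is a generic illustration, not the PR's eventual fix; only the name "RestClusterClientIOThread" is taken from the review comment:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadPool {
    // A ThreadFactory that gives every pool thread a stable, numbered name,
    // so thread dumps show "RestClusterClientIOThread-1" instead of the
    // default "pool-1-thread-1".
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true); // don't keep the JVM alive for client I/O threads
            return t;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(4, namedFactory("RestClusterClientIOThread"));
        // The first submitted task runs on the first created thread.
        pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        pool.shutdown();
        // prints "RestClusterClientIOThread-1"
    }
}
```

Making the threads daemon threads is a judgment call; for a client-side executor it avoids blocking JVM shutdown if the pool is never explicitly closed.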