[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194056#comment-16194056
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user PangZhi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r143098834
  
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
}

@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+			}
+		}, injectTableName(INSERT_DATA_QUERY), new Properties());
+		CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+
+		newCassandrTableSink.emitDataStream(source);
+
+		env.execute();
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
+	public void testCassandraTableSinkE2E() throws Exception {
--- End diff --

this was added because another reviewer thought it would be better to add an 
e2e test using the SQL API.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.
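
For context, a Row-based sink can bind each field of the Row positionally into a prepared statement, analogous to what CassandraTupleSink does for Tuple fields. A minimal sketch, modeled on the CassandraSinkBase/CassandraTupleSink pattern; the class name CassandraRowSink and its wiring are illustrative assumptions, not the final API:

{code:java}
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

// Hedged sketch of a Row-emitting Cassandra sink, modeled on CassandraTupleSink.
public class CassandraRowSink extends CassandraSinkBase<Row, ResultSet> {

	private final String insertQuery;
	private transient PreparedStatement preparedStatement;

	public CassandraRowSink(String insertQuery, ClusterBuilder builder) {
		super(builder);
		this.insertQuery = insertQuery;
	}

	@Override
	public void open(Configuration configuration) {
		super.open(configuration);
		this.preparedStatement = session.prepare(insertQuery);
	}

	@Override
	public ListenableFuture<ResultSet> send(Row value) {
		// Copy the Row's fields positionally into the bound statement,
		// exactly as CassandraTupleSink does for Tuple fields.
		Object[] fields = new Object[value.getArity()];
		for (int i = 0; i < value.getArity(); i++) {
			fields[i] = value.getField(i);
		}
		return session.executeAsync(preparedStatement.bind(fields));
	}
}
{code}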



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193795#comment-16193795
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/4729

[FLINK-7076] [ResourceManager] implement YARN stopWorker logic

## What is the purpose of the change

*Implement stopWorker logic for YarnResourceManager*


## Brief change log

  - *Added a ConcurrentHashMap to keep the ResourceID to Yarn ContainerId 
mappings*
  - *Implement the stopWorker logic for YARN*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink implement-stopWorker-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4729






> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.
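
A minimal sketch of the approach described in the pull request above: keep a ResourceID-to-container mapping and release the matching YARN container in stopWorker. Field and callback names here are assumptions based on the PR description, not the merged code:

{code:java}
// Track which YARN container backs which Flink worker.
private final ConcurrentHashMap<ResourceID, Container> workerNodeMap = new ConcurrentHashMap<>();

// When YARN grants a container, remember the mapping (callback name is illustrative).
private void onContainerAllocated(Container container) {
	workerNodeMap.put(new ResourceID(container.getId().toString()), container);
}

// Release the YARN container backing the given worker.
public boolean stopWorker(ResourceID resourceId) {
	Container container = workerNodeMap.remove(resourceId);
	if (container != null) {
		// Ask YARN to free the container via the AM-RM client.
		resourceManagerClient.releaseAssignedContainer(container.getId());
		return true;
	}
	return false;
}
{code}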



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-2961) Add support for basic type Date in Table API

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193603#comment-16193603
 ] 

ASF GitHub Bot commented on FLINK-2961:
---

Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/1322
  

[![Coverage 
Status](https://coveralls.io/builds/13591448/badge)](https://coveralls.io/builds/13591448)

Changes Unknown when pulling **2544011784017fc12234fe38ebf6b3c58b84 on 
twalthr:TableApiDate** into ** on apache:master**.



> Add support for basic type Date in Table API
> 
>
> Key: FLINK-2961
> URL: https://issues.apache.org/jira/browse/FLINK-2961
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
> Fix For: 1.0.0
>
>
> Currently, the basic type {{Date}} is not implemented in the Table API. In 
> order to have a mapping of the most important ANSI SQL types for FLINK-2099, 
> it makes sense to add support for {{Date}} to represent date, time and 
> timestamps of milliseconds precision.
> Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A 
> `LONG` casted to `DATE` must be a milliseconds timestamp. A `STRING` casted 
> to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", 
> "`HH:mm:ss`", or a milliseconds timestamp. All timestamps refer to the UTC 
> timezone beginning from January 1, 1970, 00:00:00 in milliseconds.
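
The UTC cast semantics described above can be illustrated with plain Java; this is only a sketch of the intended behavior, not the Table API implementation:

{code:java}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DateCastSemantics {
	public static void main(String[] args) throws Exception {
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
		format.setTimeZone(TimeZone.getTimeZone("UTC"));

		// A STRING in the supported format parses to a UTC instant...
		Date fromString = format.parse("1970-01-02 00:00:00.000");

		// ...which equals the same instant expressed as a milliseconds LONG.
		Date fromLong = new Date(24L * 60 * 60 * 1000); // one day in milliseconds
		System.out.println(fromString.equals(fromLong)); // true
	}
}
{code}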



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193570#comment-16193570
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/4729


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-2973) Add flink-benchmark with compliant licenses again

2017-10-05 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193521#comment-16193521
 ] 

Greg Hogan commented on FLINK-2973:
---

[~fhueske] [~rmetzger] can we not include this as an optional (or unlisted) 
module in the same manner as {{flink-connector-kinesis}}? Both are restricted 
by dependence on a Category X license (GPL and ASL, respectively).

> Add flink-benchmark with compliant licenses again
> -
>
> Key: FLINK-2973
> URL: https://issues.apache.org/jira/browse/FLINK-2973
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.0.0
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Minor
> Fix For: 1.0.0
>
>
> We recently created the Maven module {{flink-benchmark}} for micro-benchmarks 
> and ported most of the existing micro-benchmarks to the Java benchmarking 
> framework JMH. However, JMH is part of OpenJDK and under GPL license which is 
> not compatible with the AL2.
> Consequently, we need to remove this dependency and either revert the porting 
> commits or port the benchmarks to another benchmarking framework. An 
> alternative could be [Google's Caliper|https://github.com/google/caliper] 
> library which is under AL2.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4760: [hotfix][docs] Polish docs index page for consistency in ...

2017-10-05 Thread ChrisChinchilla
Github user ChrisChinchilla commented on the issue:

https://github.com/apache/flink/pull/4760
  
@alpinegizmo No worries, done!


---


[jira] [Created] (FLINK-7768) Load File Systems via Java Service abstraction

2017-10-05 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7768:
---

 Summary: Load File Systems via Java Service abstraction
 Key: FLINK-7768
 URL: https://issues.apache.org/jira/browse/FLINK-7768
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Stephan Ewen
 Fix For: 1.4.0
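
The issue has no description yet; the title refers to the standard java.util.ServiceLoader mechanism. A generic sketch of how file system factories could be discovered that way; the FileSystemFactory wiring shown is an assumption, not the implemented code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

import org.apache.flink.core.fs.FileSystemFactory;

// Hedged sketch: each provider jar lists its factory in
// META-INF/services/org.apache.flink.core.fs.FileSystemFactory,
// and the core discovers all of them once at startup.
static Map<String, FileSystemFactory> discoverFactories() {
	Map<String, FileSystemFactory> factoriesByScheme = new HashMap<>();
	for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
		factoriesByScheme.put(factory.getScheme(), factory);
	}
	return factoriesByScheme;
}
{code}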






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7767) Avoid loading Hadoop conf dynamically at runtime

2017-10-05 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7767:
---

 Summary: Avoid loading Hadoop conf dynamically at runtime
 Key: FLINK-7767
 URL: https://issues.apache.org/jira/browse/FLINK-7767
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


The bucketing sink dynamically loads the Hadoop configuration in various places.

The result of that configuration is not always predictable, as it tries to 
automagically discover the Hadoop config files.

A better approach is to rely on the Flink configuration to find the Hadoop 
configuration, or to directly use the Hadoop configuration used by the Hadoop 
file systems.
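
A sketch of that direction: resolve the Hadoop configuration once, from a directory the Flink configuration points at, and cache it. The ConfigConstants.PATH_HADOOP_CONFIG key exists in Flink; the surrounding code is illustrative:

{code:java}
// Build the Hadoop Configuration once at startup instead of re-discovering
// it on every checkpoint; "flinkConfig" is an assumed Flink Configuration.
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
String hadoopConfDir = flinkConfig.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
if (hadoopConfDir != null) {
	hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDir, "core-site.xml"));
	hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDir, "hdfs-site.xml"));
}
// Cache hadoopConf and hand it to all file system instantiations from here on.
{code}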



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7766) Remove obsolete reflection for hflush on HDFS

2017-10-05 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7766:
---

 Summary: Remove obsolete reflection for hflush on HDFS
 Key: FLINK-7766
 URL: https://issues.apache.org/jira/browse/FLINK-7766
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


This code originally existed for compatibility with Hadoop 1.

Since Hadoop 1 support is dropped, this is no longer necessary.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7763) TableSinkITCase fails with "object reuse" enabled

2017-10-05 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193234#comment-16193234
 ] 

Stephan Ewen commented on FLINK-7763:
-

It would be great to change that.

We are very much looking to avoid additional copies when passing records between 
chained functions, because this costs performance unnecessarily. But all 
operators need to be written so that they do not mutate incoming objects to 
make that possible.

> TableSinkITCase fails with "object reuse" enabled
> -
>
> Key: FLINK-7763
> URL: https://issues.apache.org/jira/browse/FLINK-7763
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Set {{objectReuse}} to {{true}} in {{ExecutionConfig}} to reproduce the 
> failure.
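
For reference, enabling it programmatically (ExecutionConfig#enableObjectReuse is the existing API):

{code:java}
// Reproduce: run TableSinkITCase with object reuse enabled.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // equivalent to setting objectReuse to true
{code}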



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-05 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193229#comment-16193229
 ] 

Stephan Ewen commented on FLINK-5372:
-

Could it be related to FLINK-7757 ?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-05 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193225#comment-16193225
 ] 

Stephan Ewen commented on FLINK-7608:
-

One other aspect to consider is the cost of histograms.

The latency sampling rate cannot be too high when using complex histograms; 
otherwise it interferes with the execution.
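
For example, the sampling interval can be kept coarse via the ExecutionConfig; the 2000 ms value below is only an illustration:

{code:java}
// Keep latency markers infrequent so histogram updates do not interfere
// with the per-record path.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setLatencyTrackingInterval(2000L);
{code}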

> LatencyGauge change to histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges ---------------------------------------------------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>   value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>   value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7643) Configure FileSystems only once

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193174#comment-16193174
 ] 

ASF GitHub Bot commented on FLINK-7643:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4776#discussion_r142989725
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.factories;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/**
+ * A
+ */
+public class HadoopFileSystemFactoryLoader {
+
+	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
+
+	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
+
+	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
+
+
+	public static FileSystemFactory loadFactory() {
+		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
+			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
+		}
+		catch (Exception | LinkageError e) {
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
+		}
+
+		// check (for eager and better exception messages) if the Hadoop classes are available here
+		try {
+			Class.forName(HADOOP_CONFIG_CLASS, false, cl);
+			Class.forName(HADOOP_FS_CLASS, false, cl);
+		}
+		catch (ClassNotFoundException e) {
--- End diff --

ditto


> Configure FileSystems only once
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when 
> the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant 
> parsing for each data chunk.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7643) Configure FileSystems only once

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193176#comment-16193176
 ] 

ASF GitHub Bot commented on FLINK-7643:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4776#discussion_r142989384
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.factories;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/**
+ * A
+ */
+public class HadoopFileSystemFactoryLoader {
+
+	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
+
+	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
+
+	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
+
+
+	public static FileSystemFactory loadFactory() {
+		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
--- End diff --

Shall we log a warning for this exception? I'd prefer to have a log entry 
near the very top of the log file so it is easily discovered.
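
Something along these lines, as a sketch of the suggestion (LOG is an assumed slf4j logger on the class):

{code:java}
catch (ClassNotFoundException e) {
	// Surface the fallback at WARN level so it shows up early in the log.
	LOG.warn("Flink runtime classes not found in classpath/dependencies; " +
			"Hadoop file systems will not be available.", e);
	return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
}
{code}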


> Configure FileSystems only once
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when 
> the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant 
> parsing for each data chunk.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7643) Configure FileSystems only once

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193177#comment-16193177
 ] 

ASF GitHub Bot commented on FLINK-7643:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4776#discussion_r142989807
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.factories;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/**
+ * A
+ */
+public class HadoopFileSystemFactoryLoader {
+
+	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
+
+	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
+
+	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
+
+
+	public static FileSystemFactory loadFactory() {
+		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
+			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
+		}
+		catch (Exception | LinkageError e) {
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
+		}
+
+		// check (for eager and better exception messages) if the Hadoop classes are available here
+		try {
+			Class.forName(HADOOP_CONFIG_CLASS, false, cl);
+			Class.forName(HADOOP_FS_CLASS, false, cl);
+		}
+		catch (ClassNotFoundException e) {
+			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
+		}
+
+		// Create the factory.
+		try {
+			return factoryClass.newInstance();
+		}
+		catch (Exception | LinkageError e) {
--- End diff --

ditto


> Configure FileSystems only once
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when 
> the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant 
> parsing for each data chunk.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7643) Configure FileSystems only once

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193175#comment-16193175
 ] 

ASF GitHub Bot commented on FLINK-7643:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4776#discussion_r142989415
  
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.factories;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/**
+ * A
+ */
+public class HadoopFileSystemFactoryLoader {
+
+	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
+
+	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
+
+	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
+
+
+	public static FileSystemFactory loadFactory() {
+		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
+			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
+		}
+		catch (Exception | LinkageError e) {
--- End diff --

ditto


> Configure FileSystems only once
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when 
> the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant 
> parsing for each data chunk.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)










[jira] [Commented] (FLINK-7643) Configure FileSystems only once

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193138#comment-16193138
 ] 

ASF GitHub Bot commented on FLINK-7643:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4776
  
ok, sounds good. That also explains some questions I commented, so I 
removed them.


> Configure FileSystems only once
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.
> Instead, file systems should be configured once upon process startup, when 
> the configuration is loaded.
> This will also increase efficiency of checkpoints, as it avoids redundant 
> parsing for each data chunk.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-7292) Fix EMPTY MATCH bug in CEP.

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193122#comment-16193122
 ] 

Kostas Kloudas commented on FLINK-7292:
---

Hi [~litrain_1],

I believe that for infinite streams, an empty result does not make much sense. 
Conceptually, the empty result means that "there is no match in your input". If 
the input is infinite, this becomes "there is no match in your input *so far*", 
and given that this "so far" is not defined, it would mean that we have 
infinite "no matches" between potential matches. Even if we integrated it, this 
would imply a waste of resources, as we would have to iterate over all keys 
and, for each one, emit an "empty match" element.

Given the above, for the sake of having a clean JIRA and given that this 
discussion seems stale, I would recommend closing this issue. 

If nobody objects till Monday, I will close it. 
Please let me know if you disagree.

> Fix EMPTY MATCH bug in CEP.
> ---
>
> Key: FLINK-7292
> URL: https://issues.apache.org/jira/browse/FLINK-7292
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: zhangxiaoyu
>
> Currently, with the pattern {quote}a?{quote} and the event {quote}a1{quote}, 
> the result pattern is only {quote}a1{quote}, without the empty match.
> We wish the empty match were also returned, and I am working on this issue 
> now.
> My method is to check whether there exists an empty match only when the first 
> event comes (at the StartState): try to traverse the PROCEED edges with the 
> trueFunction condition from the StartState, and see if they can arrive at the 
> FinalState; if so, add an empty list to the result.
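
For reference, the pattern under discussion expressed with the CEP API; Event and the condition are illustrative placeholders:

{code:java}
// The "a?" pattern: an optional single element. With input "a1" this
// currently yields only [a1]; the proposal is to also emit the empty match.
Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
	.where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) {
			return value.getName().startsWith("a");
		}
	})
	.optional();
{code}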



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano edited comment on FLINK-7606 at 10/5/17 3:39 PM:
---

IMO the timeout should be triggered after the expiration of the time window (in 
the example 5 minutes) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?


was (Author: i...@paolorendano.it):
IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 minutes) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.
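
For reference, the shape of the job described above; Message, the key selector and the pattern body are placeholders:

{code:java}
// Keyed stream + CEP pattern with a 5-minute window, as in the report.
Pattern<Message, ?> pattern = Pattern.<Message>begin("first")
	.next("second")
	.within(Time.minutes(5));

PatternStream<Message> patternStream =
	CEP.pattern(input.keyBy(msg -> msg.getKey()), pattern);
{code}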



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano edited comment on FLINK-7606 at 10/5/17 3:38 PM:
---

IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 minutes) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?


was (Author: i...@paolorendano.it):
IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 seconds) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193026#comment-16193026
 ] 

Paolo Rendano commented on FLINK-7606:
--

IMO the timeout should be triggered after the expiration of the time window (in 
the example 10 seconds) without any new elements (i.e. if the last received 
element is out of that time window). With this kind of strategy you would get 
at most a latency of the time window interval. Would it solve?

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193005#comment-16193005
 ] 

Kostas Kloudas commented on FLINK-7549:
---

Thanks for closing the issue [~i...@paolorendano.it] !

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern, using JMeter to populate source data 
> on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I set it to loop for 1000 cycles, each one 
> sending the first and then the second status, which together generate the event 
> using Flink CEP (statuses keyed by device). I expect to get an output of 1000 
> events.
> In my early tests I launched that, but I noticed that I get only partial 
> results in output (70/80% of the expected ones). Introducing a delay in the 
> JMeter plan between the sending of the two statuses solved the problem. The 
> minimum delay that makes things work is 20/25 ms (of course this is on my 
> local machine; on other machines it may vary).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>             conf.getSourceExchange(),
>             conf.getSourceRoutingKey(),
>             conf.getSourceQueueName(),
>             true,
>             new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>             private static final long serialVersionUID = -1L;
>             @Override
>             public long extractTimestamp(MyMessageWrapper element) {
>                 if (element.getData().get("stateTimestamp") == null) {
>                     throw new RuntimeException("Status Timestamp is null during time ordering for device [" + element.getData().get("deviceCode") + "]");
>                 }
>                 return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>             }
>         })
>         .name("MyIncomingStatus");
> // PATTERN DEFINITION
> Pattern myPattern = Pattern
>     .begin("start")
>       .subtype(MyMessageWrapper.class)
>       .where(whereEquals("st", "none"))
>     .next("end")
>       .subtype(MyMessageWrapper.class)
>       .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> // CEP DEFINITION
> PatternStream myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", what 
> happens is that with such a high rate of messages, the source may receive 
> messages with the same timestamp but with different deviceCodes. 
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4777: Convergence

2017-10-05 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4777

Convergence

## What is the purpose of the change

This pull request enables dependency convergence for a couple of modules. 
Enabling it for all of them at once is unfortunately too complicated.
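
For reference, the check itself comes from the maven-enforcer-plugin's 
`dependencyConvergence` rule. A minimal sketch of how such a rule is declared 
in a module's pom.xml (the exact configuration in this PR may differ):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <!-- Fail the build when two paths in the dependency tree
               resolve to different versions of the same artifact. -->
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```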

## Brief change log

See the commit messages.

## Verifying this change

This change **SHOULD** already be covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink convergence

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4777.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4777


commit c7cc24d062aa233d86b68b7438c9a4e717003393
Author: Piotr Nowojski 
Date:   2017-09-29T16:23:29Z

[FLINK-7739][kafka-tests] Set shorter heartbeats intervals

The default pause value of 60 seconds is too large (tests would time out 
before Akka reacts).

commit 1677791f10153b9f7ecd552eac148d6ae3d056f1
Author: Piotr Nowojski 
Date:   2017-10-04T11:48:11Z

[FLINK-7739][kafka-tests] Set restart delay to non zero

Give TaskManagers some time to clean up before restarting a job.

commit 937c3fb388d9d7104b6336f59c3674bb70bfbf50
Author: Piotr Nowojski 
Date:   2017-10-04T14:50:57Z

[FLINK-7739] Exclude netty dependency from zookeeper

ZooKeeper was pulling in a conflicting Netty version. The conflict was
extremely subtle - the TaskManager in Kafka tests was deadlocking in some
rare corner cases.
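
For reference, such a transitive-dependency exclusion is typically declared 
as follows; this snippet is a sketch, not the exact diff:

```xml
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>${zookeeper.version}</version>
  <exclusions>
    <!-- Flink brings its own Netty; ZooKeeper's transitive one conflicted. -->
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```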

commit 47a9d3801de899b74adfed5338df8935b2509b05
Author: Piotr Nowojski 
Date:   2017-10-05T13:17:13Z

[hotfix][build] Add maven-enforcer version property

commit 4a9c6fa8957d4725ace40ac92a9a9d8d5cd6523f
Author: Piotr Nowojski 
Date:   2017-10-04T15:45:58Z

[FLINK-7765][annotations] Enable dependency convergence in flink-annotations

commit 07cceb6fdefe27ec9d474e563d653fbac27988b1
Author: Piotr Nowojski 
Date:   2017-10-05T13:26:48Z

[FLINK-7765][hadoop2] Enable dependency convergence in flink-shaded-hadoop2

commit 0f988b773b64cd57140e3d56b12802161d1f635a
Author: Piotr Nowojski 
Date:   2017-10-05T14:36:02Z

[FLINK-7765][curator] Enable dependency convergence in flink-shaded-curator




---


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192994#comment-16192994
 ] 

Kostas Kloudas commented on FLINK-7606:
---

You are right! 

There is already a discussion about introducing something like an "idle" 
watermark that simply advances time for inactive partitions. This will help in 
some similar use cases.

Now for your case specifically, it could work only if you know that no more 
elements belonging to a specific period (in event time) will come. Because if 
such elements do arrive, your previous results will be incorrect, right? This 
is the reason why we have not introduced such a mechanism.
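
For illustration, such an assigner could look roughly like this (a sketch of 
the idea only - this is not an existing Flink API, and the names and behavior 
are assumed):

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Sketch: once no element has arrived for {@code idleTimeoutMillis} of
 * wall-clock time, the watermark is advanced from the system clock so that
 * downstream timers (e.g. CEP "within" windows) can still fire for quiet
 * partitions.
 */
public class IdleAwareAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long idleTimeoutMillis;
    private long lastElementWallClock = Long.MIN_VALUE;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public IdleAwareAssigner(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        lastElementWallClock = System.currentTimeMillis();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (lastElementWallClock > 0 && now - lastElementWallClock > idleTimeoutMillis) {
            // Partition is idle: let event time progress with the wall clock.
            return new Watermark(now - idleTimeoutMillis);
        }
        return new Watermark(maxSeenTimestamp);
    }
}
{code}

As noted above, this is only safe if no elements for the skipped event-time 
period can still arrive; otherwise the results for those periods would be 
wrong.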

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots showing the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-10-05 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192961#comment-16192961
 ] 

Chesnay Schepler commented on FLINK-7765:
-

This is a duplicate of FLINK-4034.

> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192953#comment-16192953
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4742
  
@tillrohrmann I've addressed the second round of comments.


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.
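
As a rough usage illustration of the listed operations, listing jobs from a 
plain HTTP client could look like this (the address and the "/jobs" path are 
assumptions for this sketch, not the final API):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ListJobsSketch {
    public static void main(String[] args) throws Exception {
        // Assumed endpoint address and path, for illustration only.
        URL url = new URL("http://localhost:8081/jobs");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // JSON description of the running jobs
            }
        }
    }
}
{code}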



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/job/canc...

2017-10-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4742
  
@tillrohrmann I've addressed the second round of comments.


---


[jira] [Commented] (FLINK-7707) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192928#comment-16192928
 ] 

ASF GitHub Bot commented on FLINK-7707:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4772#discussion_r142949904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler which serves checkpoint statistics for subtasks.
+ */
+public class TaskCheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler {
+
+   public TaskCheckpointStatisticDetailsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   MessageHeaders 
messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   CheckpointStatsCache checkpointStatsCache) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+   }
+
+   @Override
+   protected TaskCheckpointStatisticsWithSubtaskDetails 
handleCheckpointRequest(
+   HandlerRequest request,
+   AbstractCheckpointStats checkpointStats) throws 
RestHandlerException {
+
+   final JobVertexID jobVertexId = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   final TaskStateStats taskStatistics = 
checkpointStats.getTaskStateStats(jobVertexId);
+
+   if (taskStatistics != null) {
+
+   final 
TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
+   taskStatistics.getSummaryStats(),
+   checkpointStats.getTriggerTimestamp());
+
+   

[GitHub] flink pull request #4772: [FLINK-7707] [flip6] Add TaskCheckpointStatisticDe...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4772#discussion_r142949904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler which serves checkpoint statistics for subtasks.
+ */
+public class TaskCheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler {
+
+   public TaskCheckpointStatisticDetailsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   MessageHeaders 
messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   CheckpointStatsCache checkpointStatsCache) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+   }
+
+   @Override
+   protected TaskCheckpointStatisticsWithSubtaskDetails 
handleCheckpointRequest(
+   HandlerRequest request,
+   AbstractCheckpointStats checkpointStats) throws 
RestHandlerException {
+
+   final JobVertexID jobVertexId = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   final TaskStateStats taskStatistics = 
checkpointStats.getTaskStateStats(jobVertexId);
+
+   if (taskStatistics != null) {
+
+   final 
TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
+   taskStatistics.getSummaryStats(),
+   checkpointStats.getTriggerTimestamp());
+
+   final List 
subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
+   taskStatistics.getSubtaskStats(),
+   checkpointStats.getTriggerTimestamp());
+
+ 

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192920#comment-16192920
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4358


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are stored 
> permanently for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.
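
A rough sketch of the intended split (interface and method names here are 
assumptions, not the final API):

{code:java}
/** BLOBs that live for the whole lifetime of a job (HA and non-HA). */
public interface PermanentBlobService {
    File getFile(JobID jobId, BlobKey key) throws IOException;
}

/**
 * BLOBs for offloading logs, RPC messages, etc.; they may be deleted eagerly
 * and do not even have to be backed by files.
 */
public interface TransientBlobService {
    File getFile(BlobKey key) throws IOException;
    boolean delete(BlobKey key);
}
{code}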



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-10-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4358


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142943570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

We could also add a dedicated resolve method for each parameter that a 
particular class defines, but at that point we would be duplicating the 
individual parameter methods.


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192903#comment-16192903
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142943570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

We could also add a dedicated resolve method for each parameter that a 
particular class defines, but at that point we would be duplicating the 
individual parameter methods.


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Paolo Rendano (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano resolved FLINK-7549.
--
Resolution: Fixed

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern using JMeter to populate source 
> data on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I loop over 1000 cycles, each one sending the first 
> and then the second status that together generate the event via Flink CEP 
> (statuses keyed by device). I expect an output of 1000 events.
> In my early tests I noticed that I got only partial results in output (70/80% 
> of the expected ones). Introducing a delay in the JMeter plan between sending 
> the two statuses solved the problem. The minimum delay that makes things work 
> (of course this is on my local machine; it may vary on other machines) is 
> 20/25 ms.
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> 
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>             conf.getSourceExchange(),
>             conf.getSourceRoutingKey(),
>             conf.getSourceQueueName(),
>             true,
>             new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                 private static final long serialVersionUID = -1L;
> 
>                 @Override
>                 public long extractTimestamp(MyMessageWrapper element) {
>                     if (element.getData().get("stateTimestamp") == null) {
>                         throw new RuntimeException("Status Timestamp is null during time ordering"
>                             + " for device [" + element.getData().get("deviceCode") + "]");
>                     }
>                     return FlinkUtils.getTimeFromJsonTimestamp(
>                         element.getData().get("stateTimestamp")).getTime();
>                 }
>             })
>         .name("MyIncomingStatus");
> 
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>     .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> 
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream =
>     CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream = myPatternStream.flatSelect(
>     patternFlatTimeoutFunction, patternFlatSelectFunction);
> 
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig,
>     outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate the source may receive 
> messages with the same timestamp but different deviceCodes. 
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192891#comment-16192891
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
1) exactly
2) exactly, but my stream is a never-ending stream, so I cannot close it. The 
idea could be: why not flush the buffer automatically, not only based on 
memory use but also after a configurable timeout? That could be a good 
enhancement for different use cases.

Paolo

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots showing the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192886#comment-16192886
 ] 

Paolo Rendano commented on FLINK-7549:
--

Hi [~kkl0u], yes, I solved it by setting:

{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}

Thanks

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern using JMeter to populate source 
> data on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I loop over 1000 cycles, each one sending the first 
> and then the second status that together generate the event via Flink CEP 
> (statuses keyed by device). I expect an output of 1000 events.
> In my early tests I noticed that I got only partial results in output (70/80% 
> of the expected ones). Introducing a delay in the JMeter plan between sending 
> the two statuses solved the problem. The minimum delay that makes things work 
> (of course this is on my local machine; it may vary on other machines) is 
> 20/25 ms.
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> 
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>             conf.getSourceExchange(),
>             conf.getSourceRoutingKey(),
>             conf.getSourceQueueName(),
>             true,
>             new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                 private static final long serialVersionUID = -1L;
> 
>                 @Override
>                 public long extractTimestamp(MyMessageWrapper element) {
>                     if (element.getData().get("stateTimestamp") == null) {
>                         throw new RuntimeException("Status Timestamp is null during time ordering"
>                             + " for device [" + element.getData().get("deviceCode") + "]");
>                     }
>                     return FlinkUtils.getTimeFromJsonTimestamp(
>                         element.getData().get("stateTimestamp")).getTime();
>                 }
>             })
>         .name("MyIncomingStatus");
> 
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>     .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> 
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream =
>     CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream = myPatternStream.flatSelect(
>     patternFlatTimeoutFunction, patternFlatSelectFunction);
> 
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig,
>     outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate the source may receive 
> messages with the same timestamp but different deviceCodes. 
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4381: [FLINK-7196][blob] add a TTL to all transient BLOBs

2017-10-05 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4381
  
rebased onto the latest #4359 


---


[jira] [Commented] (FLINK-7262) remove unused FallbackLibraryCacheManager

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192882#comment-16192882
 ] 

ASF GitHub Bot commented on FLINK-7262:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4403
  
rebased onto the latest #4381 


> remove unused FallbackLibraryCacheManager
> -
>
> Key: FLINK-7262
> URL: https://issues.apache.org/jira/browse/FLINK-7262
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{FallbackLibraryCacheManager}} is basically only used in unit tests nowadays 
> and should probably be removed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4403: [FLINK-7262][blob] remove the unused FallbackLibraryCache...

2017-10-05 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4403
  
rebased onto the latest #4381 


---


[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192866#comment-16192866
 ] 

ASF GitHub Bot commented on FLINK-7410:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933578
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -183,17 +183,25 @@ trait CommonCorrelate {
   }
 
   private[flink] def correlateOpName(
+  inputType: RelDataType,
   rexCall: RexCall,
   sqlFunction: TableSqlFunction,
-  rowType: RelDataType)
+  rowType: RelDataType,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => String)
 : String = {
 
-s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
+s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, 
expression)}," +
+  s" select: ${selectToString(rowType)}"
   }
 
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: 
TableSqlFunction): String = {
-val udtfName = sqlFunction.getName
-val operands = 
rexCall.getOperands.asScala.map(_.toString).mkString(",")
+  private[flink] def correlateToString(
+  inputType: RelDataType,
+  rexCall: RexCall,
+  sqlFunction: TableSqlFunction,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => 
String): String = {
+val inFields = inputType.getFieldNames.asScala.toList
+val udtfName = sqlFunction.toString
+val operands = rexCall.getOperands.asScala.map(expression(_, inFields, 
None)).mkString(",")
--- End diff --

please add a space: `mkString(",")` -> `mkString(", ")`


> Use toString method to display operator names for UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, 
> so it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class's canonical name and an md5 
> value, making the name too long and unfriendly to users. 
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Use the {{toString}} method to display operator names for UserDefinedFunction. 
> The method returns the class name by default. Users can also override the 
> method to return whatever they want.
> What do you think [~fhueske] ?
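
For example, a minimal sketch of the proposed override (the UDF body is 
illustrative):

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class RichFunc1 extends ScalarFunction {

    public int eval(int a) {
        return a + 1;
    }

    // Short, readable name used in operator names instead of the
    // canonical-class-name-plus-md5 default.
    @Override
    public String toString() {
        return "RichFunc1";
    }
}
{code}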



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7196) add a TTL to transient BLOB files

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192860#comment-16192860
 ] 

ASF GitHub Bot commented on FLINK-7196:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4381
  
rebased onto the latest #4359 


> add a TTL to transient BLOB files
> -
>
> Key: FLINK-7196
> URL: https://issues.apache.org/jira/browse/FLINK-7196
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Transient BLOB files are not automatically cleaned up unless the 
> {{BlobCache}}/{{BlobServer}} are shut down or the files are deleted via the 
> {{delete}} methods. Additionally, they should have a default time-to-live 
> (TTL) so that they may be cleaned up in failure cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933262
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 ---
@@ -41,7 +41,8 @@ class CorrelateTest extends TableTestBase {
 "DataSetCorrelate",
 batchTableNode(0),
 term("invocation", "func1($cor0.c)"),
-term("function", func1.getClass.getCanonicalName),
+term("correlate", s"table(func1($$cor0.c))"),
+term("select", "a,b,c,f0"),
--- End diff --

please change to `term("select", "a", "b", "c", "f0"),` (i.e., use separate 
strings for the field names) for consistency


---


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933578
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -183,17 +183,25 @@ trait CommonCorrelate {
   }
 
   private[flink] def correlateOpName(
+  inputType: RelDataType,
   rexCall: RexCall,
   sqlFunction: TableSqlFunction,
-  rowType: RelDataType)
+  rowType: RelDataType,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => String)
 : String = {
 
-s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
+s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, 
expression)}," +
+  s" select: ${selectToString(rowType)}"
   }
 
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: 
TableSqlFunction): String = {
-val udtfName = sqlFunction.getName
-val operands = 
rexCall.getOperands.asScala.map(_.toString).mkString(",")
+  private[flink] def correlateToString(
+  inputType: RelDataType,
+  rexCall: RexCall,
+  sqlFunction: TableSqlFunction,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => 
String): String = {
+val inFields = inputType.getFieldNames.asScala.toList
+val udtfName = sqlFunction.toString
+val operands = rexCall.getOperands.asScala.map(expression(_, inFields, 
None)).mkString(",")
--- End diff --

please add a space: `mkString(",")` -> `mkString(", ")`


---


[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192867#comment-16192867
 ] 

ASF GitHub Bot commented on FLINK-7410:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933701
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -183,17 +183,25 @@ trait CommonCorrelate {
   }
 
   private[flink] def correlateOpName(
+  inputType: RelDataType,
   rexCall: RexCall,
   sqlFunction: TableSqlFunction,
-  rowType: RelDataType)
+  rowType: RelDataType,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => String)
 : String = {
 
-s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
+s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, 
expression)}," +
+  s" select: ${selectToString(rowType)}"
   }
 
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: 
TableSqlFunction): String = {
-val udtfName = sqlFunction.getName
-val operands = 
rexCall.getOperands.asScala.map(_.toString).mkString(",")
+  private[flink] def correlateToString(
+  inputType: RelDataType,
+  rexCall: RexCall,
+  sqlFunction: TableSqlFunction,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => 
String): String = {
+val inFields = inputType.getFieldNames.asScala.toList
+val udtfName = sqlFunction.toString
+val operands = rexCall.getOperands.asScala.map(expression(_, inFields, 
None)).mkString(",")
--- End diff --

Please add a space to the `mkString` call in `selectToString()` as well. 
Thanks


> Use toString method to display operator names for UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, 
> so it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class's canonical name and an md5 
> value, making the name too long and unfriendly to users. 
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Use the {{toString}} method to display operator names for UserDefinedFunction. 
> The method returns the class name by default. Users can also override the 
> method to return whatever they want.
> What do you think [~fhueske] ?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192865#comment-16192865
 ] 

ASF GitHub Bot commented on FLINK-7410:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933355
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 ---
@@ -61,7 +62,8 @@ class CorrelateTest extends TableTestBase {
 "DataSetCorrelate",
 batchTableNode(0),
 term("invocation", "func1($cor0.c, '$')"),
-term("function", func1.getClass.getCanonicalName),
+term("correlate", s"table(func1($$cor0.c,'$$'))"),
--- End diff --

change to `term("correlate", s"table(func1($$cor0.c, '$$'))"),` for 
consistency


> Use toString method to display operator names for UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, 
> so it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class's canonical name and an md5 
> value, making the name too long and unfriendly to users. 
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Use the {{toString}} method to display operator names for UserDefinedFunction. 
> The method returns the class name by default. Users can also override the 
> method to return whatever they want.
> What do you think [~fhueske] ?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933701
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -183,17 +183,25 @@ trait CommonCorrelate {
   }
 
   private[flink] def correlateOpName(
+  inputType: RelDataType,
   rexCall: RexCall,
   sqlFunction: TableSqlFunction,
-  rowType: RelDataType)
+  rowType: RelDataType,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => String)
 : String = {
 
-s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
+s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, 
expression)}," +
+  s" select: ${selectToString(rowType)}"
   }
 
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: 
TableSqlFunction): String = {
-val udtfName = sqlFunction.getName
-val operands = 
rexCall.getOperands.asScala.map(_.toString).mkString(",")
+  private[flink] def correlateToString(
+  inputType: RelDataType,
+  rexCall: RexCall,
+  sqlFunction: TableSqlFunction,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => 
String): String = {
+val inFields = inputType.getFieldNames.asScala.toList
+val udtfName = sqlFunction.toString
+val operands = rexCall.getOperands.asScala.map(expression(_, inFields, 
None)).mkString(",")
--- End diff --

Please add a space to the `mkString` call in `selectToString()` as well. 
Thanks


---


[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192864#comment-16192864
 ] 

ASF GitHub Bot commented on FLINK-7410:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933262
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 ---
@@ -41,7 +41,8 @@ class CorrelateTest extends TableTestBase {
 "DataSetCorrelate",
 batchTableNode(0),
 term("invocation", "func1($cor0.c)"),
-term("function", func1.getClass.getCanonicalName),
+term("correlate", s"table(func1($$cor0.c))"),
+term("select", "a,b,c,f0"),
--- End diff --

please change to `term("select", "a", "b", "c", "f0"),` (i.e., use separate 
strings for the field names) for consistency


> Use toString method to display operator names for UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, 
> so it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class's canonical name and an md5 
> value, making the name too long and unfriendly to users. 
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Use the {{toString}} method to display operator names for UserDefinedFunction. 
> The method returns the class name by default. Users can also override the 
> method to return whatever they want.
> What do you think [~fhueske] ?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r142933355
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 ---
@@ -61,7 +62,8 @@ class CorrelateTest extends TableTestBase {
 "DataSetCorrelate",
 batchTableNode(0),
 term("invocation", "func1($cor0.c, '$')"),
-term("function", func1.getClass.getCanonicalName),
+term("correlate", s"table(func1($$cor0.c,'$$'))"),
--- End diff --

change to `term("correlate", s"table(func1($$cor0.c, '$$'))"),` for 
consistency


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192852#comment-16192852
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142933167
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

I know that the HttpObjectAggregator will throw an exception, and I _think_ 
it will just be logged server-side. We can't really change that except by 
modifying the aggregator (but I have some WIP to replace it anyway).

I'll add the client-side size check.
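
Something along these lines (the names are illustrative, not the final code):

```java
// Reject over-sized requests on the client before sending them, so the user
// gets an immediate error instead of a request that is silently dropped by
// the server-side HttpObjectAggregator.
private static void checkRequestSize(byte[] serializedJobGraph, int maxContentLength) {
    if (serializedJobGraph.length > maxContentLength) {
        throw new IllegalArgumentException("The serialized job graph is "
            + serializedJobGraph.length + " bytes, which exceeds the maximum "
            + "request size of " + maxContentLength + " bytes.");
    }
}
```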


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142933167
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

I know that the HttpObjectAggregator will throw an exception, and I _think_ 
it will just be logged server-side. We can't really change that except by 
modifying the aggregator (but I have some WIP to replace it anyway).

I'll add the client-side size check.


---


[jira] [Updated] (FLINK-7765) Enable dependency convergence

2017-10-05 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7765:
--
Description: For motivation check 
https://issues.apache.org/jira/browse/FLINK-7765  (was: For motivation check 
[#7739])

> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7765



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7765) Enable dependency convergence

2017-10-05 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7765:
-

 Summary: Enable dependency convergence
 Key: FLINK-7765
 URL: https://issues.apache.org/jira/browse/FLINK-7765
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


For motivation check [#7739]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7765) Enable dependency convergence

2017-10-05 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7765:
--
Description: For motivation check 
https://issues.apache.org/jira/browse/FLINK-7739  (was: For motivation check 
https://issues.apache.org/jira/browse/FLINK-7765)

> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192841#comment-16192841
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142930472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

Well, they aren't really internal fields; at least I didn't intend them to 
be. The client has to resolve the parameters somehow, so we either have to add 
a custom resolve method to every `MessageParameters` class (which would make for 
an odd API when creating sub-classes), or provide access to each parameter 
(either directly or through a getter). I opted for the direct approach since it 
makes it obvious that we are in fact modifying the `MessageParameters` object.
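
For illustration, resolving the public fields from the client side would then 
look roughly like this (the `resolve(...)` methods are my reading of this 
discussion, not necessarily the exact API):

```java
JobTerminationMessageParameters params = new JobTerminationMessageParameters();
// Fill in the path and query parameters before building the request URL.
params.jobPathParameter.resolve(jobId);
params.terminationModeQueryParameter.resolve(
    Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
```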


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142930472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

Well, they aren't really internal fields; at least I didn't intend them to be. 
The client has to resolve the parameters somehow, so we either have to add 
a custom resolve method to every `MessageParameters` class (which would make for 
an odd API when creating sub-classes), or provide access to each parameter 
(either directly or through a getter). I opted for the direct approach since it 
makes it obvious that we are in fact modifying the `MessageParameters` object.


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192837#comment-16192837
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142929069
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
--- End diff --

Renamed to `testJobSubmitCancelStop`. It verifies that the client properly 
sends out requests to the corresponding handlers.


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142929069
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
--- End diff --

Renamed to `testJobSubmitCancelStop`. It verifies that the client properly 
sends out requests to the corresponding handlers.


---


[jira] [Commented] (FLINK-7754) Complete termination future after actor has been stopped.

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192828#comment-16192828
 ] 

ASF GitHub Bot commented on FLINK-7754:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4770


> Complete termination future after actor has been stopped.
> -
>
> Key: FLINK-7754
> URL: https://issues.apache.org/jira/browse/FLINK-7754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> At the moment, we complete the termination future when the {{postStop}} 
> method of the {{RpcActor}} has been executed. This, however, does not mean 
> that the underlying actor has been stopped. We should rather complete the 
> future in the {{AkkaRpcService#stopServer}} method where we close the actor 
> with a graceful shutdown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4770: [FLINK-7754] [rpc] Complete termination future aft...

2017-10-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4770


---


[jira] [Comment Edited] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192823#comment-16192823
 ] 

Kostas Kloudas edited comment on FLINK-7549 at 10/5/17 12:53 PM:
-

Hi [~i...@paolorendano.it],

Is this issue still valid, or is it resolved by setting the 
{{timeCharacteristic}} to event time?


was (Author: kkl0u):
Hi [~i...@paolorendano.it],

Is this issue still valid, or is it resolved by setting the 
[[timeCharacteristic]] to event time?
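
In the 1.3.x DataStream API, the suggested switch looks as follows (standard API call, shown as a minimal sketch to make the suggestion concrete):

{code:java}
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // switch from the default (processing time) to event time, so that the
        // watermarks produced by the timestamp extractor drive CEP's within() clause
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // ... define sources, patterns and sinks here, then call env.execute();
    }
}
{code}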

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern, using JMeter to populate source data 
> on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I set it to loop for 1000 cycles, each cycle sending 
> the first and then the second status that together generate the event via 
> Flink CEP (statuses keyed by device). I expect to get an output of 1000 events.
> In my early tests I noticed that I got only partial results in output (70-80% 
> of the expected ones). Introducing a delay in the JMeter plan between the 
> sending of the two statuses solved the problem. The minimum delay that makes 
> things work is 20-25 ms (of course this is on my local machine; on other 
> machines it may vary).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> 
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource = env
>     .addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>         conf.getSourceExchange(),
>         conf.getSourceRoutingKey(),
>         conf.getSourceQueueName(),
>         true,
>         new MyMessageWrapperSchema()))
>     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>         private static final long serialVersionUID = -1L;
> 
>         @Override
>         public long extractTimestamp(MyMessageWrapper element) {
>             if (element.getData().get("stateTimestamp") == null) {
>                 throw new RuntimeException("Status Timestamp is null during time ordering for device ["
>                     + element.getData().get("deviceCode") + "]");
>             }
>             return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>         }
>     })
>     .name("MyIncomingStatus");
> 
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>     .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> 
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<MyMessageWrapper, MyMessageWrapper>> outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> 
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging into and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate the source may receive 
> messages with the same timestamp but with different deviceCode.
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7754) Complete termination future after actor has been stopped.

2017-10-05 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7754.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 4947ee669fe267e3f71853cd14228b350cf10ac8

> Complete termination future after actor has been stopped.
> -
>
> Key: FLINK-7754
> URL: https://issues.apache.org/jira/browse/FLINK-7754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> At the moment, we complete the termination future when the {{postStop}} 
> method of the {{RpcActor}} has been executed. This, however, does not mean 
> that the underlying actor has been stopped. We should rather complete the 
> future in the {{AkkaRpcService#stopServer}} method where we close the actor 
> with a graceful shutdown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7754) Complete termination future after actor has been stopped.

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192824#comment-16192824
 ] 

ASF GitHub Bot commented on FLINK-7754:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4770
  
Merging this PR since Travis passed.


> Complete termination future after actor has been stopped.
> -
>
> Key: FLINK-7754
> URL: https://issues.apache.org/jira/browse/FLINK-7754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> At the moment, we complete the termination future when the {{postStop}} 
> method of the {{RpcActor}} has been executed. This, however, does not mean 
> that the underlying actor has been stopped. We should rather complete the 
> future in the {{AkkaRpcService#stopServer}} method where we close the actor 
> with a graceful shutdown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4770: [FLINK-7754] [rpc] Complete termination future after acto...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4770
  
Merging this PR since Travis passed.


---


[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192823#comment-16192823
 ] 

Kostas Kloudas commented on FLINK-7549:
---

Hi [~i...@paolorendano.it],

Is this issue still valid, or is it resolved by setting the 
[[timeCharacteristic]] to event time?

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern, using JMeter to populate source data 
> on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I set it to loop for 1000 cycles, each cycle sending 
> the first and then the second status that together generate the event via 
> Flink CEP (statuses keyed by device). I expect to get an output of 1000 events.
> In my early tests I noticed that I got only partial results in output (70-80% 
> of the expected ones). Introducing a delay in the JMeter plan between the 
> sending of the two statuses solved the problem. The minimum delay that makes 
> things work is 20-25 ms (of course this is on my local machine; on other 
> machines it may vary).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> 
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource = env
>     .addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>         conf.getSourceExchange(),
>         conf.getSourceRoutingKey(),
>         conf.getSourceQueueName(),
>         true,
>         new MyMessageWrapperSchema()))
>     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>         private static final long serialVersionUID = -1L;
> 
>         @Override
>         public long extractTimestamp(MyMessageWrapper element) {
>             if (element.getData().get("stateTimestamp") == null) {
>                 throw new RuntimeException("Status Timestamp is null during time ordering for device ["
>                     + element.getData().get("deviceCode") + "]");
>             }
>             return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>         }
>     })
>     .name("MyIncomingStatus");
> 
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>     .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
> 
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<MyMessageWrapper, MyMessageWrapper>> outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> 
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging into and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate the source may receive 
> messages with the same timestamp but with different deviceCode.
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192819#comment-16192819
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4358
  
Thanks a lot for your work and patience with me @NicoK. Changes look good 
to me. I've rebased it onto the latest master and once Travis gives green 
light, I'll merge it :-)


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.
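
Sketched as interfaces, with hypothetical shapes only to make the split concrete (not the merged API):

{code:java}
// Illustrative sketch of the proposed split; method shapes are assumptions.
interface PermanentBlobService {
    // job-scoped BLOBs: live as long as the job, possibly backed by HA storage
    byte[] get(String jobId, String blobKey);
}

interface TransientBlobService {
    // one-shot offloading (logs, RPC payloads); may be served straight from memory
    byte[] getAndDelete(String blobKey);
}
{code}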



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192818#comment-16192818
 ] 

Kostas Kloudas commented on FLINK-7606:
---

Hi [~i...@paolorendano.it], 

Sorry for the late reply.

So, if I understand correctly, in a nutshell:
1) for event time, there is no memory leak problem (there is also a pending PR 
that probably fixes the problem for processing time as well, by unifying the code 
paths for both notions of time), but 
2) at the end of your input, the watermark does not advance, and the last batch 
of events is not processed because it is waiting for the watermark + 10 sec to trigger the computation, right?

In this case, if you know that your stream is finite, then you can close your 
source (call close() on your source), and this will send a Long.MAX_VALUE 
watermark that will flush the buffered elements.
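
A minimal sketch of such a finite source (illustrative names; assumes a hard-coded input). Once {{run()}} returns, the surrounding stream task emits the final {{Long.MAX_VALUE}} watermark, which flushes anything still buffered behind event-time timers, e.g. in CEP:

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative finite source: when run() returns, the enclosing stream task
// emits a Long.MAX_VALUE watermark, flushing anything buffered on event time.
public class FiniteSource implements SourceFunction<String> {
    private volatile boolean running = true;
    private final String[] elements = {"a", "b", "c"}; // hypothetical finite input

    @Override
    public void run(SourceContext<String> ctx) {
        for (String element : elements) {
            if (!running) {
                break;
            }
            ctx.collect(element);
        }
        // simply returning here finishes the source and triggers the final watermark
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}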

Kostas

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows up continuously without free the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows up (I suppose that the state 
> of NFA is updated) but it is not cleaned also after the 5 minutes of time 
> window defined in "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4358
  
Thanks a lot for your work and patience with me @NicoK. Changes look good 
to me. I've rebased it onto the latest master and once Travis gives green 
light, I'll merge it :-)


---


[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192805#comment-16192805
 ] 

ASF GitHub Bot commented on FLINK-5005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3703
  
@DieBauer Do you still want to work on this? I also started trying to make 
Flink ready for 2.12 before I noticed this older branch. I'd be very happy to 
stop, though, if you're interested in bringing this to an end. It should be 
easier now that we dropped Java 7 support and also agreed to drop Scala 2.10 
support.


> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>Assignee: Aljoscha Krettek
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts

2017-10-05 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3703
  
@DieBauer Do you still want to work on this? I also started trying to make 
Flink ready for 2.12 before I noticed this older branch. I'd be very happy to 
stop, though, if you're interested in bringing this to an end. It should be 
easier now that we dropped Java 7 support and also agreed to drop Scala 2.10 
support.


---


[jira] [Created] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism

2017-10-05 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7764:


 Summary: FlinkKafkaProducer010 does not accept name, uid, or 
parallelism
 Key: FLINK-7764
 URL: https://issues.apache.org/jira/browse/FLINK-7764
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.3.2, 1.4.0
Reporter: Fabian Hueske


As [reported on the user 
list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]:

When I try to use KafkaProducer with timestamps it fails to set name, uid or 
parallelism. It uses default values.

{code}
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer = FlinkKafkaProducer010
    .writeToKafkaWithTimestamps(stream, topicName, schema, props, partitioner);
producer.setFlushOnCheckpoint(flushOnCheckpoint);
producer
    .name("foo")
    .uid("bar")
    .setParallelism(5);

return producer;
{code}

As operator name it shows "FlinKafkaProducer 0.10.x", with the typo.
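
For comparison, attaching the producer as a plain sink behaves as expected for these calls; a hedged sketch that reuses the variables from the snippet above but gives up the timestamp support of {{writeToKafkaWithTimestamps}}:

{code:java}
// Sketch of the non-timestamp path, where name/uid/setParallelism apply normally.
// stream, topicName, schema and props are the same variables as in the snippet above.
stream.addSink(new FlinkKafkaProducer010<>(topicName, schema, props))
    .name("foo")
    .uid("bar")
    .setParallelism(5);
{code}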



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192793#comment-16192793
 ] 

ASF GitHub Bot commented on FLINK-7689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142918146
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -41,6 +46,11 @@
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
	private static final long serialVersionUID = 1L;
	static final int DEFAULT_BATCH_INTERVAL = 5000;
+	static final String FLUSH_SCOPE = "flush";
+	static final String FLUSH_RATE_METER_NAME = "rate";
+	static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval";
--- End diff --

rename to `batchLimitReachedRate`?


> Instrument the Flink JDBC sink
> --
>
> Key: FLINK-7689
> URL: https://issues.apache.org/jira/browse/FLINK-7689
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
>Reporter: Martin Eden
>Priority: Minor
>  Labels: jdbc, metrics
> Fix For: 1.4.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> As confirmed by the Flink community in the following mailing list 
> [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html]
>   using off the shelf Flink sinks like the JDBC sink, Redis sink or Cassandra 
> sink etc does not expose any sink specific metrics.
> The purpose of this ticket is to add some relevant metrics to the 
> JDBCOutputFormat:
> - Meters for when a flush is made.
> - Histograms for the jdbc batch count and batch execution latency.
> These would allow deeper understanding of the runtime behaviour of 
> performance critical jobs writing to external databases using this generic 
> interface.
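
As a rough sketch of what registering these metrics could look like with Flink's metric API (the group and metric names follow the ticket; the meter window and the Dropwizard-backed histogram are assumptions, not the merged implementation):

{code:java}
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

/** Sketch: the metrics this ticket asks for, registered on an operator's metric group. */
class FlushMetrics {
    final Meter flushRate;
    final Histogram batchCount;

    FlushMetrics(MetricGroup operatorGroup) {
        MetricGroup flush = operatorGroup.addGroup("flush");
        // per-second flush rate, averaged over a 60 second window (window size is an assumption)
        flushRate = flush.meter("rate", new MeterView(new SimpleCounter(), 60));
        // Dropwizard-backed histogram for the JDBC batch size distribution
        batchCount = flush.histogram("batchCount",
            new DropwizardHistogramWrapper(
                new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir())));
    }

    void onFlush(int batchSize) {
        flushRate.markEvent();        // count each executed flush
        batchCount.update(batchSize); // record how many rows the batch carried
    }
}
{code}

In {{JDBCOutputFormat}} this would be constructed in {{open()}} from {{getRuntimeContext().getMetricGroup()}}.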



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7689) Instrument the Flink JDBC sink

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192792#comment-16192792
 ] 

ASF GitHub Bot commented on FLINK-7689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142919779
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 ---
@@ -233,6 +255,30 @@ public void testFlush() throws SQLException, 
IOException {
}
}
 
+   @Test
+   public void testMetricsSetup() throws IOException {
--- End diff --

Can you extend this test to check that the metrics are correctly set 
(except for the `durationMs` histogram)?


> Instrument the Flink JDBC sink
> --
>
> Key: FLINK-7689
> URL: https://issues.apache.org/jira/browse/FLINK-7689
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
>Reporter: Martin Eden
>Priority: Minor
>  Labels: jdbc, metrics
> Fix For: 1.4.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> As confirmed by the Flink community in the following mailing list 
> [message|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/metrics-for-Flink-sinks-td15200.html]
>   using off the shelf Flink sinks like the JDBC sink, Redis sink or Cassandra 
> sink etc does not expose any sink specific metrics.
> The purpose of this ticket is to add some relevant metrics to the 
> JDBCOutputFormat:
> - Meters for when a flush is made.
> - Histograms for the jdbc batch count and batch execution latency.
> These would allow deeper understanding of the runtime behaviour of 
> performance critical jobs writing to external databases using this generic 
> interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142918146
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -41,6 +46,11 @@
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
	private static final long serialVersionUID = 1L;
	static final int DEFAULT_BATCH_INTERVAL = 5000;
+	static final String FLUSH_SCOPE = "flush";
+	static final String FLUSH_RATE_METER_NAME = "rate";
+	static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval";
--- End diff --

rename to `batchLimitReachedRate`?


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142919779
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 ---
@@ -233,6 +255,30 @@ public void testFlush() throws SQLException, 
IOException {
}
}
 
+   @Test
+   public void testMetricsSetup() throws IOException {
--- End diff --

Can you extend this test to check that the metrics are correctly set 
(except for the `durationMs` histogram)?


---


[GitHub] flink issue #4774: [FLINK-6495] Fix Akka's default value for heartbeat pause

2017-10-05 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4774
  
I have added a runtime check for that. To be clear, this was not the reason 
for the Kafka test instabilities, and I'm not aware of it having caused any issues. 
But it definitely could, and it should be fixed anyway (IMO this should be a 
release blocker).


---


[jira] [Commented] (FLINK-6495) Migrate Akka configuration options

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192786#comment-16192786
 ] 

ASF GitHub Bot commented on FLINK-6495:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4774
  
I have added a runtime check for that. To be clear, this was not the reason 
for the Kafka test instabilities, and I'm not aware of it having caused any issues. 
But it definitely could, and it should be fixed anyway (IMO this should be a 
release blocker).


> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7729) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7729.
---
Resolution: Duplicate

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-7729
> URL: https://issues.apache.org/jira/browse/FLINK-7729
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> As per the discussion on the ML:
>  - 
> https://lists.apache.org/thread.html/d7b03b4697f91efc162d5febb772d19d324d3d6daa7e38f6fa811e30@%3Cdev.flink.apache.org%3E
>  - 
> https://lists.apache.org/thread.html/67d7ea9964190b1e1c472a53903c55a3c5cf070bde82a27997226b8c@%3Cdev.flink.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-5005:
---

Assignee: Aljoscha Krettek

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>Assignee: Aljoscha Krettek
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5005:

Summary: Remove Scala 2.10 support; add Scala 2.12 support  (was: Publish 
Scala 2.12 artifacts)

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192776#comment-16192776
 ] 

Aljoscha Krettek commented on FLINK-5005:
-

The ML discussions on this concluded on removing 2.10 support and adding 2.12 
support:
 - 
https://lists.apache.org/thread.html/d7b03b4697f91efc162d5febb772d19d324d3d6daa7e38f6fa811e30@%3Cdev.flink.apache.org%3E
 - 
https://lists.apache.org/thread.html/67d7ea9964190b1e1c472a53903c55a3c5cf070bde82a27997226b8c@%3Cdev.flink.apache.org%3E

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6045) FLINK_CONF_DIR has to be set even though specifying --configDir

2017-10-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192773#comment-16192773
 ] 

Aljoscha Krettek commented on FLINK-6045:
-

We should probably remove use of {{FLINK_CONF_DIR}} anywhere in the actual code 
and only use it in scripts that use it to set {{--configDir}}. This also 
entails removing {{GlobalConfiguration.loadConfiguration()}} (the call without 
arguments).
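
In other words, only the explicit variant would remain; a minimal sketch, assuming {{configDir}} carries the value of {{--configDir}} that the start scripts forward:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class LoadConfig {
    public static void main(String[] args) {
        // value of --configDir, forwarded by the start scripts
        String configDir = args[0];
        // explicit variant: no silent fallback to the FLINK_CONF_DIR environment variable
        Configuration config = GlobalConfiguration.loadConfiguration(configDir);
        System.out.println(config);
    }
}
{code}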

> FLINK_CONF_DIR has to be set even though specifying --configDir
> ---
>
> Key: FLINK-6045
> URL: https://issues.apache.org/jira/browse/FLINK-6045
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Till Rohrmann
>Priority: Minor
>
> A user reported that {{FLINK_CONF_DIR}} has to be set in addition to 
> specifying --configDir. Otherwise the {{JobManager}} and the {{TaskManagers}} 
> fail silently trying to read from {{fs.hdfs.hadoopconf}}. Specifying one of 
> the two configuration options should be enough to successfully run Flink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4763
  
Alright @zentol. I guess it would work if I signed up for reviewable.


---


[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192763#comment-16192763
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4763
  
Alright @zentol. I guess it would work if I signed up for reviewable.


> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

2017-10-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4763
  
@tillrohrmann I wanted to try it out, primarily since I can mark individual 
files as reviewed. For the remaining files I will once again write the comments 
on GitHub.


---


[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192755#comment-16192755
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4763
  
@tillrohrmann I wanted to try it out, primarily since I can mark individual 
files as reviewed. For the remaining files I will once again write the comments 
on GitHub.


> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192742#comment-16192742
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142897388
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+   private final RestClient restClient;
+   private final ExecutorService executorService = 
Executors.newFixedThreadPool(4);
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.restClusterClientConfiguration = configuration;
+   this.restClient = new 
RestClient(configuration.getRestEndpointConfiguration(), executorService);
+   }
+
+   @Override
+   public void shutdown() {
+   try {
+   // we only call this for legacy reasons to shutdown 
components that are started in the ClusterClient constructor
+   super.shutdown();
+   } catch (Exception e) {
+   log.error("An error occurred during the client 
shutdown.", e);
+   }
+   this.restClient.shutdown(Time.seconds(5));
+   this.executorService.shutdown();
--- End diff --

Better to use `Executors.gracefulShutdown` here.
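
For reference, a graceful shutdown amounts to the standard JDK pattern below; this is a generic sketch of what such a utility does, not Flink's exact implementation:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownUtil {
    // Stop accepting new work, wait a bounded time for in-flight tasks,
    // then force-cancel whatever is still running.
    static void gracefulShutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}
{code}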


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192739#comment-16192739
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142899909
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
--- End diff --

What does test `ABC` test?


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay 

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192741#comment-16192741
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142907629
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

Hmm, what about giving these values to the `JobTerminationMessageParameters` 
constructor? It doesn't feel entirely right that we access internal fields to 
resolve them.
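
A minimal sketch of that alternative, with simplified stand-ins rather than the actual Flink types: parameters are resolved once at construction time and the object stays immutable afterwards.

{code:java}
// Hypothetical constructor-injection variant of the same parameters object.
class JobTerminationMessageParameters {
    private final String jobId;
    private final String terminationMode;

    JobTerminationMessageParameters(String jobId, String terminationMode) {
        this.jobId = jobId;                     // resolved once, no later mutation
        this.terminationMode = terminationMode;
    }

    String renderUrl() {
        return "/jobs/" + jobId + "/terminate?mode=" + terminationMode;
    }
}

// usage: new JobTerminationMessageParameters("ab42", "cancel").renderUrl()
{code}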


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192740#comment-16192740
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142900393
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new 

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192743#comment-16192743
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142897184
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -610,25 +633,15 @@ public void cancel(JobID jobId) throws Exception {
 * failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
 */
public void stop(final JobID jobId) throws Exception {
-   final ActorGateway jobManagerGateway = getJobManagerGateway();
+   final ActorGateway jobManager = getJobManagerGateway();

-   final Future<Object> response;
-   try {
-   response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
-   } catch (final Exception e) {
-   throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-   }
+   Future<Object> response = jobManager.ask(new JobManagerMessages.StopJob(jobId), timeout);

-   final Object result = Await.result(response, timeout);
+   final Object rc = Await.result(response, timeout);

-   if (result instanceof JobManagerMessages.StoppingSuccess) {
-   log.info("Job stopping with ID " + jobId + " succeeded.");
-   } else if (result instanceof JobManagerMessages.StoppingFailure) {
-   final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
-   log.info("Job stopping with ID " + jobId + " failed.", t);
-   throw new Exception("Failed to stop the job because of \n" + t.getMessage());
-   } else {
-   throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
+   if (rc instanceof JobManagerMessages.StoppingFailure) {
+   throw new Exception("Stopping the job with ID " + jobId + " failed.",
+   ((JobManagerMessages.StoppingFailure) rc).cause());
--- End diff --

The unknown response type exception was lost
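
A sketch of how the lost branch could be restored inside `stop(...)`, reconstructed from the removed lines in the diff above (illustrative, not the merged fix):

```java
final Object rc = Await.result(response, timeout);

if (rc instanceof JobManagerMessages.StoppingSuccess) {
	log.info("Job stopping with ID " + jobId + " succeeded.");
} else if (rc instanceof JobManagerMessages.StoppingFailure) {
	throw new Exception("Stopping the job with ID " + jobId + " failed.",
		((JobManagerMessages.StoppingFailure) rc).cause());
} else {
	// Re-introduce the guard that the refactoring dropped.
	throw new Exception("Unknown message received while stopping: " + rc.getClass().getName());
}
```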


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142899120
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private final RestClusterClientConfiguration restClusterClientConfiguration;
+   private final RestClient restClient;
+   private final ExecutorService executorService = Executors.newFixedThreadPool(4);
--- End diff --

Can we give these threads a proper name? Something like 
"RestClusterClientIOThread".


---

