[jira] [Commented] (FLINK-8845) Introduce `parallel recovery` mode for full checkpoint (savepoint)

2018-03-06 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389179#comment-16389179
 ] 

Sihua Zhou commented on FLINK-8845:
---

Even though {{SstFileWriter}} could not help us improve the performance of loading 
data into RocksDB, {{WriteBatch}} can. After a benchmark, I found that using 
{{WriteBatch}} gives roughly a 30% ~ 50% improvement over using {{RocksDB.put()}}. 
So, I would like to drop this approach and change the issue to "Use WriteBatch to 
improve recovery performance in the RocksDB backend".

>  Introduce `parallel recovery` mode for full checkpoint (savepoint)
> ---
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Base on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, 
> we can restore from fully checkpoint (savepoint) in parallel. This can also 
> be extended to incremental checkpoint easily, but for the sake of simple, we 
> do this in two separate tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389174#comment-16389174
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters 
{
+
+   public final AccumulatorsIncludeSerializedValueQueryParameter 
queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
--- End diff --

field name is a bit generic; how about `includeSerializedAccumulators`?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389172#comment-16389172
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756524
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   return 
accumulatorsInfo.getSerializedUserAccumulators();
--- End diff --

the accumulators should be deserialized via 
`SerializedValue#deserialize(ClassLoader)`.

If `Map getAccumulators(JobID jobID, ClassLoader loader)` 
(which should also be overridden) was called, use the passed-in `ClassLoader`; 
otherwise use `ClassLoader.getSystemClassLoader()`.
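
A minimal sketch of the suggested deserialization step, assuming {{SerializedValue#deserializeValue(ClassLoader)}} as the concrete entry point (the helper class and map types are illustrative only, not the actual client code):

{code}
import org.apache.flink.util.SerializedValue;

import java.util.HashMap;
import java.util.Map;

public final class AccumulatorDeserialization {

    // Deserializes each SerializedValue with the given ClassLoader, falling back to
    // the system class loader when none is provided.
    public static Map<String, Object> deserializeAccumulators(
            Map<String, SerializedValue<Object>> serializedAccumulators,
            ClassLoader loader) throws Exception {

        ClassLoader effectiveLoader =
                loader != null ? loader : ClassLoader.getSystemClassLoader();

        Map<String, Object> result = new HashMap<>(serializedAccumulators.size());
        for (Map.Entry<String, SerializedValue<Object>> entry : serializedAccumulators.entrySet()) {
            result.put(entry.getKey(), entry.getValue().deserializeValue(effectiveLoader));
        }
        return result;
    }
}
{code}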


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389173#comment-16389173
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172755830
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
--- End diff --

we should also override `Map getAccumulators(JobID jobID, 
ClassLoader loader)`


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756524
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   return 
accumulatorsInfo.getSerializedUserAccumulators();
--- End diff --

the accumulators should be deserialized via 
`SerializedValue#deserialize(ClassLoader)`.

If `Map getAccumulators(JobID jobID, ClassLoader loader)` 
(which should also be overridden) was called, use the passed-in `ClassLoader`; 
otherwise use `ClassLoader.getSystemClassLoader()`.


---


[jira] [Commented] (FLINK-8860) SlotManager spamming log files

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389168#comment-16389168
 ] 

ASF GitHub Bot commented on FLINK-8860:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5637
  
I would go for `TRACE` given how often we set the log level to `DEBUG`.
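
For illustration, a minimal sketch of the suggested change (assuming an slf4j logger and an instance id as in the quoted log lines; not the actual SlotManager code):

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SlotReportLogging {

    private static final Logger LOG = LoggerFactory.getLogger(SlotReportLogging.class);

    // Log the per-heartbeat slot report at TRACE instead of INFO, so it no longer
    // floods the logs even when DEBUG is the configured level.
    void onSlotReport(String instanceId) {
        LOG.trace("Received slot report from instance {}.", instanceId);
    }
}
{code}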


> SlotManager spamming log files
> --
>
> Key: FLINK-8860
> URL: https://issues.apache.org/jira/browse/FLINK-8860
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> {{SlotManager}} is spamming the log files a lot with
> {code}
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance b16c4e516995d1e672c0933bb380770c.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance de58fbf1c069620a4275c8b529deb20b.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 86ab5a7e1d57bb2883fc0d1f2aebb304.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 90f9ab2bd433db41b3ab567fd246fb3c.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance ec99fcc5a801272402af9afe08a1001d.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 4c1c4b5ce52195dc90196c10c26d9ef8.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 2541d0f1398fc307aaf86bf7750535f1.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 91d94a9fdfce3cbcef32ac1c6e7b3fbf.
> 2018-03-05 10:45:22,392 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 91d94a9fdfce3cbcef32ac1c6e7b3fbf.
> 2018-03-05 10:45:22,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 90f9ab2bd433db41b3ab567fd246fb3c.
> {code}
> This message is printed once per {{TaskManager}} heartbeat.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172755830
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
--- End diff --

we should also override `Map getAccumulators(JobID jobID, 
ClassLoader loader)`


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters 
{
+
+   public final AccumulatorsIncludeSerializedValueQueryParameter 
queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
--- End diff --

field name is a bit generic; how about `includeSerializedAccumulators`?


---


[GitHub] flink issue #5637: [FLINK-8860][flip6] stop SlotManager spamming logs for ev...

2018-03-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5637
  
I would go for `TRACE` given how often we set the log level to `DEBUG`.


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389161#comment-16389161
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
hi @zentol, I refactored the code based on your review suggestions. Would you 
please review it again? Thanks!


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-06 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
hi @zentol, I refactored the code based on your review suggestions. Would you 
please review it again? Thanks!


---


[jira] [Closed] (FLINK-8874) rewrite Flink docs/dev/stream/operators/process_function.md to recommend using KeyedProcessFunction

2018-03-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-8874.
---
Resolution: Invalid

> rewrite Flink docs/dev/stream/operators/process_function.md to recommend 
> using KeyedProcessFunction
> ---
>
> Key: FLINK-8874
> URL: https://issues.apache.org/jira/browse/FLINK-8874
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> We need to completely rewrite Flink 
> docs/dev/stream/operators/process_function.md to recommend using 
> KeyedProcessFunction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389128#comment-16389128
 ] 

ASF GitHub Bot commented on FLINK-8873:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5649

[FLINK-8873] [DataStream API] [Tests] move unit tests of KeyedStream from 
DataStreamTest to KeyedStreamTest

## What is the purpose of the change

move unit tests of `KeyedStream` from `DataStreamTest` to 
`KeyedStreamTest`, in order to have a clearer separation

## Brief change log

added `KeyedStreamTest.java` and `KeyedStreamTest.scala`, and moved related 
unit tests to them

## Verifying this change

This change is already covered by existing tests, such as *KeyedStreamTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5649


commit d4e372fbb21edc0507df461aa7f47a9168350a1a
Author: Bowen Li 
Date:   2018-03-05T19:52:37Z

[FLINK-8873] move unit tests of KeyedStream from DataStreamTest to 
KeyedStreamTest




> move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
> -
>
> Key: FLINK-8873
> URL: https://issues.apache.org/jira/browse/FLINK-8873
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> move unit tests of KeyedStream.scala from DataStreamTest.scala to 
> KeyedStreamTest.scala, in order to have clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5649: [FLINK-8873] [DataStream API] [Tests] move unit te...

2018-03-06 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5649

[FLINK-8873] [DataStream API] [Tests] move unit tests of KeyedStream from 
DataStreamTest to KeyedStreamTest

## What is the purpose of the change

move unit tests of `KeyedStream` from `DataStreamTest` to 
`KeyedStreamTest`, in order to have a clearer separation

## Brief change log

added `KeyedStreamTest.java` and `KeyedStreamTest.scala`, and moved related 
unit tests to them

## Verifying this change

This change is already covered by existing tests, such as *KeyedStreamTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5649


commit d4e372fbb21edc0507df461aa7f47a9168350a1a
Author: Bowen Li 
Date:   2018-03-05T19:52:37Z

[FLINK-8873] move unit tests of KeyedStream from DataStreamTest to 
KeyedStreamTest




---


[jira] [Commented] (FLINK-8845) Introduce `parallel recovery` mode for full checkpoint (savepoint)

2018-03-06 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389123#comment-16389123
 ] 

Sihua Zhou commented on FLINK-8845:
---

Unfortunately, even though the RocksDB 
[wiki|https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ] says the best way to 
load data into RocksDB is to "Generate SST files (using {{SstFileWriter}}) with 
non-overlapping ranges in parallel and bulk load the SST files.", after 
implementing this and testing it with a simple benchmark I found that the 
performance is not as good as expected; it is almost the same as, or even worse 
than, using {{RocksDB.put()}}. After some analysis I found that building the SST 
files spends a lot of time creating {{DirectSlice}} objects, and currently we 
cannot reuse a {{DirectSlice}} in the Java API. Even though this approach yields 
better performance in C++, I don't think we can use it to improve performance in 
Java at the moment (maybe some day RocksDB will improve this so that the Java API 
can get close to the C++ performance) ...
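
For reference, a minimal sketch of the {{SstFileWriter}} + {{ingestExternalFile()}} path discussed above (the file path and pre-sorted entries are illustrative assumptions; keys must be added in the writer's comparator order and parallel writers must cover non-overlapping ranges):

{code}
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Slice;
import org.rocksdb.SstFileWriter;

import java.util.Collections;
import java.util.Map;

public class SstBulkLoadSketch {

    public static void buildAndIngest(
            RocksDB db,
            String sstPath,
            Iterable<Map.Entry<byte[], byte[]>> sortedEntries) throws RocksDBException {

        try (EnvOptions envOptions = new EnvOptions();
                Options options = new Options();
                SstFileWriter writer = new SstFileWriter(envOptions, options)) {

            writer.open(sstPath);
            for (Map.Entry<byte[], byte[]> entry : sortedEntries) {
                // Each put wraps the key/value in Slice objects; this per-entry
                // allocation is the cost discussed above that limits the Java API.
                writer.put(new Slice(entry.getKey()), new Slice(entry.getValue()));
            }
            writer.finish();
        }

        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            // Bulk-load the generated SST file into the DB.
            db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
        }
    }
}
{code}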

>  Introduce `parallel recovery` mode for full checkpoint (savepoint)
> ---
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Base on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, 
> we can restore from fully checkpoint (savepoint) in parallel. This can also 
> be extended to incremental checkpoint easily, but for the sake of simple, we 
> do this in two separate tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest

2018-03-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8873:

Summary: move unit tests of KeyedStream from DataStreamTest to 
KeyedStreamTest  (was: move unit tests of KeyedStream.scala from 
DataStreamTest.scala to KeyedStreamTest.scala)

> move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
> -
>
> Key: FLINK-8873
> URL: https://issues.apache.org/jira/browse/FLINK-8873
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> move unit tests of KeyedStream.scala from DataStreamTest.scala to 
> KeyedStreamTest.scala, in order to have clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8873) move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest

2018-03-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8873:

Fix Version/s: 1.5.0

> move unit tests of KeyedStream from DataStreamTest to KeyedStreamTest
> -
>
> Key: FLINK-8873
> URL: https://issues.apache.org/jira/browse/FLINK-8873
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0, 1.6.0
>
>
> move unit tests of KeyedStream.scala from DataStreamTest.scala to 
> KeyedStreamTest.scala, in order to have clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389051#comment-16389051
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
A few points to mention:

1. @StephanEwen there is already common idleness detection implemented 
within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, 
however, is currently always disabled and we do not allow users to configure it 
(a rough sketch of the hook is shown after this list).

2. I agree that we shouldn't add anything more to the connector; I have also 
discussed this offline with @aljoscha. We should maybe do this only as part of 
the new connector rework that @tweise and I have been talking about, in 1.6.
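
A minimal sketch (an assumed standalone source, not the Kafka connector code) of the `SourceContext` idleness hook referenced in point 1:

{code}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IdleAwareSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollNextRecord(); // assumed helper; returns null when nothing is available
            if (record != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            } else {
                // Signal that this subtask is idle for now so downstream watermark
                // progression is not blocked; StreamSourceContexts handles this signal.
                ctx.markAsTemporarilyIdle();
                Thread.sleep(100);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollNextRecord() {
        return null; // placeholder for illustration
    }
}
{code}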


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
A few points to mention:

1. @StephanEwen there is already common idleness detection implemented 
within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, 
however, is currently always disabled and we do not allow users to configure it.

2. I agree that we shouldn't add anything more to the connector; I have also 
discussed this offline with @aljoscha. We should maybe do this only as part of 
the new connector rework that @tweise and I have been talking about, in 1.6.


---


[jira] [Commented] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388967#comment-16388967
 ] 

ASF GitHub Bot commented on FLINK-8887:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5648

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException

## What is the purpose of the change

*This pull request fixes the issue where ClusterClient.getJobStatus can throw a 
FencingTokenException.*


## Brief change log

  - *wrap the job-status request to the JobMaster gateway in a try-catch; if an 
exception is caught, fall back to requesting the job status from the archived 
execution graph store (a rough sketch follows below)*
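
A rough sketch of the fallback described above; every type here (the gateway and store interfaces, the String status) is a hypothetical stand-in for illustration, not Flink's actual Dispatcher API:

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JobStatusFallbackSketch {

    // Hypothetical stand-ins for the JobMaster gateway and the archived graph store.
    interface StatusGateway { CompletableFuture<String> requestJobStatus(); }
    interface StatusStore { String getArchivedStatus(String jobId); }

    CompletableFuture<String> requestJobStatus(StatusGateway gateway, StatusStore store, String jobId) {
        // Ask the running job's gateway first; if that fails (e.g. the fencing token is
        // already gone because the job just finished), fall back to the archived store.
        return gateway.requestJobStatus().exceptionally(throwable -> {
            String archived = store.getArchivedStatus(jobId);
            if (archived != null) {
                return archived;
            }
            throw new CompletionException(throwable);
        });
    }
}
```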

## Verifying this change

This change is already covered by existing tests, such as 
*DispatcherTest.testCacheJobExecutionResult*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8887

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5648


commit 03c5bf0aa1536842359740b53f43e7b46f48b006
Author: vinoyang 
Date:   2018-03-07T03:43:10Z

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException




> ClusterClient.getJobStatus can throw FencingTokenException
> --
>
> Key: FLINK-8887
> URL: https://issues.apache.org/jira/browse/FLINK-8887
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or 
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. 
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by 
> job id. If a reference is found, {{requestJobStatus}} is called on the 
> respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried. 
> However, between the lookup and the method call, the {{JobMaster}} of the 
> respective job may have lost leadership already (job finished), and has set 
> the fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token mismatch: Ignoring message LocalFencedMessage(null, 
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null 
> did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token 

[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...

2018-03-06 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5648

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException

## What is the purpose of the change

*This pull request fixes the issue where ClusterClient.getJobStatus can throw a 
FencingTokenException.*


## Brief change log

  - *wrap the job-status request to the JobMaster gateway in a try-catch; if an 
exception is caught, fall back to requesting the job status from the archived 
execution graph store*

## Verifying this change

This change is already covered by existing tests, such as 
*DispatcherTest.testCacheJobExecutionResult*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8887

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5648


commit 03c5bf0aa1536842359740b53f43e7b46f48b006
Author: vinoyang 
Date:   2018-03-07T03:43:10Z

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException




---


[jira] [Assigned] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-06 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8887:
---

Assignee: vinoyang

> ClusterClient.getJobStatus can throw FencingTokenException
> --
>
> Key: FLINK-8887
> URL: https://issues.apache.org/jira/browse/FLINK-8887
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or 
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. 
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by 
> job id. If a reference is found, {{requestJobStatus}} is called on the 
> respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried. 
> However, between the lookup and the method call, the {{JobMaster}} of the 
> respective job may have lost leadership already (job finished), and has set 
> the fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token mismatch: Ignoring message LocalFencedMessage(null, 
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null 
> did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
> Fencing token not set: Ignoring message LocalFencedMessage(null, 
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388919#comment-16388919
 ] 

ASF GitHub Bot commented on FLINK-8687:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5647

[FLINK-8687] Make MaterializedCollectStreamResult#retrievePage to have resultLock

## What is the purpose of the change

Currently ```MaterializedCollectStreamResult#retrievePage``` checks the page 
range and calls snapshot.subList() without holding resultLock. resultLock 
should be taken.

## Brief change log
Add a synchronized block on resultLock to protect it (see the sketch below).
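
A minimal sketch of the locking pattern (simplified stand-in fields, not the exact MaterializedCollectStreamResult code): the page-range check and the subList() call are done while holding resultLock, so a concurrent updatePage() cannot swap the snapshot underneath them.

```
import java.util.ArrayList;
import java.util.List;

class PagedResultSketch<T> {

    private final Object resultLock = new Object();
    private List<T> snapshot = new ArrayList<>();
    private int pageCount;
    private final int pageSize = 100;

    List<T> retrievePage(int page) {
        synchronized (resultLock) {
            if (page <= 0 || page > pageCount) {
                throw new IllegalArgumentException("Invalid page: " + page);
            }
            // Both the range check above and this subList() now happen under the lock.
            return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
        }
    }

    void updatePage(List<T> newSnapshot) {
        synchronized (resultLock) {
            snapshot = new ArrayList<>(newSnapshot);
            pageCount = Math.max(1, (snapshot.size() + pageSize - 1) / pageSize);
        }
    }
}
```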

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8687

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5647


commit e3308ba0914404d424dfbc17c70d1b948c97b175
Author: mingleiZhang 
Date:   2018-03-07T02:36:52Z

[FLINK-8687] Make MaterializedCollectStreamResult#retrievePage to have 
resultLock




> MaterializedCollectStreamResult#retrievePage should take resultLock
> ---
>
> Key: FLINK-8687
> URL: https://issues.apache.org/jira/browse/FLINK-8687
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Currently MaterializedCollectStreamResult#retrievePage checks page range and 
> calls snapshot.subList() without holding resultLock.
> {{resultLock}} should be taken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5647: [FLINK-8687] Make MaterializedCollectStreamResult#...

2018-03-06 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5647

[FLINK-8687] Make MaterializedCollectStreamResult#retrievePage to have resultLock

## What is the purpose of the change

Currently ```MaterializedCollectStreamResult#retrievePage``` checks the page 
range and calls snapshot.subList() without holding resultLock. resultLock 
should be taken.

## Brief change log
Add a synchronized block on resultLock to protect it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8687

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5647


commit e3308ba0914404d424dfbc17c70d1b948c97b175
Author: mingleiZhang 
Date:   2018-03-07T02:36:52Z

[FLINK-8687] Make MaterializedCollectStreamResult#retrievePage to have 
resultLock




---


[jira] [Commented] (FLINK-8623) ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388909#comment-16388909
 ] 

ASF GitHub Bot commented on FLINK-8623:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5449
  
Thanks @NicoK. I have pushed this patch here at least two times and Travis CI 
passed without errors each time.


> ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on 
> Travis
> 
>
> Key: FLINK-8623
> URL: https://issues.apache.org/jira/browse/FLINK-8623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> {{ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics}} fails on 
> Travis: https://travis-ci.org/apache/flink/jobs/33932



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5449: [FLINK-8623] ConnectionUtilsTest.testReturnLocalHostAddre...

2018-03-06 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5449
  
Thanks @NicoK. I have pushed this patch here at least two times and Travis CI 
passed without errors each time.


---


[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-03-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7775:
--
Description: 
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used.
We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}

The method of PermanentBlobCache is not used.
We should remove it.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-03-06 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388891#comment-16388891
 ] 

mingleizhang commented on FLINK-8687:
-

[~yuzhih...@gmail.com] I think I was wrong.

> MaterializedCollectStreamResult#retrievePage should take resultLock
> ---
>
> Key: FLINK-8687
> URL: https://issues.apache.org/jira/browse/FLINK-8687
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Currently MaterializedCollectStreamResult#retrievePage checks page range and 
> calls snapshot.subList() without holding resultLock.
> {{resultLock}} should be taken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-03-06 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388890#comment-16388890
 ] 

mingleizhang commented on FLINK-8687:
-

Thanks [~yuzhih...@gmail.com]. Actually, I do not know the multi-threaded 
scenario either; I only GUESSED that there MIGHT be one, since there is a 
{{resultLock}}. So I think there could be multiple threads writing, which in this 
issue would be {{updatePage}}. I did not even know that the classes or methods in 
the gateway package can be accessed from multiple threads. Hmm

> MaterializedCollectStreamResult#retrievePage should take resultLock
> ---
>
> Key: FLINK-8687
> URL: https://issues.apache.org/jira/browse/FLINK-8687
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Currently MaterializedCollectStreamResult#retrievePage checks page range and 
> calls snapshot.subList() without holding resultLock.
> {{resultLock}} should be taken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8883) ExceptionUtils.rethrowIfFatalError should treat ThreadDeath as fatal

2018-03-06 Thread Ken Krugler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ken Krugler updated FLINK-8883:
---
Summary: ExceptionUtils.rethrowIfFatalError should treat ThreadDeath as 
fatal  (was: ExceptionUtils.rethrowIfFatalError should tread ThreadDeath as 
fatal.)

> ExceptionUtils.rethrowIfFatalError should treat ThreadDeath as fatal
> 
>
> Key: FLINK-8883
> URL: https://issues.apache.org/jira/browse/FLINK-8883
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Thread deaths leave code in inconsistent state and should thus always be 
> forwarded as fatal exceptions that cannot be handled in any way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-06 Thread Kailash Hassan Dayanand (JIRA)
Kailash Hassan Dayanand created FLINK-8888:
--

 Summary: Upgrade AWS SDK in flink-connector-kinesis
 Key: FLINK-8888
 URL: https://issues.apache.org/jira/browse/FLINK-8888
 Project: Flink
  Issue Type: Improvement
Reporter: Kailash Hassan Dayanand


Bump the Java AWS SDK version to 1.11.272. Also evaluate the impact of this 
version upgrade on the KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388497#comment-16388497
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
We may want to delay merging this until 
[FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been 
addressed, as it voids the primary use-case of this handler.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
We may want to delay merging this until 
[FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been 
addressed, as it voids the primary use-case of this handler.


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388496#comment-16388496
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172651895
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,33 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList("true"));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null) {
+   Map result = new HashMap<>(3);
+
+   
result.put(JobAccumulatorsInfo.FIELD_NAME_JOB_ACCUMULATORS, 
accumulatorsInfo.getJobAccumulators());
--- End diff --

The resulting API (as shown in `RestClusterClientTest`) is effectively not 
usable for users and inconsistent with existing behavior in `ClusterClient`.

The returned map should only contain the deserialized accumulators with 
their respective name as the key.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172651895
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,33 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList("true"));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null) {
+   Map result = new HashMap<>(3);
+
+   
result.put(JobAccumulatorsInfo.FIELD_NAME_JOB_ACCUMULATORS, 
accumulatorsInfo.getJobAccumulators());
--- End diff --

The resulting API (as shown in `RestClusterClientTest`) is effectively not 
usable for users and inconsistent with existing behavior in `ClusterClient`.

The returned map should only contain the deserialized accumulators with 
their respective name as the key.


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388491#comment-16388491
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172650031
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -33,19 +39,38 @@
 public class JobAccumulatorsInfo implements ResponseBody {
public static final String FIELD_NAME_JOB_ACCUMULATORS = 
"job-accumulators";
public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = 
"user-task-accumulators";
+   public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS 
= "serialized-user-task-accumulators";
 
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
private List jobAccumulators;
 
@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
private List userAccumulators;
 
+   @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
+   @JsonSerialize(contentUsing = SerializedValueSerializer.class)
+   private Map serializedUserAccumulators;
+
@JsonCreator
public JobAccumulatorsInfo(
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators) {
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
this.jobAccumulators = 
Preconditions.checkNotNull(jobAccumulators);
this.userAccumulators = 
Preconditions.checkNotNull(userAccumulators);
+   this.serializedUserAccumulators = 
Preconditions.checkNotNull(serializedUserAccumulators);
+   }
+
+   public List getJobAccumulators() {
--- End diff --

missing `@JsonIgnore` annotations
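
A minimal sketch of the convention asked for here (plain Jackson annotations rather than Flink's shaded package, and a single illustrative field): the field carries `@JsonProperty`, and the getter carries `@JsonIgnore` so Jackson does not pick the getter up as an additional property.

{code}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

class AccumulatorsInfoSketch {

    private static final String FIELD_NAME_VALUE = "value";

    // Serialized/deserialized via the field, driven by @JsonProperty.
    @JsonProperty(FIELD_NAME_VALUE)
    private final String value;

    @JsonCreator
    AccumulatorsInfoSketch(@JsonProperty(FIELD_NAME_VALUE) String value) {
        this.value = value;
    }

    // @JsonIgnore keeps Jackson from treating this getter as a property of its own.
    @JsonIgnore
    public String getValue() {
        return value;
    }
}
{code}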


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172650031
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -33,19 +39,38 @@
 public class JobAccumulatorsInfo implements ResponseBody {
public static final String FIELD_NAME_JOB_ACCUMULATORS = 
"job-accumulators";
public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = 
"user-task-accumulators";
+   public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS 
= "serialized-user-task-accumulators";
 
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
private List jobAccumulators;
 
@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
private List userAccumulators;
 
+   @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
+   @JsonSerialize(contentUsing = SerializedValueSerializer.class)
+   private Map serializedUserAccumulators;
+
@JsonCreator
public JobAccumulatorsInfo(
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators) {
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
this.jobAccumulators = 
Preconditions.checkNotNull(jobAccumulators);
this.userAccumulators = 
Preconditions.checkNotNull(userAccumulators);
+   this.serializedUserAccumulators = 
Preconditions.checkNotNull(serializedUserAccumulators);
+   }
+
+   public List getJobAccumulators() {
--- End diff --

missing `@JsonIgnore` annotations


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388476#comment-16388476
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

The name is also a bit generic.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388474#comment-16388474
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

Why not directly make this a boolean option?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388478#comment-16388478
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648802
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -104,6 +129,18 @@ public UserTaskAccumulator(
this.value = Preconditions.checkNotNull(value);
}
 
+   public String getName() {
--- End diff --

missing `@JsonIgnore` annotation (also applies to other getters


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

The name is also a bit generic.


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648802
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -104,6 +129,18 @@ public UserTaskAccumulator(
this.value = Preconditions.checkNotNull(value);
}
 
+   public String getName() {
--- End diff --

missing `@JsonIgnore` annotation (also applies to other getters


---


[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

2018-03-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

Why not directly make this a boolean option?


---


[jira] [Updated] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-06 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8459:

Release Note: The REST API to trigger the "cancel with savepoint" action 
has changed, and is not backwards compatible.

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancelling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-06 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-8459.
---
Resolution: Fixed

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancelling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-06 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reopened FLINK-8459:
-

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancelling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-06 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8887:
---

 Summary: ClusterClient.getJobStatus can throw FencingTokenException
 Key: FLINK-8887
 URL: https://issues.apache.org/jira/browse/FLINK-8887
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Gary Yao
 Fix For: 1.5.0


*Description*
Calling {{RestClusterClient.getJobStatus}} or 
{{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. 

*Analysis*
{{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by job 
id. If a reference is found, {{requestJobStatus}} is called on the respective 
instance. If not, the {{ArchivedExecutionGraphStore}} is queried. However, 
between the lookup and the method call, the {{JobMaster}} of the respective job 
may have lost leadership already (job finished), and has set the fencing token 
to {{null}}.

*Stacktrace*

{noformat}
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token mismatch: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null did 
not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at 
akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}

{noformat}
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at 
akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
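Until the dispatcher-side race is fixed, callers can treat the failure as transient. A minimal client-side sketch (the {{Supplier}}-based wrapper is illustrative, not an existing Flink utility):

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;

public final class FencingRetry {

    // Retry an async request a few times if it fails with a FencingTokenException,
    // which can happen transiently while the JobMaster hands over leadership after
    // the job has finished.
    public static <T> T callWithRetry(Supplier<CompletableFuture<T>> request, int maxAttempts)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return request.get().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof FencingTokenException) {
                    last = e;          // transient failure, back off briefly and retry
                    Thread.sleep(50L);
                } else {
                    throw e;
                }
            }
        }
        throw last != null ? last : new IllegalArgumentException("maxAttempts must be positive");
    }
}
{code}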



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388293#comment-16388293
 ] 

ASF GitHub Bot commented on FLINK-6053:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5501
  
LGTM, +1 on merging to 1.6.0


> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5501: [FLINK-6053][metrics] Add new Number-/StringGauge metric ...

2018-03-06 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5501
  
LGTM, +1 on merging to 1.6.0


---


[jira] [Updated] (FLINK-6567) ExecutionGraphMetricsTest fails on Windows CI

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6567:

Fix Version/s: (was: 1.5.0)
   1.6.0

> ExecutionGraphMetricsTest fails on Windows CI
> -
>
> Key: FLINK-6567
> URL: https://issues.apache.org/jira/browse/FLINK-6567
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The {{testExecutionGraphRestartTimeMetric}} fails every time I run it on 
> AppVeyor. It also very rarely failed for me locally.
> The test fails at line 235 if the RUNNING timestamp is equal to the 
> RESTARTING timestamp, which may happen when combining a fast test with a 
> low-resolution clock.
> A simple fix would be to increase the time between the RUNNING and 
> RESTARTING timestamps by adding a 50ms sleep into the 
> {{TestingRestartStrategy#canRestart()}} method, as this one is called before 
> transitioning to the RESTARTING state.
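A sketch of what that could look like (only the relevant method of the test's restart strategy is shown, everything else omitted):

{code}
// Sketch: delay canRestart() so that the RUNNING and RESTARTING transitions are
// guaranteed to get distinct timestamps, even on a clock with coarse resolution.
class DelayingTestingRestartStrategy {

    boolean canRestart() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return true;
    }
}
{code}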



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8758) Expose method for non-blocking job submission on ClusterClient

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8758.
---
Resolution: Fixed

master: 1e48b722d23dc160e065b04c93d41b8df1400c19
1.5: 7c3be91491f9a0d2ed9665dc0908ed59ab1587ef

> Expose method for non-blocking job submission on ClusterClient
> --
>
> Key: FLINK-8758
> URL: https://issues.apache.org/jira/browse/FLINK-8758
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Some tests that we need to port as part of FLINK-8700 need a way of 
> submitting jobs to the testing cluster in a non-blocking fashion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8759:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Due to a bug in Netty's shutdown sequence and overall improvements in Netty, I'd 
> like to bump the version (and stay within the 4.0 series for now). The 
> problem we faced in the past should not be relevant for credit-based flow 
> control anymore and can be worked around (for the fallback code path) by 
> restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying 
> contents to new buffers instead of slicing the existing one (please refer to 
> FLINK-7428 for the inverse direction).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7286) Flink Dashboard fails to display bytes/records received by sources

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7286:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Flink Dashboard fails to display bytes/records received by sources
> --
>
> Key: FLINK-7286
> URL: https://issues.apache.org/jira/browse/FLINK-7286
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Webfrontend
>Affects Versions: 1.3.1
>Reporter: Elias Levy
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.6.0
>
>
> It appears Flink can't measure the number of bytes read or records produced 
> by a source (e.g. Kafka source). This is particularly problematic for simple 
> jobs where the job pipeline is chained, and in which there are no 
> measurements between operators. Thus, in the UI it appears that the job is 
> not consuming any data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5513) Remove relocation of internal API classes

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5513:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Remove relocation of internal API classes
> -
>
> Key: FLINK-5513
> URL: https://issues.apache.org/jira/browse/FLINK-5513
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, we are relocating the {{curator}} dependency in order to avoid 
> conflicts with user code classes. This happens for example in the 
> {{flink-runtime}} module. The problem with that is that {{curator}} classes, 
> such as the {{CuratorFramework}}, are part of Flink's internal API. So for 
> example, the {{ZooKeeperStateHandleStore}} requires a {{CuratorFramework}} as 
> argument in order to instantiate it. By relocating {{curator}} it is no 
> longer possible to use this class outside of {{flink-runtime}} without some 
> nasty tricks (see {{flink-mesos}} for that).
> I think it is not good practice to relocate internal API classes because it 
> hinders easy code reuse. I propose to either introduce our own interfaces 
> which abstract the {{CuratorFramework}} away or (imo the better solution) to 
> get rid of the {{Curator}} relocation. The latter might entail that we 
> properly separate the API modules from the runtime modules so that users 
> don't have to pull in the runtime dependencies if they want to develop a 
> Flink job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6053:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6900) Limit size of indiivual components in DropwizardReporter

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6900:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Limit size of indiivual components in DropwizardReporter
> 
>
> Key: FLINK-6900
> URL: https://issues.apache.org/jira/browse/FLINK-6900
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5005:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8047) Create HTTP Sink

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8047:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Create HTTP Sink
> 
>
> Key: FLINK-8047
> URL: https://issues.apache.org/jira/browse/FLINK-8047
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Jessica Negara
>Assignee: Michael Gendelman
>Priority: Major
> Fix For: 1.6.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Implement an HTTP output connector + sink, to allow users to send Flink 
> output to an HTTP web server.
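A rough sketch of what such a sink could look like, assuming one blocking POST per record (endpoint and payload type are placeholders; batching, retries, and authentication are left out):

{code}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Minimal sketch of an HTTP sink: POSTs every record to a fixed endpoint.
public class HttpSink extends RichSinkFunction<String> {

    private final String endpoint;

    public HttpSink(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void invoke(String value) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        if (status >= 400) {
            throw new RuntimeException("HTTP sink request failed with status " + status);
        }
        conn.disconnect();
    }
}
{code}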



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8031) Provide configuration to enable/disable ability to cancel jobs in Web Frontend

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8031:

Fix Version/s: (was: 1.5.0)

> Provide configuration to enable/disable ability to cancel jobs in Web Frontend
> --
>
> Key: FLINK-8031
> URL: https://issues.apache.org/jira/browse/FLINK-8031
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Affects Versions: 1.3.2
>Reporter: Stephen Hesketh
>Priority: Major
>
> The Web API can be configured to prevent job submission by setting 
> jobmanager.web.submit.enable to false in the Flink YAML configuration file:
> {quote}
> \# Flag to specify whether job submission is enabled from the web-based
> \# runtime monitor. Uncomment to disable.
> {color:red}
> jobmanager.web.submit.enable: false{color}
> {quote}
> Users can still cancel running jobs. We would like to be able to offer the 
> Web Frontend to teams for monitoring of jobs, but since this is shared 
> infrastructure we do not want users to be able to run or cancel jobs through 
> this interface.
> It is proposed that the ability to cancel jobs is also configurable in the 
> Flink yaml configuration file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8033) Build Flink with JDK 9

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8033:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Build Flink with JDK 9
> --
>
> Key: FLINK-8033
> URL: https://issues.apache.org/jira/browse/FLINK-8033
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.6.0
>
>
> This is a JIRA to track all issues found while making Flink compatible with 
> Java 9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8885) The DispatcherThreadFactory should register uncaught exception handlers

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8885.
---
Resolution: Duplicate

> The DispatcherThreadFactory should register uncaught exception handlers
> ---
>
> Key: FLINK-8885
> URL: https://issues.apache.org/jira/browse/FLINK-8885
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> The {{DispatcherThreadFactory}} is responsible for spawning the thread pool 
> threads for TaskManager's async dispatcher and for the CheckpointCoordinators 
> timed trigger.
> In case of uncaught exceptions in these threads, the system is not healthy 
> any more, hence they should register the 
> {{FatalExitUcaughtExceptionsHandler}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8655) Add a default keyspace to CassandraSink

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8655:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking an 
> explicit keyspace would then be written to the provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7957) Add MetricGroup#getLogicalScope

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7957:

Fix Version/s: (was: 1.5.0)

> Add MetricGroup#getLogicalScope
> ---
>
> Key: FLINK-7957
> URL: https://issues.apache.org/jira/browse/FLINK-7957
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>
> Various reporters make use of a generalized scope string (e.g. 
> "taskmanager.job.task."). This string can be generated with 
> {{AbstractMetricGroup#getLogicalScope}}, which is however an internal API. As 
> a result, the access pattern currently looks like this:
> {code}
> ((FrontMetricGroup) group).getLogicalScope(CHARACTER_FILTER, '.')
> {code}
> Given the widespread use of this kind of scope, I propose to move it into the 
> MetricGroup interface.
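Concretely, the proposal boils down to an addition along these lines on the public interface (a sketch only; the method name mirrors the internal one and is up for discussion):

{code}
import org.apache.flink.metrics.CharacterFilter;

public interface MetricGroup {
    // ... existing methods ...

    // Proposed (sketch): expose the generalized scope string, e.g.
    // "taskmanager.job.task", without casting to internal classes.
    String getLogicalScope(CharacterFilter filter, char delimiter);
}
{code}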



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8793) Hide key containing "secret" in web interface

2018-03-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8793:

Priority: Blocker  (was: Minor)

> Hide key containing "secret" in web interface
> -
>
> Key: FLINK-8793
> URL: https://issues.apache.org/jira/browse/FLINK-8793
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.1
>Reporter: Etienne CARRIERE
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, when going to /jobmanager/config in the web interface, the values of 
> keys containing "password" are replaced by "". 
> When using S3 for checkpoint/savepoint configuration on an infrastructure 
> which is not on AWS (where IAM is not possible), the s3.secret-key is 
> revealed in the interface. 
> I propose the same behaviour for keys containing "secret" as for keys containing "password".
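The masking itself can stay trivial; a sketch of the proposed check (helper names are made up):

{code}
// Sketch: mask values whose key looks sensitive, instead of matching only "password".
private static boolean isSensitive(String key) {
    String k = key.toLowerCase();
    return k.contains("password") || k.contains("secret");
}

private static String maskIfSensitive(String key, String value) {
    return isSensitive(key) ? "******" : value;
}
{code}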



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388266#comment-16388266
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5645
  
The end-to-end test would be cheap as we wouldn't execute a flink job or 
even start a flink cluster. 
The scripts in `flink-end-to-end-tests` can do pretty much anything they 
want; my idea was to just call the test method in a non-test context, in which 
case the test should throw an AssertionError. It may not be the cleanest thing 
to do though as we have to mix test and production code in a single jar.

My goal was to verify that the sections guarded by 
`CONCURRENT_ACCESS_CHECK` are skipped at runtime by default. As it stands 
someone could just hard-code it to `true` and there's no test preventing that.


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest to add concurrency checks that are 
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
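The general shape of such a check, sketched here independently of the actual Flink implementation, is to remember which thread currently owns the serializer and fail fast when a second thread enters:

{code}
import java.util.concurrent.atomic.AtomicReference;

// Sketch of a debug-only concurrency guard for a stateful serializer.
public final class ConcurrentAccessGuard {

    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int depth; // only touched by the owning thread

    public void enter() {
        Thread current = Thread.currentThread();
        if (owner.get() == current) {
            depth++; // re-entrant call from the owning thread is allowed
            return;
        }
        if (!owner.compareAndSet(null, current)) {
            throw new IllegalStateException(
                "Serializer accessed concurrently by " + current + " and " + owner.get());
        }
        depth = 1;
    }

    public void exit() {
        if (owner.get() == Thread.currentThread() && --depth == 0) {
            owner.set(null);
        }
    }
}
{code}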



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5645: FLINK-8876 Improve concurrent access handling in stateful...

2018-03-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5645
  
The end-to-end test would be cheap as we wouldn't execute a flink job or 
even start a flink cluster. 
The scripts in `flink-end-to-end-tests` can do pretty much anything they 
want; my idea was to just call the test method in a non-test context, in which 
case the test should throw an AssertionError. It may not be the cleanest thing 
to do though as we have to mix test and production code in a single jar.

My goal was to verify that the sections guarded by 
`CONCURRENT_ACCESS_CHECK` are skipped at runtime by default. As it stands 
someone could just hard-code it to `true` and there's no test preventing that.


---


[GitHub] flink pull request #5646: [hotfix] [javadocs] minor javadoc fix in Timestamp...

2018-03-06 Thread pjfanning
GitHub user pjfanning opened a pull request:

https://github.com/apache/flink/pull/5646

[hotfix] [javadocs] minor javadoc fix in TimestampAssigner




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pjfanning/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5646


commit 80ad9367593c3a17bec12ef2a1baa927b867a71d
Author: PJ Fanning 
Date:   2018-03-06T18:04:05Z

minor javadoc fix in TimestampAssigner.java




---


[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-03-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8886:
-

 Summary: Job isolation via scheduling in shared cluster
 Key: FLINK-8886
 URL: https://issues.apache.org/jira/browse/FLINK-8886
 Project: Flink
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.5.0
Reporter: Elias Levy


Flink's TaskManager executes tasks from different jobs within the same JVM as 
threads. We prefer to isolate different jobs in their own JVM. Thus, we must 
use different TMs for different jobs. As the scheduler currently allocates 
task slots within a TM to tasks from different jobs, that means we must stand 
up one cluster per job. This is wasteful, as it requires at least two 
JobManagers per cluster for high availability, and the JMs have low utilization.

Additionally, different jobs may require different resources. Some jobs are 
compute heavy. Some are IO heavy (lots of state in RocksDB). At the moment 
the scheduler treats all TMs as equivalent, except possibly in their number 
of available task slots. Thus, one is required to stand up multiple clusters if 
there is a need for different types of TMs.

 

It would be useful if one could specify requirements on a job, such that it is 
only scheduled on a subset of TMs.  Properly configured, that would permit 
isolation of jobs in a shared cluster and scheduling of jobs with specific 
resource needs.

 

One possible implementation is to specify a set of tags on the TM config file 
which the TMs use when registering with the JM, and another set of tags 
configured within the job or supplied when submitting the job.  The scheduler 
could then match the tags in the job with the tags in the TMs.  In a 
restrictive mode the scheduler would assign a job task to a TM only if all tags 
match.  In a relaxed mode the scheduler could assign a job task to a TM if 
there is a partial match, while giving preference to a more accurate match.
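A sketch of the matching rule described above (the two modes follow the proposal; everything else is illustrative):

{code}
import java.util.Set;

// Sketch of the proposed tag matching between a job and a TaskManager.
public final class TagMatcher {

    // Restrictive mode: every tag required by the job must be offered by the TM.
    public static boolean matchesRestrictive(Set<String> jobTags, Set<String> tmTags) {
        return tmTags.containsAll(jobTags);
    }

    // Relaxed mode: any overlap qualifies; the score lets the scheduler prefer
    // TMs that match more of the job's tags.
    public static int relaxedScore(Set<String> jobTags, Set<String> tmTags) {
        int score = 0;
        for (String tag : jobTags) {
            if (tmTags.contains(tag)) {
                score++;
            }
        }
        return score;
    }
}
{code}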




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8708) Unintended integer division in StandaloneThreadedGenerator

2018-03-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8708:
--
Description: 
In 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
 :
{code}
double factor = (ts - lastTimeStamp) / 1000;
{code}
Proper casting should be done before the integer division

  was:
In 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
 :

{code}
double factor = (ts - lastTimeStamp) / 1000;
{code}
Proper casting should be done before the integer division


> Unintended integer division in StandaloneThreadedGenerator
> --
>
> Key: FLINK-8708
> URL: https://issues.apache.org/jira/browse/FLINK-8708
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In 
> flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
>  :
> {code}
> double factor = (ts - lastTimeStamp) / 1000;
> {code}
> Proper casting should be done before the integer division
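For example, promoting either operand to double before the division avoids the truncation:

{code}
// dividing by a double literal promotes the subtraction result before the division
double factor = (ts - lastTimeStamp) / 1000.0;
{code}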



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388123#comment-16388123
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
There was a related discussion on the mailing list; this and several other 
features could be provided by a common connector framework. Such an initiative is 
a much larger effort, though, and it is not clear to me that users can wait. The 
Kinesis consumer has virtually identical requirements and we have already 
written custom code for it.


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
There was a related discussion on the mailing list; this and several other 
features could be provided by a common connector framework. Such an initiative is 
a much larger effort, though, and it is not clear to me that users can wait. The 
Kinesis consumer has virtually identical requirements and we have already 
written custom code for it.


---


[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-03-06 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 3/6/18 5:03 PM:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}
Interrupt status should be restored, or throw InterruptedIOException .


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}
Interrupt status should be restored, or throw InterruptedIOException.

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
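Both suggested variants, applied to the snippet above (fragment only; {{InterruptedIOException}} is {{java.io.InterruptedIOException}}):

{code}
try {
    splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
    // restore the interrupt status so callers further up the stack can see it ...
    Thread.currentThread().interrupt();
    // ... and surface the interruption as an InterruptedIOException rather than a plain IOException
    InterruptedIOException ioe = new InterruptedIOException("Could not get Splits.");
    ioe.initCause(e);
    throw ioe;
}
{code}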



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-03-06 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8667.
-
Resolution: Fixed

Merged on master with  836998bd65ef2d0d0276faed189a0dfe8a7a6dc3

and on release-1.5 with 80020cb5866c8bac67a48f89aa481de7de262f83

> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it will be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2018-03-06 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388066#comment-16388066
 ] 

Nico Kruber commented on FLINK-4387:


because it is not fixed :( - and I'd target this for 1.6 only

> Instability in KvStateClientTest.testClientServerIntegration()
> --
>
> Key: FLINK-4387
> URL: https://issues.apache.org/jira/browse/FLINK-4387
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.5.0, 1.6.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
>Priority: Major
>  Labels: test-stability
> Fix For: 1.2.0, 1.6.0
>
>
> According to this log: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt
> the {{KvStateClientTest}} didn't complete.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() 
> [0x7fb2bcb3b000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
>   - locked <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32)
>   at 
> org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185)
>   at 
> org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}
> and
> {code}
> Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError
>   at 
> io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
>   at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388067#comment-16388067
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5500


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it will be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFu...

2018-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5500


---


[jira] [Created] (FLINK-8884) The DispatcherThreadFactory should register uncaught exception handlers

2018-03-06 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8884:
---

 Summary: The DispatcherThreadFactory should register uncaught 
exception handlers
 Key: FLINK-8884
 URL: https://issues.apache.org/jira/browse/FLINK-8884
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0, 1.6.0


The {{DispatcherThreadFactory}} is responsible for spawning the thread pool 
threads for TaskManager's async dispatcher and for the CheckpointCoordinators 
timed trigger.

In case of uncaught exceptions in these threads, the system is not healthy any 
more, hence they should register the {{FatalExitUcaughtExceptionsHandler}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8885) The DispatcherThreadFactory should register uncaught exception handlers

2018-03-06 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8885:
---

 Summary: The DispatcherThreadFactory should register uncaught 
exception handlers
 Key: FLINK-8885
 URL: https://issues.apache.org/jira/browse/FLINK-8885
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0, 1.6.0


The {{DispatcherThreadFactory}} is responsible for spawning the thread pool 
threads for TaskManager's async dispatcher and for the CheckpointCoordinators 
timed trigger.

In case of uncaught exceptions in these threads, the system is not healthy any 
more, hence they should register the {{FatalExitUcaughtExceptionsHandler}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8883) ExceptionUtils.rethrowIfFatalError should treat ThreadDeath as fatal.

2018-03-06 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8883:
---

 Summary: ExceptionUtils.rethrowIfFatalError should treat 
ThreadDeath as fatal.
 Key: FLINK-8883
 URL: https://issues.apache.org/jira/browse/FLINK-8883
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0, 1.6.0


Thread deaths leave code in an inconsistent state and should thus always be 
forwarded as fatal exceptions that cannot be handled in any way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388049#comment-16388049
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Making the concurrent access check re-entrant fixes all tests.

My feeling would be to not add an end-to-end test for this, because 
end-to-end tests are quite expensive. Is this mainly for validating that this 
works by configuring the log level in your opinion?


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest to add concurrency checks that are 
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5645: FLINK-8876 Improve concurrent access handling in stateful...

2018-03-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Making the concurrent access check re-entrant fixes all tests.

My feeling would be to not add an end-to-end test for this, because 
end-to-end tests are quite expensive. Is this mainly for validating that this 
works by configuring the log level in your opinion?


---


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-06 Thread Wind (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388046#comment-16388046
 ] 

Wind commented on FLINK-6924:
-

I am not sure if [~Terry1897] is still doing this.

However, I think it's necessary to add this table API because we already have 
the related SQL function.

Could anyone check my PR above?

> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-06 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8560.
-
Resolution: Fixed

Merged on master with 159986292e35a71737bcc434d5f20f385973fafa

and on release-1.5 with d385e094c8559a2d144bc95f4449c24a696352ab

> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not so good as you have to check in the processElement method for 
> every element if the key is already stored and set it if it's not already set.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Having it in the open() method could work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
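The workaround described above looks roughly like this today (types, state name, and logic are illustrative); the ticket removes the need for it by exposing the key on the timer context:

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the current workaround: remember the key in keyed state so it is
// available again when the timer fires.
public class KeyRememberingFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient ValueState<String> keyState;

    @Override
    public void open(Configuration parameters) {
        keyState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("current-key", String.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        if (keyState.value() == null) {
            keyState.update(value.f0); // stash the key for later use in onTimer()
        }
        ctx.timerService().registerEventTimeTimer(value.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String key = keyState.value(); // the key had to be stored manually
        out.collect("timer fired for key " + key);
    }
}
{code}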



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388034#comment-16388034
 ] 

ASF GitHub Bot commented on FLINK-8824:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5620
  
+1

Looks good, merging this.


> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The connector uses {{getCanonicalName()}} in all places, rather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc, but is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5620: [FLINK-8824] [kafka] Replace getCanonicalName with getNam...

2018-03-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5620
  
+1

Looks good, merging this.


---


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388031#comment-16388031
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5481


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is cumbersome, because the processElement method has to check for every 
> element whether the key is already stored and set it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Exposing it in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5481


---


[jira] [Updated] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8830:

Priority: Blocker  (was: Major)

> YarnResourceManager throws NullPointerException
> ---
>
> Key: FLINK-8830
> URL: https://issues.apache.org/jira/browse/FLINK-8830
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>
>  
> {code:java}
> java.lang.NullPointerException
> at java.io.File.<init>(File.java:277)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
> at 
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
> at 
> org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
>  
> This exception is being thrown in 
> `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
> is apparently ignored, since the test completes. It seems that this line:
> {code:java}
> String fileLocation = 
> System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
> is not guarded against a null return value. I don't know whether that is a test 
> or a production code issue.
> CC [~till.rohrmann]
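A minimal sketch of the kind of guard the description asks for (a standalone
illustration, not the actual Utils code; the literal environment variable name matches
the value of {{UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION}}):

{code:java}
import java.io.File;

public class EnvGuardDemo {

    public static void main(String[] args) {
        // System.getenv(...) may return null, e.g. in tests that never set the variable
        String fileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");

        if (fileLocation != null) {
            // safe: java.io.File is never constructed with a null path
            File tokenFile = new File(fileLocation);
            System.out.println("token file: " + tokenFile.getAbsolutePath());
        } else {
            System.out.println("HADOOP_TOKEN_FILE_LOCATION not set, skipping token setup");
        }
    }
}
{code}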



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8830:

Fix Version/s: 1.5.0

> YarnResourceManager throws NullPointerException
> ---
>
> Key: FLINK-8830
> URL: https://issues.apache.org/jira/browse/FLINK-8830
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>
>  
> {code:java}
> java.lang.NullPointerException
> at java.io.File.<init>(File.java:277)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
> at 
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
> at 
> org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
>  
> This exception is being thrown in 
> `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
> is apparently ignored, since the test completes. It seems that this line:
> {code:java}
> String fileLocation = 
> System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
> is not guarded against a null return value. I don't know whether that is a test 
> or a production code issue.
> CC [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8836:

Priority: Major  (was: Critical)

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
>  As a result, those default serializers are shared between the two 
> {{KryoSerializer}} instances, so the copy is not a proper duplicate.
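A standalone illustration of why sharing the map values is problematic and what a proper
duplicate looks like (made-up types, not the actual KryoSerializer internals):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class SharedSerializerDemo {

    /** Stand-in for a stateful default serializer (mutable buffers, etc.). */
    static class StatefulSerializer {
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    public static void main(String[] args) {
        Map<Class<?>, StatefulSerializer> original = new LinkedHashMap<>();
        original.put(String.class, new StatefulSerializer());

        // what the copy constructor above effectively does: both serializers share instances
        Map<Class<?>, StatefulSerializer> shared = original;
        System.out.println(shared.get(String.class) == original.get(String.class)); // true

        // a proper duplicate copies the map and duplicates each stateful value
        Map<Class<?>, StatefulSerializer> copied = new LinkedHashMap<>();
        original.forEach((k, v) -> copied.put(k, v.duplicate()));
        System.out.println(copied.get(String.class) == original.get(String.class)); // false
    }
}
{code}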



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-03-06 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388012#comment-16388012
 ] 

Stefan Richter commented on FLINK-8867:
---

The reported exception does not show the root cause, only a follow-up effect. There 
should be another exception earlier in the log that could give a hint.

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> In our setup, we put an entry in our Flink config file for the default scheme:
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then the application with the RocksDB state backend fails with the following 
> exception. When we remove this config it works fine, and it also works fine with 
> the other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-03-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388008#comment-16388008
 ] 

Aljoscha Krettek commented on FLINK-8867:
-

[~srichter] & [~StephanEwen] do you have any idea what could be causing this? 
We were chasing this for quite a while ...

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> In our setup, we put an entry in our Flink config file for the default scheme:
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then the application with the RocksDB state backend fails with the following 
> exception. When we remove this config it works fine, and it also works fine with 
> the other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8870) End-to-end tests wrongly pass if md5sum is not installed

2018-03-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8870:
-

Reopen to remove fixVersion

> End-to-end tests wrongly pass if md5sum is not installed
> 
>
> Key: FLINK-8870
> URL: https://issues.apache.org/jira/browse/FLINK-8870
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.1
>Reporter: Florian Schmidt
>Priority: Major
>
> Actual: The end-to-end tests don't fail if md5sum is not installed
> {code:bash}
> Job with JobID 95f6a482cc9800f1daeb907f4940a116 has finished.
> Job Runtime: 1067 ms
> /Users/florianschmidt/dev/flink/flink-end-to-end-tests/test-scripts/common.sh:
>  line 120: md5sum: command not found
> pass WordCount
> Stopping taskexecutor daemon (pid: 33007) on host Florians-MBP.fritz.box.
> Stopping standalonesession daemon (pid: 32715) on host Florians-MBP.fritz.box.
> All tests PASS
> {code}
> Expected: The tests should fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8870) End-to-end tests wrongly pass if md5sum is not installed

2018-03-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8870.
---
   Resolution: Not A Problem
Fix Version/s: (was: 1.5.0)

> End-to-end tests wrongly pass if md5sum is not installed
> 
>
> Key: FLINK-8870
> URL: https://issues.apache.org/jira/browse/FLINK-8870
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.1
>Reporter: Florian Schmidt
>Priority: Major
>
> Actual: The end-to-end tests don't fail if md5sum is not installed
> {code:bash}
> Job with JobID 95f6a482cc9800f1daeb907f4940a116 has finished.
> Job Runtime: 1067 ms
> /Users/florianschmidt/dev/flink/flink-end-to-end-tests/test-scripts/common.sh:
>  line 120: md5sum: command not found
> pass WordCount
> Stopping taskexecutor daemon (pid: 33007) on host Florians-MBP.fritz.box.
> Stopping standalonesession daemon (pid: 32715) on host Florians-MBP.fritz.box.
> All tests PASS
> {code}
> Expected: The tests should fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387970#comment-16387970
 ] 

Aljoscha Krettek commented on FLINK-8834:
-

[~StephanEwen] What does that mean for this issue? Can we close it?

> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
> Fix For: 1.5.0
>
>
> Our job threw an exception overnight, causing it to attempt a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
> 

[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2018-03-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387966#comment-16387966
 ] 

Aljoscha Krettek commented on FLINK-4387:
-

[~NicoK] Why is this not {{fixVersion == 1.5}} anymore if you reopened?

> Instability in KvStateClientTest.testClientServerIntegration()
> --
>
> Key: FLINK-4387
> URL: https://issues.apache.org/jira/browse/FLINK-4387
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.5.0, 1.6.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
>Priority: Major
>  Labels: test-stability
> Fix For: 1.2.0, 1.6.0
>
>
> According to this log: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt
> the {{KvStateClientTest}} didn't complete.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() 
> [0x7fb2bcb3b000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
>   - locked <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32)
>   at 
> org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185)
>   at 
> org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}
> and
> {code}
> Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError
>   at 
> io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
>   at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-03-06 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387849#comment-16387849
 ] 

Stephan Ewen commented on FLINK-8871:
-

For a change to checkpointing like this, we need to make sure we have a good 
design discussion.
Even if this may not make it into 1.5, we can still start with the design 
discussion, looking at the pros and cons of the different options.

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2, 1.5.0, 1.4.1
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.6.0
>
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer count toward the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints have already started while the abandoned checkpoint 
> thread from a previous checkpoint is still running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.
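A rough sketch of the proposed gateway extension (interface name and method signatures
are illustrative only, not the actual TaskManagerGateway API):

{code:java}
/** Illustrative gateway, not Flink's actual interface. */
public interface CheckpointingGateway {

    /** Existing direction: the checkpoint coordinator asks a task to start a checkpoint. */
    void triggerCheckpoint(long checkpointId, long timestamp);

    /**
     * Proposed counterpart: the coordinator notifies the task that the checkpoint
     * was cancelled (e.g. due to a timeout), so still-running snapshot threads
     * on the task manager can be stopped instead of lingering.
     */
    void cancelCheckpoint(long checkpointId);
}
{code}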



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387845#comment-16387845
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Thanks for the great review.

Good catch, I bet the missing check against the current thread is the 
reason for the test failure.


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are 
> active only when debug logging is enabled and during test runs.
> This is inspired by Kryo's checks for concurrent access.
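A minimal sketch of such a check, in the spirit of Kryo's guard (names and placement are
illustrative, not the actual Flink implementation):

{code:java}
/** Illustrative owner-thread check that a stateful serializer could run on entry/exit. */
public class ConcurrentAccessCheck {

    private Thread currentOwner;

    /** Called when (de)serialization starts; fails fast on concurrent entry. */
    public synchronized void enter() {
        Thread current = Thread.currentThread();
        if (currentOwner == null) {
            currentOwner = current;
        } else if (currentOwner != current) {
            throw new IllegalStateException(
                    "Serializer accessed concurrently by " + current + " and " + currentOwner);
        }
    }

    /** Called when (de)serialization finishes. */
    public synchronized void exit() {
        currentOwner = null;
    }
}
{code}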



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5645: FLINK-8876 Improve concurrent access handling in stateful...

2018-03-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Thanks for the great review.

Good catch, I bet the missing check against the current thread is the 
reason for the test failure.


---


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-03-06 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387820#comment-16387820
 ] 

Sihua Zhou commented on FLINK-8871:
---

[~srichter] That is fine, looking forward to the PR. ;)

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2, 1.5.0, 1.4.1
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.6.0
>
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer count toward the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints have already started while the abandoned checkpoint 
> thread from a previous checkpoint is still running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

