[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4935
  
Hi @Aitozi, sorry for the long delay in getting back to this PR.

I'm still not convinced that this is a sane solution. For example, what is 
a "good" setting for the `KEY_REGISTER_TIMES` property? Isn't 1 enough, since 
you mentioned that the missing metric is registered in Kafka after the first 
poll? Making this configurable seems unnecessary to me.

I wonder if we can try one of the following two approaches:
1) Manually register the metrics that we know would be missing before the 
first poll, or
2) Poll once outside the loop first, just to make sure that all Kafka 
metrics exist, perform the metrics registration, and then start the 
regular fetch loop. A rough sketch of this option follows below.
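
For what it's worth, a rough sketch of option (2), assuming the plain Kafka 
consumer API; `registerFlinkMetrics` and `emit` are hypothetical placeholders, 
not existing connector code:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

final class WarmUpPollSketch {

    static <K, V> void runFetchLoop(KafkaConsumer<K, V> consumer, long pollTimeoutMs) {
        // One poll outside the loop; afterwards the lazily created
        // per-partition metrics (e.g. records-lag) exist in metrics().
        ConsumerRecords<K, V> firstBatch = consumer.poll(pollTimeoutMs);
        registerFlinkMetrics(consumer.metrics());
        emit(firstBatch); // do not drop the warm-up batch

        while (true) {
            emit(consumer.poll(pollTimeoutMs));
        }
    }

    private static void registerFlinkMetrics(Map<MetricName, ? extends Metric> metrics) {
        // hypothetical hook: mirror each Kafka metric into Flink's metric system
        metrics.forEach((name, metric) -> System.out.println("register " + name.name()));
    }

    private static <K, V> void emit(ConsumerRecords<K, V> records) {
        // hand the records to downstream processing (omitted)
    }
}
```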

What do you think?


---


[GitHub] flink issue #5017: [FLINK-8076] Upgrade KinesisProducer to 0.10.6 to set pro...

2017-11-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5017
  
Merging ...


---


[jira] [Commented] (FLINK-8076) Upgrade KinesisProducer to 0.10.6 to set properties appropriately

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270326#comment-16270326
 ] 

ASF GitHub Bot commented on FLINK-8076:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5017
  
Merging ...


> Upgrade KinesisProducer to 0.10.6 to set properties appropriately
> --
>
> Key: FLINK-8076
> URL: https://issues.apache.org/jira/browse/FLINK-8076
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5075: [hotfix] [docs] Fix typos in State Backends doc

2017-11-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5075
  
Thanks! Merging this ..


---


[jira] [Commented] (FLINK-8027) Generalize existing rest handlers to work with arbitrary RestfulGateway

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270313#comment-16270313
 ] 

ASF GitHub Bot commented on FLINK-8027:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4985#discussion_r153708910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+   private final String address;
+
+   private final String hostname;
+
+   public TestingTaskExecutorGateway() {
+   this("foobar:1234", "foobar");
+   }
+
+   public TestingTaskExecutorGateway(String address, String hostname) {
+   this.address = Preconditions.checkNotNull(address);
+   this.hostname = Preconditions.checkNotNull(hostname);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void failPartition(ExecutionAttemptID executionAttemptID) {
+   // noop
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+   // noop
+   }
+
+   @Override
+   public void 

[GitHub] flink pull request #4985: [FLINK-8027] Generalize existing rest handlers to ...

2017-11-28 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4985#discussion_r153708910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+   private final String address;
+
+   private final String hostname;
+
+   public TestingTaskExecutorGateway() {
+   this("foobar:1234", "foobar");
+   }
+
+   public TestingTaskExecutorGateway(String address, String hostname) {
+   this.address = Preconditions.checkNotNull(address);
+   this.hostname = Preconditions.checkNotNull(hostname);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void failPartition(ExecutionAttemptID executionAttemptID) {
+   // noop
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+   return CompletableFuture.completedFuture(Acknowledge.get());
+   }
+
+   @Override
+   public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+   // noop
+   }
+
+   @Override
+   public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+   // noop
+   }
+
+   @Override
+   public void disconnectJobManager(JobID jobId, Exception cause) {
+   // nooop
--- End diff --


[jira] [Closed] (FLINK-7976) bump japicmp-maven-plugin version in Flink

2017-11-28 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7976.
---
Resolution: Duplicate

> bump japicmp-maven-plugin version in Flink
> --
>
> Key: FLINK-7976
> URL: https://issues.apache.org/jira/browse/FLINK-7976
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> bump japicmp-maven-plugin version from 0.7.0 to 0.11.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-11-28 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270227#comment-16270227
 ] 

Bowen Li commented on FLINK-5544:
-

[~xiaogang.shi] [~srichter] what's the status of this story, guys?

> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of the internal timer service is 
> HeapInternalTimerService, which stores all timers in memory. In cases 
> where the number of keys is very large, the timer service will consume too 
> much memory. An implementation that stores timers in RocksDB seems well 
> suited to these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in timestamp order. But when performing checkpoints, we 
> must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the key format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.
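
A minimal sketch of the heap-of-heads idea, with in-memory queues standing in 
for the per-key-group RocksDB iterators (queue index = key group):
{code}
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

final class TimerMergeSketch {

    static final class Timer {
        final int keyGroup;
        final long timestamp;
        Timer(int keyGroup, long timestamp) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
        }
    }

    // Each queue holds one key group's timers in timestamp order, as they
    // would be under a KEY_GROUP#TIMER#KEY key layout in RocksDB.
    static void triggerInOrder(List<Queue<Timer>> sortedPerKeyGroup) {
        PriorityQueue<Timer> heap =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));

        // Seed the heap with the first timer of every key group.
        for (Queue<Timer> group : sortedPerKeyGroup) {
            Timer head = group.poll();
            if (head != null) {
                heap.add(head);
            }
        }

        // Pop the globally earliest timer, then advance that group's iterator.
        while (!heap.isEmpty()) {
            Timer next = heap.poll();
            System.out.println("trigger kg=" + next.keyGroup + " ts=" + next.timestamp);
            Timer replacement = sortedPerKeyGroup.get(next.keyGroup).poll();
            if (replacement != null) {
                heap.add(replacement);
            }
        }
    }
}
{code}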



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270123#comment-16270123
 ] 

ASF GitHub Bot commented on FLINK-8061:
---

Github user vetriselvan1187 closed the pull request at:

https://github.com/apache/flink/pull/5008


> Remove trailing asterisk in QueryableStateClient javadocs
> -
>
> Key: FLINK-8061
> URL: https://issues.apache.org/jira/browse/FLINK-8061
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeHint   A {@link TypeHint} used 
> to extract the type of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeInfo   The {@link 
> TypeInformation} of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5008: [FLINK-8061] [Queryable State] removed trailing as...

2017-11-28 Thread vetriselvan1187
Github user vetriselvan1187 closed the pull request at:

https://github.com/apache/flink/pull/5008


---


[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270116#comment-16270116
 ] 

ASF GitHub Bot commented on FLINK-8158:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @hequn8128, let me try to explain this.

1. In the current implementation, the join process relies only on the cached 
rows, not on the watermarks. Specifically, when receiving a record, the join 
function only checks whether qualified rows exist in the opposite 
cache, regardless of lateness. Thus, if the cache ***is not cleaned up in 
time***, outdated results will be emitted.

2. Strictly speaking, the value for holding back watermarks should be 
dynamically reported by the join function at runtime. The current 
implementation temporarily uses a static value (`MaxOutputDelay`) for that. In 
other words, the hold-back value should be decided by the cached rows, 
rather than the cache size being decided by `MaxOutputDelay`.

Hope that helps.

Best, Xingcan


> Rowtime window inner join emits late data
> -
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
> Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
> val sqlQuery =
>   """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
>   t1.key = t2.key AND
>   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left side 
> then receives another record ("A", "LEFT1", 1000L), the join will output a 
> record with timestamp 1000, which equals the previous watermark.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5094: [FLINK-8158] [table] Fix rowtime window inner join emits ...

2017-11-28 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @hequn8128, let me try to explain this.

1. In the current implementation, the join process relies only on the cached 
rows, not on the watermarks. Specifically, when receiving a record, the join 
function only checks whether qualified rows exist in the opposite 
cache, regardless of lateness. Thus, if the cache ***is not cleaned up in 
time***, outdated results will be emitted.

2. Strictly speaking, the value for holding back watermarks should be 
dynamically reported by the join function at runtime. The current 
implementation temporarily uses a static value (`MaxOutputDelay`) for that. In 
other words, the hold-back value should be decided by the cached rows, 
rather than the cache size being decided by `MaxOutputDelay`.

Hope that helps.

Best, Xingcan
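
As a side note, a tiny sketch of the static hold-back described in point 2, 
assuming millisecond timestamps; deriving the delay dynamically from the 
cached rows (as suggested) would replace the constant:

```java
final class HoldBackSketch {
    // static value, as currently used (`MaxOutputDelay`); e.g. 5 seconds
    static final long MAX_OUTPUT_DELAY_MS = 5_000L;

    // The operator forwards each input watermark minus the fixed delay so
    // that results emitted for cached rows are never late, e.g.
    // heldBackWatermark(6000L) == 1000L, matching the watermark in the ticket.
    static long heldBackWatermark(long inputWatermark) {
        return inputWatermark - MAX_OUTPUT_DELAY_MS;
    }
}
```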


---


[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270090#comment-16270090
 ] 

ASF GitHub Bot commented on FLINK-8158:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @xccui, thanks for your reply. Feel free to take it if you wish. I 
still have some questions. 
1. Considering the test `testRowTimeJoinWithCommonBounds2` in 
`JoinHarnessTest`, do you mean the row with timestamp 1000 should not be 
calculated? The row does satisfy the join condition: `t1.rt BETWEEN t2.rt - 
INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND`, and this is the 
difference between `BETWEEN` and `NOT BETWEEN`.
2. Can't we use the held-back watermark as the boundary to cache and 
expire data? Any data with a timestamp greater than the held-back watermark 
should be cached and may be joined later. We should take every opportunity to 
join and produce a result that satisfies the join predicate.


> Rowtime window inner join emits late data
> -
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
> Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
> val sqlQuery =
>   """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
>   t1.key = t2.key AND
>   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left side 
> then receives another record ("A", "LEFT1", 1000L), the join will output a 
> record with timestamp 1000, which equals the previous watermark.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5094: [FLINK-8158] [table] Fix rowtime window inner join emits ...

2017-11-28 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @xccui, thanks for your reply. Feel free to take it if you wish. I 
still have some questions. 
1. Considering the test `testRowTimeJoinWithCommonBounds2` in 
`JoinHarnessTest`, do you mean the row with timestamp 1000 should not be 
calculated? The row does satisfy the join condition: `t1.rt BETWEEN t2.rt - 
INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND`, and this is the 
difference between `BETWEEN` and `NOT BETWEEN`.
2. Can't we use the held-back watermark as the boundary to cache and 
expire data? Any data with a timestamp greater than the held-back watermark 
should be cached and may be joined later. We should take every opportunity to 
join and produce a result that satisfies the join predicate.
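
To make point 1 concrete, the (inclusive) window from the SQL above, assuming 
millisecond timestamps:

```java
final class BoundsCheck {
    // t1.rt BETWEEN t2.rt - 5s AND t2.rt + 1s, inclusive on both ends
    static boolean withinWindow(long leftRt, long rightRt) {
        return leftRt >= rightRt - 5_000L && leftRt <= rightRt + 1_000L;
    }

    public static void main(String[] args) {
        // The late left row at 1000 against the right row at 6000:
        System.out.println(withinWindow(1_000L, 6_000L)); // true -> it qualifies
    }
}
```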


---


[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-11-28 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269921#comment-16269921
 ] 

Wei-Che Wei commented on FLINK-7692:


[~Zentol]
This idea is much better. I would like to keep working on it based on your 
scaffold.
One more thing needs to be confirmed: I need to distinguish when to add a 
{{GenericKeyMetricGroup}} versus a {{GenericMetricGroup}} if a group {{name}} doesn't 
exist, since your scaffold will always create a {{GenericKeyMetricGroup}}. 
Am I right?

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> Reporters that identify metrics with a set of key-value pairs are currently 
> limited to the variables defined by Flink, like the taskmanager ID, with 
> users not being able to supply their own.
> This is inconsistent with reporters that use metric identifiers that freely 
> include user-defined groups constructed via {{MetricGroup#addGroup(String 
> name)}}.
> I propose adding a new method {{MetricGroup#addGroup(String key, String 
> name)}} that adds a new key-value pair to the {{variables}} map in its 
> constructor. When constructing the metric identifier, the key should be 
> included as well, producing the same identifier as when constructing the 
> metric groups tree via {{group.addGroup(key).addGroup(value)}}.
> For this, a new {{KeyedGenericMetricGroup}} should be created that resembles 
> the unkeyed version, with slight modifications to the constructor and 
> {{getScopeComponents}} method.
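
For illustration, a sketch of the proposal from inside a user function; the 
two-argument {{addGroup(String key, String value)}} is the method proposed 
here, not existing API:
{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class KeyedGroupExample extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        MetricGroup base = getRuntimeContext().getMetricGroup();

        // Today: key and value become two plain name components.
        MetricGroup chained = base.addGroup("region").addGroup("us-west");

        // Proposed: the same identifier, but ("region" -> "us-west") is also
        // added to the variables map for key-value style reporters.
        MetricGroup keyed = base.addGroup("region", "us-west");

        counter = keyed.counter("requests");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}
{code}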



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269617#comment-16269617
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5076
  
These should generally be pretty safe changes to make IMO, and well worth 
it. In particular, the undeclared use of transitive dependencies (which may be 
shaded) has frequently caused headaches, and such cases should now be much 
easier to detect.

We have to look out for dependencies that are used exclusively via 
reflection, and for those we keep for the sake of including them in flink-dist; 
I can't think of other special cases at the moment.


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test
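
For reference, the warnings quoted above come from the Maven dependency 
analyzer, which can be run on a single module from the repository root 
(module path assumed):
{code}
mvn dependency:analyze -pl flink-clients
{code}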



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-11-28 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5076
  
These should generally be pretty safe changes to make IMO, and well worth 
it. In particular, the undeclared use of transitive dependencies (which may be 
shaded) has frequently caused headaches, and such cases should now be much 
easier to detect.

We have to look out for dependencies that are used exclusively via 
reflection, and for those we keep for the sake of including them in flink-dist; 
I can't think of other special cases at the moment.


---


[GitHub] flink pull request #4383: [hotfix] [optimizer] Normalize job plan operator f...

2017-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4383


---


[jira] [Closed] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-11-28 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-6864.
-
Resolution: Implemented

master: 450b4241404055ed6638e354be421b83380827c5

> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
> Fix For: 1.5.0
>
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO.", "  ... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages often mislead users into thinking that 
> the job should have failed, whereas in fact in these cases Flink just 
> falls back to Kryo and treats such types as generic types. We should remove these 
> messages, and at the same time improve the type serialization docs at [1] to 
> explain explicitly what it means when Flink does / does not recognize a user 
> type as a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types
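
For example, a type like the following triggers such a warning (no no-argument 
constructor), yet the job still runs because Flink silently falls back to Kryo:
{code}
// Not a valid POJO for Flink: there is no no-argument constructor. The
// TypeExtractor logs a warning and then treats it as a generic type.
public class NotAPojo {
    private final int value;

    public NotAPojo(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}
{code}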



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269526#comment-16269526
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4574


> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
> Fix For: 1.5.0
>
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO.", "  ... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages often mislead users into thinking that 
> the job should have failed, whereas in fact in these cases Flink just 
> falls back to Kryo and treats such types as generic types. We should remove these 
> messages, and at the same time improve the type serialization docs at [1] to 
> explain explicitly what it means when Flink does / does not recognize a user 
> type as a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4574: [FLINK-6864] Fix confusing "invalid POJO type" mes...

2017-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4574


---


[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-11-28 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269508#comment-16269508
 ] 

Chesnay Schepler commented on FLINK-6053:
-

We can't properly subsume the existing gauge type until the Histogram has been 
reworked. Once that is done, we will deprecate the free-style gauge type and 
introduce new String-/NumberGauge types.
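
A rough sketch of that direction; these types are hypothetical, and only the 
free-style {{Gauge}} exists today:
{code}
import org.apache.flink.metrics.Metric;

// Hypothetical successors to the free-style Gauge<T>: one gauge type
// bounded to Number, one dedicated to strings.
interface NumberGauge<T extends Number> extends Metric {
    T getValue();
}

interface StringGauge extends Metric {
    String getValue();
}
{code}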

> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 2.0.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269491#comment-16269491
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4574
  
@zjureel am merging this ... thanks for the PR and edits!


> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
> Fix For: 1.5.0
>
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO.", "  ... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages often mislead users into thinking that 
> the job should have failed, whereas in fact in these cases Flink just 
> falls back to Kryo and treats such types as generic types. We should remove these 
> messages, and at the same time improve the type serialization docs at [1] to 
> explain explicitly what it means when Flink does / does not recognize a user 
> type as a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-11-28 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-6864:
--
Fix Version/s: 1.5.0

> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
> Fix For: 1.5.0
>
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO.", "  ... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages often mislead users into thinking that 
> the job should have failed, whereas in fact in these cases Flink just 
> falls back to Kryo and treats such types as generic types. We should remove these 
> messages, and at the same time improve the type serialization docs at [1] to 
> explain explicitly what it means when Flink does / does not recognize a user 
> type as a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4574: [FLINK-6864] Fix confusing "invalid POJO type" messages f...

2017-11-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4574
  
@zjureel am merging this ... thanks for the PR and edits!


---


[GitHub] flink issue #4383: [hotfix] [optimizer] Normalize job plan operator formatti...

2017-11-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4383
  
@zentol @fhueske I am merging the change with the extra space since this 
looks to have been the original intent. I've looked at both forms without 
finding a strong preference.


---


[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3

2017-11-28 Thread Cristian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cristian updated FLINK-8164:

Description: 
I'm trying to configure JobManager's archiving mechanism 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html)
 to use S3 but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN  
org.apache.flink.runtime.jobmanager.MemoryArchivist   - Failed to 
create Path for Some(s3a://bucket/completed-jobs). Job will not be archived.
java.lang.IllegalArgumentException: No file system found with scheme s3, 
referenced in file URI 's3://bucket/completed-jobs'.
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. I 
have also tried using s3a instead, to no avail.

This happens running Flink v1.3.2 on EMR.

  was:
I'm trying to configure JobManager's archiving mechanism 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html)
 to use S3 but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN  
org.apache.flink.runtime.jobmanager.MemoryArchivist   - Failed to 
create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be 
archived.
java.lang.IllegalArgumentException: No file system found with scheme s3, 
referenced in file URI 's3://scopely-flink-dev/completed-jobs'.
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. I 
have also tried using s3a instead, to no avail.

This happens running 

[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3

2017-11-28 Thread Cristian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cristian updated FLINK-8164:

Description: 
I'm trying to configure JobManager's archiving mechanism 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html)
 to use S3 but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN  
org.apache.flink.runtime.jobmanager.MemoryArchivist   - Failed to 
create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be 
archived.
java.lang.IllegalArgumentException: No file system found with scheme s3, 
referenced in file URI 's3://scopely-flink-dev/completed-jobs'.
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. I 
have also tried using s3a instead, to no avail.

This happens running Flink v1.3.2 on EMR.

  was:
I'm trying to configure JobManager's archiving mechanism 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html)
 to use S3 but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN  
org.apache.flink.runtime.jobmanager.MemoryArchivist   - Failed to 
create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be 
archived.
java.lang.IllegalArgumentException: No file system found with scheme s3a, 
referenced in file URI 's3a://scopely-flink-dev/completed-jobs'.
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself.

This happens running Flink v1.3.2 on EMR.



[jira] [Created] (FLINK-8164) JobManager's archiving does not work on S3

2017-11-28 Thread Cristian (JIRA)
Cristian created FLINK-8164:
---

 Summary: JobManager's archiving does not work on S3
 Key: FLINK-8164
 URL: https://issues.apache.org/jira/browse/FLINK-8164
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Reporter: Cristian


I'm trying to configure JobManager's archiving mechanism 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html)
 to use S3 but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN  
org.apache.flink.runtime.jobmanager.MemoryArchivist   - Failed to 
create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be 
archived.
java.lang.IllegalArgumentException: No file system found with scheme s3a, 
referenced in file URI 's3a://scopely-flink-dev/completed-jobs'.
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself.

This happens running Flink v1.3.2 on EMR.
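
For context, a minimal archiving configuration using the keys from the linked 
HistoryServer documentation would look like this (bucket name illustrative):
{code}
# flink-conf.yaml
jobmanager.archive.fs.dir: s3a://my-bucket/completed-jobs
historyserver.archive.fs.dir: s3a://my-bucket/completed-jobs
{code}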



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4374: replace map.put with putIfAbsent

2017-11-28 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4374
  
But honestly, the previous approach just looks wrong: adding the entry and, in 
case of a conflict, reverting that `put` operation.
What if (and I'm walking in the dark here) something already operates on 
the invalidly registered task (is this run in parallel?). I cannot spot any 
parallel use, though...
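
For illustration, the difference between the two idioms (the registry and 
names here are hypothetical, not the actual Flink code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RegistrySketch {
    private final ConcurrentMap<String, Object> tasks = new ConcurrentHashMap<>();

    // previous approach: insert first, revert on conflict; the new value is
    // briefly visible to concurrent readers even when registration fails
    boolean registerWithRevert(String id, Object task) {
        Object previous = tasks.put(id, task);
        if (previous != null) {
            tasks.put(id, previous); // revert the invalid registration
            return false;
        }
        return true;
    }

    // putIfAbsent: atomic, never exposes an invalid registration
    boolean register(String id, Object task) {
        return tasks.putIfAbsent(id, task) == null;
    }
}
```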


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269114#comment-16269114
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, 
InterruptedException {
--- End diff --

make this `protected`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> buffers in this subpartition, excluding events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an 
> absolute value after the buffer has been transferred.
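
For illustration, a sketch of the backlog bookkeeping described above; the 
names are made up, not the actual {{ResultSubpartition}} implementation:
{code}
import java.util.ArrayDeque;

final class BacklogSketch {

    static final class Entry {
        final boolean isBuffer; // events are excluded from the backlog
        Entry(boolean isBuffer) { this.isBuffer = isBuffer; }
    }

    private final ArrayDeque<Entry> queue = new ArrayDeque<>();
    private int buffersInBacklog;

    synchronized void add(Entry e) {
        queue.add(e);
        if (e.isBuffer) {
            buffersInBacklog++; // increased when a buffer is added
        }
    }

    synchronized Entry poll() {
        Entry e = queue.poll();
        if (e != null && e.isBuffer) {
            buffersInBacklog--; // decreased when a buffer is polled
        }
        return e;
    }

    // the sender attaches this absolute value to each BufferResponse
    synchronized int getBuffersInBacklog() {
        return buffersInBacklog;
    }
}
{code}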



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269116#comment-16269116
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153563915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, 
BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   public Buffer getNextBufferInternal() {
--- End diff --

make this `protected`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> buffers in this subpartition, excluding events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an 
> absolute value after the buffer has been transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269115#comment-16269115
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,52 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
 
/**
 * Returns the next {@link Buffer} instance of this queue iterator.
-* 
-* If there is currently no instance available, it will return 
null.
+*
+* If there is currently no instance available, it will return 
null.
 * This might happen for example when a pipelined queue producer is 
slower
 * than the consumer or a spilled queue needs to read in more data.
-* 
-* Important: The consumer has to make sure that each
+*
+* Important: The consumer has to make sure that 
each
 * buffer instance will eventually be recycled with {@link 
Buffer#recycle()}
 * after it has been consumed.
 */
-   Buffer getNextBuffer() throws IOException, InterruptedException;
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   Buffer buffer = getNextBufferInternal();
+   if (buffer != null) {
+   parent.decreaseStatistics(buffer);
+   }
+   return buffer;
+   }
+
+   public int getBuffersInBacklog() {
+   return parent.getBuffersInBacklog();
+   }
 
-   void notifyBuffersAvailable(long buffers) throws IOException;
+   protected abstract Buffer getNextBufferInternal() throws IOException, 
InterruptedException;
--- End diff --

please add a javadoc with the intended relation to `getNextBuffer`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> buffers in this subpartition, excluding events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an 
> absolute value after the buffer has been transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269117#comment-16269117
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564111
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock 
invocationOnMock) throws T
 
// 
-
 
-   static class InfiniteSubpartitionView implements ResultSubpartitionView 
{
+   static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
private final BufferProvider bufferProvider;
 
private final CountDownLatch sync;
 
public InfiniteSubpartitionView(BufferProvider bufferProvider, 
CountDownLatch sync) {
+   super(mock(ResultSubpartition.class));
+
this.bufferProvider = checkNotNull(bufferProvider);
this.sync = checkNotNull(sync);
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, 
InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, 
InterruptedException {
--- End diff --

make this `protected`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> buffers in this subpartition, not the events.
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an 
> absolute value after the buffer has been transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269118#comment-16269118
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, 
InterruptedException {
--- End diff --

make this `protected`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> buffers in this subpartition, not the events.
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an 
> absolute value after the buffer has been transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, 
InterruptedException {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,52 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+	/** The parent subpartition this view belongs to. */
+	private final ResultSubpartition parent;
+
+	public ResultSubpartitionView(ResultSubpartition parent) {
+		this.parent = checkNotNull(parent);
+	}
 
 	/**
 	 * Returns the next {@link Buffer} instance of this queue iterator.
-	 * <p>
-	 * If there is currently no instance available, it will return <code>null</code>.
+	 *
+	 * <p>If there is currently no instance available, it will return <code>null</code>.
 	 * This might happen for example when a pipelined queue producer is slower
 	 * than the consumer or a spilled queue needs to read in more data.
-	 * <p>
-	 * <strong>Important</strong>: The consumer has to make sure that each
+	 *
+	 * <p><strong>Important</strong>: The consumer has to make sure that each
 	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
 	 * after it has been consumed.
 	 */
-	Buffer getNextBuffer() throws IOException, InterruptedException;
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		Buffer buffer = getNextBufferInternal();
+		if (buffer != null) {
+			parent.decreaseStatistics(buffer);
+		}
+		return buffer;
+	}
+
+	public int getBuffersInBacklog() {
+		return parent.getBuffersInBacklog();
+	}
 
-	void notifyBuffersAvailable(long buffers) throws IOException;
+	protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
--- End diff --

please add a javadoc with the intended relation to `getNextBuffer`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153563915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, 
BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   public Buffer getNextBufferInternal() {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564111
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
 
 	// ------------------------------------------------------------------------
 
-	static class InfiniteSubpartitionView implements ResultSubpartitionView {
+	static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
 		private final BufferProvider bufferProvider;
 
 		private final CountDownLatch sync;
 
 		public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+			super(mock(ResultSubpartition.class));
+
 			this.bufferProvider = checkNotNull(bufferProvider);
 			this.sync = checkNotNull(sync);
 		}
 
 		@Override
-		public Buffer getNextBuffer() throws IOException, InterruptedException {
+		public Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, 
InterruptedException {
--- End diff --

make this `protected`


---


[jira] [Commented] (FLINK-8151) [Table] removing map value equality check

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269092#comment-16269092
 ] 

ASF GitHub Bot commented on FLINK-8151:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5070#discussion_r153566048
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -188,13 +188,6 @@ object ScalarOperators {
 (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, 
$rightTerm)"
   }
 }
-// map types
-else if (isMap(left.resultType) &&
-  left.resultType.getTypeClass == right.resultType.getTypeClass) {
-  generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) 
{
-(leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, 
$rightTerm)"
--- End diff --

I tried to use this, but it is an exact object match, which is very 
different from `Arrays.equals`, which compares elements index by index. So 
there are a few problems here: 
1. an object referenced on both the left and right hand side might become a 
different object depending on the ser/deser (while still representing the 
same Map); 
2. two Map objects with exactly the same key and value pairs should be 
considered equal, but they are not equal under an object match;
3. it is also debatable whether a `Map` should be considered an ordered or 
unordered map.

I think this is worth another PR to address.
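
For illustration, the difference between an object (identity) match and an 
element-wise match in plain Java (editor's sketch, outside the code 
generation):

```java
import java.util.HashMap;
import java.util.Map;

public class MapEqualityDemo {
	public static void main(String[] args) {
		Map<String, Integer> left = new HashMap<>();
		left.put("a", 1);
		Map<String, Integer> right = new HashMap<>();
		right.put("a", 1);

		// identity match: false, e.g. after a ser/deser round-trip
		System.out.println(left == right);
		// element-wise match via java.util.AbstractMap#equals: true
		System.out.println(left.equals(right));
	}
}
```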


> [Table] removing map value equality check
> -
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Following up on FLINK-8038: the equality check is not working, as Map does 
> not support element-wise equality. Suggest removing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8151) [Table] removing map value equality check

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269090#comment-16269090
 ] 

ASF GitHub Bot commented on FLINK-8151:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5070#discussion_r153566038
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
 ---
@@ -139,13 +139,6 @@ class MapTypeTest extends MapTypeTestBase {
   @Test
   def testMapOperations(): Unit = {
 
-// comparison
-testAllApis(
--- End diff --

That was my bad: I was only testing the invalid equality case (e.g. 
`MAP[STRING, INT] = MAP[STRING, STRING]` should throw a ValidationException); 
I did not put in a test for the valid equality case :-(


> [Table] removing map value equality check
> -
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Following up on FLINK-8038: the equality check is not working, as Map does 
> not support element-wise equality. Suggest removing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5070: [FLINK-8151][table]Remove Map type equality compar...

2017-11-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5070#discussion_r153566048
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -188,13 +188,6 @@ object ScalarOperators {
 (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, 
$rightTerm)"
   }
 }
-// map types
-else if (isMap(left.resultType) &&
-  left.resultType.getTypeClass == right.resultType.getTypeClass) {
-  generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) 
{
-(leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, 
$rightTerm)"
--- End diff --

I tried to use this, but it is an exact object match, which is very 
different from `Arrays.equals`, which compares elements index by index. So 
there are a few problems here: 
1. an object referenced on both the left and right hand side might become a 
different object depending on the ser/deser (while still representing the 
same Map); 
2. two Map objects with exactly the same key and value pairs should be 
considered equal, but they are not equal under an object match;
3. it is also debatable whether a `Map` should be considered an ordered or 
unordered map.

I think this is worth another PR to address.


---


[GitHub] flink pull request #5070: [FLINK-8151][table]Remove Map type equality compar...

2017-11-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5070#discussion_r153566038
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
 ---
@@ -139,13 +139,6 @@ class MapTypeTest extends MapTypeTestBase {
   @Test
   def testMapOperations(): Unit = {
 
-// comparison
-testAllApis(
--- End diff --

That was my bad: I was only testing the invalid equality case (e.g. 
`MAP[STRING, INT] = MAP[STRING, STRING]` should throw a ValidationException); 
I did not put in a test for the valid equality case :-(


---


[jira] [Commented] (FLINK-8104) Fix Row value constructor

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269082#comment-16269082
 ] 

ASF GitHub Bot commented on FLINK-8104:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5040
  
Thanks @twalthr for confirming, I will also add in the documentation today 
then.


> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support Row value constructor which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5040: [FLINK-8104][Table API] fixing ROW type value constructor...

2017-11-28 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5040
  
Thanks @twalthr for confirming, I will also add in the documentation today 
then.


---


[jira] [Assigned] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode

2017-11-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-8119:


Assignee: Till Rohrmann

> Cannot submit jobs to YARN Session in FLIP-6 mode
> -
>
> Key: FLINK-8119
> URL: https://issues.apache.org/jira/browse/FLINK-8119
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Cannot submit jobs to YARN Session in FLIP-6 mode because 
> {{FlinkYarnSessionCli}} becomes the _active_ CLI (should be 
> {{Flip6DefaultCLI}}).
> *Steps to reproduce*
> # Build Flink 1.5 {{101fef7397128b0aba23221481ab86f62b18118f}}
> # {{bin/yarn-session.sh -flip6 -d -n 1 -s 1 -jm 1024 -tm 1024}}
> # {{bin/flink run -flip6 ./examples/streaming/WordCount.jar}}
> # Verify that the job will not run.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2017-11-28 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269041#comment-16269041
 ] 

Stephan Ewen commented on FLINK-5789:
-

We may want to add further abstractions to the file system.

The S3 API has an interesting approach, allowing you to upload multiple 
individual chunks (multipart upload) and then issue a separate request to 
commit a set of these parts. That could be used, for example, to stage/flush 
parts on checkpoint, but not commit (publish) them until it is time to roll 
over the bucket.

Because that is very S3-specific, it may make sense to have an abstraction for 
temp files, temp regions, committing those, etc. on top of the {{FileSystem}} 
abstraction.
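
A rough sketch of what such an abstraction could look like (hypothetical 
names; not an existing Flink interface):

{code}
import java.io.IOException;
import java.util.List;

/** Hypothetical staging abstraction on top of FileSystem, modeled after S3 multipart uploads. */
interface StagedWriter {

	/** Opaque handle of a durably staged, but not yet published, part. */
	interface PartHandle {}

	/** Durably stage a chunk (e.g. flushed on checkpoint) without publishing it. */
	PartHandle stagePart(byte[] data) throws IOException;

	/** Publish all staged parts, e.g. when it is time to roll over the bucket. */
	void commit(List<PartHandle> parts) throws IOException;

	/** Discard staged parts that will never be committed. */
	void abort(List<PartHandle> parts) throws IOException;
}
{code}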

> Make Bucketing Sink independent of Hadoop's FileSystem
> --
>
> Key: FLINK-5789
> URL: https://issues.apache.org/jira/browse/FLINK-5789
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Stephan Ewen
> Fix For: 1.5.0
>
>
> The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's 
> file system abstraction.
> This causes several issues:
>   - The bucketing sink will behave differently from other file sinks with 
> respect to configuration
>   - Directly supported file systems (not through Hadoop) like the MapR File 
> System do not work in the same way with the {{BucketingSink}} as other file 
> systems
>   - The previous point is all the more problematic in the effort to make 
> Hadoop an optional dependency and in other stacks (Mesos, Kubernetes, 
> AWS, GCE, Azure) that ideally have no Hadoop dependency.
> We should port the {{BucketingSink}} to use Flink's FileSystem classes.
> To support the *truncate* functionality that is needed for the exactly-once 
> semantics of the Bucketing Sink, we should extend Flink's FileSystem 
> abstraction to have the methods
>   - {{boolean supportsTruncate()}}
>   - {{void truncate(Path, long)}}
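
A sketch of the proposed extension (signatures as listed above; the default 
implementations are the editor's assumptions):

{code}
import java.io.IOException;

import org.apache.flink.core.fs.Path;

/** Sketch of the proposed additions to Flink's FileSystem abstraction. */
public abstract class TruncatableFileSystem {

	/** Whether this file system supports truncating files to a given length. */
	public boolean supportsTruncate() {
		return false; // assumed conservative default
	}

	/** Truncates the file under the given path to the given length. */
	public void truncate(Path path, long length) throws IOException {
		throw new UnsupportedOperationException(getClass().getName() + " does not support truncate");
	}
}
{code}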



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #:

2017-11-28 Thread aljoscha
Github user aljoscha commented on the pull request:


https://github.com/apache/flink/commit/c940d5eff9897796625a696ed2989aed52c39ebd#commitcomment-25902242
  
In tools/releasing/create_source_release.sh:
In tools/releasing/create_source_release.sh on line 60:
could, but that's not strictly necessary anymore because I also introduced 
the temporary `git clone`.


---


[jira] [Commented] (FLINK-7918) Run AbstractTestBase tests on Flip-6 MiniCluster

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269039#comment-16269039
 ] 

ASF GitHub Bot commented on FLINK-7918:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5095

[FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster

## What is the purpose of the change

Extend `MiniClusterResource` to instantiate a Flip-6 `MiniCluster` if the 
system property `codebase` has been set to `flip6`. This allows to run all 
tests which are based on `AbstractTestBase` on the `Flip-6` code base.

## Brief change log

- Use the system property `codebase` to distinguish between the Flip-6 and 
the old code base
- Instantiate the respective `MiniCluster` in `MiniClusterResource` depending 
on the system property (see the sketch below)
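
A minimal sketch of that switch (the actual `MiniClusterResource` internals 
may differ):

```java
public class CodebaseSwitchSketch {
	public static void main(String[] args) {
		// "codebase" is the system property named in the change log above
		boolean flip6 = "flip6".equals(System.getProperty("codebase"));
		if (flip6) {
			System.out.println("instantiating the Flip-6 MiniCluster");
		} else {
			System.out.println("instantiating the legacy mini cluster");
		}
	}
}
```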

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink flip6TestBase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5095.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5095


commit 1f6cb24a2b8e4b062fe12d756bfd5950ab35b0ce
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit aff11aaf02c7b5b667780031e01a1b08d85e036b
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot 
from the Execution by introducing a Payload interface which is set for a 
LogicalSlot. The Payload interface is implemented by the Execution and allows 
failing an implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 040cb326086968b5fe5d66dcc82835ee9dd092dc
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

commit 5d7e117392a51a485a5cb682c4a2c12028e73b97
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with 
the actually allocated slot via the AllocationID. This, however, was 
sub-optimal because allocated slots can be re-used to fulfill other slot 
requests (logical slots) as well. Therefore, we should bind the logical slots 
to the right ID with the right lifecycle, which is the slot request ID.

commit 75f49d4bb3f38c7bfaee101607a459d558c00ba8
Author: Till Rohrmann 
Date:   2017-11-13T14:42:07Z

[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

commit b53729e74ccc31c634c5ac7db5e37ca45a66db8d
Author: Till Rohrmann 
Date:   2017-11-24T17:08:38Z

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to 
SlotPool

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request need not
be completed right away but can be completed at a later point in time. This
allows starting new 

[GitHub] flink pull request #5095: [FLINK-7918] Run AbstractTestBase tests on Flip-6 ...

2017-11-28 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5095

[FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster

## What is the purpose of the change

Extend `MiniClusterResource` to instantiate a Flip-6 `MiniCluster` if the 
system property `codebase` has been set to `flip6`. This allows to run all 
tests which are based on `AbstractTestBase` on the `Flip-6` code base.

## Brief change log

- Use system property `codebase` to distinguish between Flip-6 and old code 
base
- Instantiate respective `MiniCluster` in `MiniClusterResource` depending 
on the system property

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink flip6TestBase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5095.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5095


commit 1f6cb24a2b8e4b062fe12d756bfd5950ab35b0ce
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit aff11aaf02c7b5b667780031e01a1b08d85e036b
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot 
from the Execution by introducing a Payload interface which is set for a 
LogicalSlot. The Payload interface is implemented by the Execution and allows 
failing an implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 040cb326086968b5fe5d66dcc82835ee9dd092dc
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

commit 5d7e117392a51a485a5cb682c4a2c12028e73b97
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with 
the actually allocated slot via the AllocationID. This, however, was 
sub-optimal because allocated slots can be re-used to fulfill other slot 
requests (logical slots) as well. Therefore, we should bind the logical slots 
to the right ID with the right lifecycle, which is the slot request ID.

commit 75f49d4bb3f38c7bfaee101607a459d558c00ba8
Author: Till Rohrmann 
Date:   2017-11-13T14:42:07Z

[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

commit b53729e74ccc31c634c5ac7db5e37ca45a66db8d
Author: Till Rohrmann 
Date:   2017-11-24T17:08:38Z

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to 
SlotPool

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request need not
be completed right away but can be completed at a later point in time. This
allows starting new TaskExecutors in case there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which 

[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268970#comment-16268970
 ] 

ASF GitHub Bot commented on FLINK-8158:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @hequn8128, thanks for looking into this. 

I've checked the current implementation and found that it really may emit 
late data. However, that is caused by the checks below:

https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L173
and

https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L234

In some situations, they do not prevent the late rows from being evaluated 
and emitted. Honestly, I cannot come up with a solution on short notice. Do 
you want to continue working on this? Otherwise, I could take it over if you 
don't mind.

Thanks, Xingcan


> Rowtime window inner join emits late data
> -
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
> Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
> val sqlQuery =
>   """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
>   t1.key = t2.key AND
>   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left side 
> then receives another record ("A", "LEFT1", 1000L), the join will output a 
> result with timestamp 1000, which equals the previous watermark.
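
In other words, the invariant that gets violated (editor's sketch; names are 
illustrative):

{code}
/** A produced row is late if its timestamp is not strictly larger than the last emitted watermark. */
static boolean isLate(long rowTimestamp, long lastEmittedWatermark) {
	return rowTimestamp <= lastEmittedWatermark;
}

// with the data above: the watermark 1000 is emitted first, then a result row
// with timestamp 1000 is produced, so isLate(1000, 1000) == true
{code}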



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5094: [FLINK-8158] [table] Fix rowtime window inner join emits ...

2017-11-28 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5094
  
Hi @hequn8128, thanks for looking into this. 

I've checked the current implementation and found that it really may emit 
late data. However, that is caused by the checks below:

https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L173
and

https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L234

In some situations, they do not prevent the late rows from being evaluated 
and emitted. Honestly, I cannot come up with a solution on short notice. Do 
you want to continue working on this? Otherwise, I could take it over if you 
don't mind.

Thanks, Xingcan


---


[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis

2017-11-28 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268948#comment-16268948
 ] 

Till Rohrmann commented on FLINK-7300:
--

Another instance of unstable Kafka end-to-end tests: 
https://travis-ci.org/apache/flink/jobs/308347992

> End-to-end tests are instable on Travis
> ---
>
> Key: FLINK-7300
> URL: https://issues.apache.org/jira/browse/FLINK-7300
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>  Labels: test-stability
>
> It seems like the end-to-end tests are unstable, causing the {{misc}} build 
> profile to fail sporadically.
> Incorrect matched output:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8
> Another failure example with a different cause than the above, also in the 
> end-to-end tests:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268907#comment-16268907
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524341
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java ---
@@ -29,16 +30,16 @@
 /**
  * Json serializer for {@link InstanceID}.
  */
-public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
 	private static final long serialVersionUID = 5798852092159615938L;
 
-	protected InstanceIDSerializer() {
-		super(InstanceID.class);
+	protected ResourceIDSerializer() {
+		super(ResourceID.class);
 	}
 
 	@Override
-	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
 		gen.writeString(value.toString());
--- End diff --

Good point. Fixed with your commit.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268908#comment-16268908
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5093
  
Thanks for the review @GJL.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5093: [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as ...

2017-11-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5093
  
Thanks for the review @GJL.


---


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524341
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java ---
@@ -29,16 +30,16 @@
 /**
  * Json serializer for {@link InstanceID}.
  */
-public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
 	private static final long serialVersionUID = 5798852092159615938L;
 
-	protected InstanceIDSerializer() {
-		super(InstanceID.class);
+	protected ResourceIDSerializer() {
+		super(ResourceID.class);
 	}
 
 	@Override
-	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
 		gen.writeString(value.toString());
--- End diff --

Good point. Fixed with your commit.


---


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268905#comment-16268905
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
 ---
@@ -53,8 +53,8 @@
public static final String FIELD_NAME_HARDWARE = "hardware";
 
@JsonProperty(FIELD_NAME_INSTANCE_ID)
--- End diff --

Good point. Fixed with your commit.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268903#comment-16268903
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524037
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
 ---
@@ -43,7 +43,7 @@
 
@JsonCreator
public TaskManagerDetailsInfo(
-   @JsonDeserialize(using = InstanceIDDeserializer.class) 
@JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+   @JsonDeserialize(using = ResourceIDDeserializer.class) 
@JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
--- End diff --

Good point. Fixed with your commit.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
 ---
@@ -53,8 +53,8 @@
public static final String FIELD_NAME_HARDWARE = "hardware";
 
@JsonProperty(FIELD_NAME_INSTANCE_ID)
--- End diff --

Good point. Fixed with your commit.


---


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153524037
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
 ---
@@ -43,7 +43,7 @@
 
@JsonCreator
public TaskManagerDetailsInfo(
-   @JsonDeserialize(using = InstanceIDDeserializer.class) 
@JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+   @JsonDeserialize(using = ResourceIDDeserializer.class) 
@JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
--- End diff --

Good point. Fixed with your commit.


---


[GitHub] flink issue #4374: repalce map.put with putIfAbsent

2017-11-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4374
  
@RebornHuan although this change looks to be correct and makes good use of 
the newer API, there is a trade-off between deleting a line of code called 
during an error condition and the risk of such small refactorings. I don't 
think we have a demonstrated benefit to making this change.
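
For context, the kind of change under discussion (illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
	public static void main(String[] args) {
		Map<String, String> map = new HashMap<>();

		// before: explicit check-then-put
		if (!map.containsKey("key")) {
			map.put("key", "value");
		}

		// after: a single call with the same effect for this usage
		map.putIfAbsent("key", "value");
	}
}
```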


---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268878#comment-16268878
 ] 

ASF GitHub Bot commented on FLINK-7608:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4826
  
So here's the thing: The port of the metric itself is good, and exactly what 
I want. What I'm unsatisfied with is the naming of the metric, but that isn't 
the fault of this PR but a limitation in the metric system.

The way the metric is named is inconsistent: the source task is encoded in 
the metric name with its ID, but the target task is encoded in the scope as 
either ID or name, based on the scope format (maybe in the reverse order, but 
I hope you get the idea).

The conclusion I arrived at is that this metric should not be registered at 
the task level (which just doesn't support the notion of a metric describing 
2 tasks) but at the job level instead (where we can do whatever we want in 
regard to tasks).

This would, in principle, allow us to have this identifier:
```myjob.latency.source.ABCDE.target.DEFGH.latency_p95```
But this still isn't satisfactory, because it is still hardly usable for 
key-value reporters, where we want to filter latencies based on the source or 
target, for which we need something like this:
```
logical scope: job.task.latency.latency_p95:
tags:
   job_name = myjob
   source = ABCDE
   target = DEFGH
```
For this to work, however, we first have to implement FLINK-7692 to support 
custom key-value pairs.

As such I would like to delay merging this PR by a week or two until the 
aforementioned feature is implemented, so we don't iterate through 3 different 
versions.


> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4826: [FLINK-7608][metric] Refactor latency statistics metric

2017-11-28 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4826
  
So here's the thing: The port of the metric itself is good, and exactly 
what I want 👍. What I'm unsatisfied with is the naming of the metric, but 
that isn't the fault of this PR but a limitation in the metric system.

The way the metric is named is inconsistent: the source task is encoded in 
the metric name with its ID, but the target task is encoded in the scope as 
either ID or name, based on the scope format (maybe in the reverse order, but 
I hope you get the idea).

The conclusion I arrived at is that this metric should not be registered at 
the task level (which just doesn't support the notion of a metric describing 
2 tasks) but at the job level instead (where we can do whatever we want in 
regard to tasks).

This would, in principle, allow us to have this identifier:
```myjob.latency.source.ABCDE.target.DEFGH.latency_p95```
But this still isn't satisfactory, because it is still hardly usable for 
key-value reporters, where we want to filter latencies based on the source or 
target, for which we need something like this:
```
logical scope: job.task.latency.latency_p95:
tags:
   job_name = myjob
   source = ABCDE
   target = DEFGH
```
For this to work, however, we first have to implement FLINK-7692 to support 
custom key-value pairs.

As such I would like to delay merging this PR by a week or two until the 
aforementioned feature is implemented, so we don't iterate through 3 different 
versions.


---


[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-11-28 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268863#comment-16268863
 ] 

Chesnay Schepler commented on FLINK-7692:
-

Let me know what you think, and whether you would like to continue working on 
this.

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> Reporters that identify metrics with a set of key-value pairs are currently 
> limited to the variables defined by Flink, like the taskmanager ID, with 
> users not being able to supply their own.
> This is inconsistent with reporters that use metric identifiers that freely 
> include user-defined groups constructed via {{MetricGroup#addGroup(String 
> name)}}.
> I propose adding a new method {{MetricGroup#addGroup(String key, String 
> name)}} that adds a new key-value pair to the {{variables}} map in its 
> constructor. When constructing the metric identifier the key should be 
> included as well, yielding the same result as when constructing the 
> metric groups tree via {{group.addGroup(key).addGroup(value)}}.
> For this a new {{KeyedGenericMetricGroup}} should be created that resembles 
> the unkeyed version, with slight modifications to the constructor and 
> {{getScopeComponents}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-11-28 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268861#comment-16268861
 ] 

Chesnay Schepler commented on FLINK-7692:
-

[~tonywei] Those are good points, luckily we can resolve them in a rather nice 
way. We have to make one change to the original specification though: Create 2 
groups for key-value pairs, instead of one.

For a KeyValue group, the {{name}} on its own behaves the same way as a 
generic group with the same name; it is part of the identifier and logical 
scope. Only the value is treated differently, as it isn't part of the logical 
scope.

I've created a scaffold implementation (that probably doesn't fully compile...) 
here: https://github.com/zentol/flink/tree/7692
This still needs tests, documentation, and integration into reporters, and we 
have to go through all usages of {{addGroup}} to find cases we can now model 
better.

We need a new class for representing the key group, that behaves pretty much 
like a generic metric group:
{code}
public class GenericKeyMetricGroup extends GenericMetricGroup {

	GenericKeyMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
		super(registry, parent, name);
	}

	@Override
	protected GenericMetricGroup createChildGroup(String name) {
		return new GenericValueMetricGroup(registry, this, name);
	}
}
{code}

and another class representing the value, which a) modifies the variables 
and b) is excluded from the logical scope:
{code}
public class GenericValueMetricGroup extends GenericMetricGroup {
	private String key;
	private final String value;

	GenericValueMetricGroup(MetricRegistry registry, GenericKeyMetricGroup parent, String value) {
		super(registry, parent, value);
		this.key = parent.getGroupName(name -> name);
		this.value = value;
	}

	// ------------------------------------------------------------------------

	@Override
	protected void putVariables(Map<String, String> variables) {
		variables.put(ScopeFormat.asVariable(this.key), value);
	}

	@Override
	public String getLogicalScope(CharacterFilter filter, char delimiter) {
		return parent.getLogicalScope(filter, delimiter);
	}
}
{code}

This works with surprisingly few changes to existing classes; just 2 hooks to 
add more variables and to create a different kind of generic child group.

With this, the implementation of {{addGroup(key, value)}} is trivial:
{code}
public MetricGroup addGroup(String key, String value) {
	return addGroup(key).addGroup(value);
}
{code}
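
Intended usage would then look like this (editor's sketch; {{metricGroup}} 
stands for any existing group):

{code}
// "machine" behaves like a generic group: part of the identifier and logical scope;
// "host-42" becomes the user-defined variable <machine> and is excluded
// from the logical scope, as described above
MetricGroup machineGroup = metricGroup.addGroup("machine", "host-42");
machineGroup.counter("bytesReceived");
{code}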

Now, as to the issues you raised:

When calling {{addGroup(name, value)}},
* if a group {{name}} already exists, we just continue with whatever group was 
there. If the existing group is a key group we get the behavior we want, 
otherwise we will add another {{GenericMetricGroup}}. This may result in the 
value not being exposed as a proper key-value pair, but it doesn't lead to loss 
of information (which we would have if we skipped registering the value). In 
any case the metric identifier is the same.
* if a group {{value}} already exists we don't have a problem. Given that the 
key group is never exposed to the outside this case can only occur upon 
subsequent calls of {{addGroup(name, value)}}, making the latter calls a no-op, 
as is the existing behavior.

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> Reporters that identify metrics with a set of key-value pairs are currently 
> limited to the variables defined by Flink, like the taskmanager ID, with 
> users not being able to supply their own.
> This is inconsistent with reporters that use metric identifiers that freely 
> include user-defined groups constructed via {{MetricGroup#addGroup(String 
> name)}}.
> I propose adding a new method {{MetricGroup#addGroup(String key, String 
> name)}} that adds a new key-value pair to the {{variables}} map in its 
> constructor. When constructing the metric identifier the key should be 
> included as well, yielding the same result as when constructing the 
> metric groups tree via {{group.addGroup(key).addGroup(value)}}.
> For this a new {{KeyedGenericMetricGroup}} should be created that resembles 
> the unkeyed version, with slight modifications to the constructor and 
> {{getScopeComponents}} method.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268841#comment-16268841
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153511839
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>3.0.2</version>
+				<executions>
+					<execution>
+						<id>analyze</id>
+						<goals>
+							<goal>analyze-only</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<!-- three boolean options set to true; their element names
+					     were stripped when this diff was archived -->
+					true
+					true
+					true
+				</configuration>

--- End diff --

I would keep logging, junit, jsr305 and force-shading in the parent pom as 
they are used by virtually all modules and would just clutter up other poms.

But I wouldn't mind moving the mocking/hamcrest dependencies to child poms.
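
If we keep them there, the analyzer can be told to tolerate them via its 
ignore list (sketch; the listed artifacts are examples taken from the warning 
output in the issue):

```xml
<configuration>
    <failOnWarning>true</failOnWarning>
    <ignoredUnusedDeclaredDependencies>
        <!-- ubiquitous dependencies kept in the parent pom on purpose -->
        <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-log4j12</ignoredUnusedDeclaredDependency>
        <ignoredUnusedDeclaredDependency>log4j:log4j</ignoredUnusedDeclaredDependency>
        <ignoredUnusedDeclaredDependency>com.google.code.findbugs:jsr305</ignoredUnusedDeclaredDependency>
        <ignoredUnusedDeclaredDependency>org.apache.flink:force-shading</ignoredUnusedDeclaredDependency>
    </ignoredUnusedDeclaredDependencies>
</configuration>
```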


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-11-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153511839
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>3.0.2</version>
+				<executions>
+					<execution>
+						<id>analyze</id>
+						<goals>
+							<goal>analyze-only</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<!-- three boolean options set to true; their element names
+					     were stripped when this diff was archived -->
+					true
+					true
+					true
+				</configuration>

--- End diff --

I would keep logging, junit, jsr305 and force-shading in the parent pom as 
they are used by virtually all modules and would just clutter up other poms.

But I wouldn't mind moving the mocking/hamcrest dependencies to child poms.


---


[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268805#comment-16268805
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153503257
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-dependency-plugin</artifactId>
+	<version>3.0.2</version>
+	<executions>
+		<execution>
+			<id>analyze</id>
+			<goals>
+				<goal>analyze-only</goal>
+			</goals>
+			<configuration>
+				<!-- three boolean options set to true; element names were stripped by the mail archive -->
+			</configuration>
+		</execution>
+	</executions>
+</plugin>

--- End diff --

Would it be better to move the dependencies out of the parent pom?


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268804#comment-16268804
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153500167
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-dependency-plugin</artifactId>
+	<version>3.0.2</version>
--- End diff --

Redundant from the `<pluginManagement>` section.


> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-11-28 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153503257
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-dependency-plugin</artifactId>
+	<version>3.0.2</version>
+	<executions>
+		<execution>
+			<id>analyze</id>
+			<goals>
+				<goal>analyze-only</goal>
+			</goals>
+			<configuration>
+				<!-- three boolean options set to true; element names were stripped by the mail archive -->
+			</configuration>
+		</execution>
+	</executions>
+</plugin>

--- End diff --

Would it be better to move the dependencies out of the parent pom?


---


[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-11-28 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5076#discussion_r153500167
  
--- Diff: pom.xml ---
@@ -891,6 +905,41 @@ under the License.
 


+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-dependency-plugin</artifactId>
+	<version>3.0.2</version>
--- End diff --

Redundant from the `<pluginManagement>` section.


---


[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268760#comment-16268760
 ] 

ASF GitHub Bot commented on FLINK-7652:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
Hi @tillrohrmann, this PR is now rebased to the latest master, and reworked 
to incorporate your last comments.


> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

2017-11-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
Hi @tillrohrmann, this PR is now rebased to the latest master, and reworked 
to incorporate your last comments.


---


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268738#comment-16268738
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153495140
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that
+ * writes data to both a remote end (e.g. DFS) and a local end. Local data is
+ * managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache}
+ * and {@link CheckpointStreamFactory} and creates a hybrid output stream from
+ * them; this hybrid output stream writes to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+	private final CheckpointCache cache;
+	private final CheckpointStreamFactory remoteFactory;
+
+	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+		this.cache = cache;
+		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+		}
+		CachedOutputStream cachedOut = null;
+		if (cache != null) {
+			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+		}
+		CheckpointStateOutputStream remoteOut = null;
+		if (!placeholder) {
+			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+		return output;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		LOG.warn("create output stream which is not cacheable.");
+		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public void close() throws Exception {
+		remoteFactory.close();
+	}
+
+	/**
+	 * A hybrid checkpoint output stream which writes data to both the remote end
+	 * and the local end; a failure to write data locally won't stop writing to the
+	 * remote end. This hybrid output stream returns a {@link CachedStreamStateHandle}
+	 * from closeAndGetHandle(), which can be used to read data locally.
+	 */
+	public 

[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153495140
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that
+ * writes data to both a remote end (e.g. DFS) and a local end. Local data is
+ * managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache}
+ * and {@link CheckpointStreamFactory} and creates a hybrid output stream from
+ * them; this hybrid output stream writes to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+	private final CheckpointCache cache;
+	private final CheckpointStreamFactory remoteFactory;
+
+	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+		this.cache = cache;
+		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+		}
+		CachedOutputStream cachedOut = null;
+		if (cache != null) {
+			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+		}
+		CheckpointStateOutputStream remoteOut = null;
+		if (!placeholder) {
+			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+		return output;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		LOG.warn("create output stream which is not cacheable.");
+		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public void close() throws Exception {
+		remoteFactory.close();
+	}
+
+	/**
+	 * A hybrid checkpoint output stream which writes data to both the remote end
+	 * and the local end; a failure to write data locally won't stop writing to the
+	 * remote end. This hybrid output stream returns a {@link CachedStreamStateHandle}
+	 * from closeAndGetHandle(), which can be used to read data locally.
+	 */
+	public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream {
+
+		private CachedOutputStream cacheOut = null;
+		private CheckpointStateOutputStream remoteOut = null;
+
+ 

[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268733#comment-16268733
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153493925
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that
+ * writes data to both a remote end (e.g. DFS) and a local end. Local data is
+ * managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache}
+ * and {@link CheckpointStreamFactory} and creates a hybrid output stream from
+ * them; this hybrid output stream writes to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+	private final CheckpointCache cache;
+	private final CheckpointStreamFactory remoteFactory;
+
+	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+		this.cache = cache;
+		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+		}
+		CachedOutputStream cachedOut = null;
+		if (cache != null) {
+			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+		}
+		CheckpointStateOutputStream remoteOut = null;
+		if (!placeholder) {
+			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+		return output;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		LOG.warn("create output stream which is not cacheable.");
+		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public void close() throws Exception {
+		remoteFactory.close();
+	}
+
+	/**
+	 * A hybrid checkpoint output stream which writes data to both the remote end
+	 * and the local end; a failure to write data locally won't stop writing to the
+	 * remote end. This hybrid output stream returns a {@link CachedStreamStateHandle}
+	 * from closeAndGetHandle(), which can be used to read data locally.
+	 */
+	public 

[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153493925
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that
+ * writes data to both a remote end (e.g. DFS) and a local end. Local data is
+ * managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache}
+ * and {@link CheckpointStreamFactory} and creates a hybrid output stream from
+ * them; this hybrid output stream writes to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+	private final CheckpointCache cache;
+	private final CheckpointStreamFactory remoteFactory;
+
+	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+		this.cache = cache;
+		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+		}
+		CachedOutputStream cachedOut = null;
+		if (cache != null) {
+			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+		}
+		CheckpointStateOutputStream remoteOut = null;
+		if (!placeholder) {
+			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+		return output;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		LOG.warn("create output stream which is not cacheable.");
+		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public void close() throws Exception {
+		remoteFactory.close();
+	}
+
+	/**
+	 * A hybrid checkpoint output stream which writes data to both the remote end
+	 * and the local end; a failure to write data locally won't stop writing to the
+	 * remote end. This hybrid output stream returns a {@link CachedStreamStateHandle}
+	 * from closeAndGetHandle(), which can be used to read data locally.
+	 */
+	public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream {
+
+		private CachedOutputStream cacheOut = null;
+		private CheckpointStateOutputStream remoteOut = null;
+
+ 

[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268730#comment-16268730
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153493396
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration(
 
	final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
+	final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager(
+		new ScheduledThreadPoolExecutor(1),
+		Executors.directExecutor(),
+		taskManagerServicesConfiguration.getTmpDirPaths()[0]);
--- End diff --

You're right. I even think we should add a dedicated configuration option 
for where the local recovery data is stored.
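
A minimal sketch of what such an option could look like, using Flink's 
ConfigOptions builder (the key name and default below are made up for 
illustration):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    class CheckpointCacheOptionsSketch {
        // Hypothetical key; the actual name would be decided when the feature lands.
        static final ConfigOption<String> CHECKPOINT_CACHE_ROOT_DIR =
            ConfigOptions.key("taskmanager.checkpoint-cache.root-dir")
                .defaultValue(System.getProperty("java.io.tmpdir"));
    }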


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big 
> (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up 
> all network bandwidth and badly hurt the cluster. So I propose caching the 
> checkpoint data locally and reading it from the local cache whenever we 
> can; we read the data from the remote end only if reading locally fails. 
> The advantage is that if an execution is assigned to the same TaskManager 
> as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal: we dispose of 
> an entry if it hasn't been touched for X time, and once we touch an entry 
> we reset its TTL. This way, all the work is done by the TaskManager and is 
> transparent to the JobManager. The only problem is that we may dispose of 
> an entry that is still useful; in that case we end up reading from the 
> remote end after all, but users can avoid this by setting a proper TTL 
> value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
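
A minimal Java sketch of the TTL-like disposal described above (class and 
method names are illustrative, not the actual CheckpointCacheManager API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class TtlCacheSketch<K> {
        private final Map<K, Long> lastTouched = new ConcurrentHashMap<>();
        private final long ttlMillis;

        TtlCacheSketch(long ttlMillis, ScheduledExecutorService scheduler) {
            this.ttlMillis = ttlMillis;
            // Periodically dispose of entries that were not touched within the TTL.
            scheduler.scheduleAtFixedRate(this::evictExpired, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS);
        }

        // Touching an entry (e.g. on a checkpoint write or a local read) resets its TTL.
        void touch(K key) {
            lastTouched.put(key, System.currentTimeMillis());
        }

        private void evictExpired() {
            long deadline = System.currentTimeMillis() - ttlMillis;
            lastTouched.entrySet().removeIf(e -> e.getValue() < deadline);
        }
    }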



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153493396
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration(
 
	final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
+	final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager(
+		new ScheduledThreadPoolExecutor(1),
+		Executors.directExecutor(),
+		taskManagerServicesConfiguration.getTmpDirPaths()[0]);
--- End diff --

You're right. I even think we should add a dedicated configuration option 
for where the local recovery data is stored.


---


[GitHub] flink issue #4666: [FLINK-7613][Documentation] Fixed typographical error

2017-11-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4666
  
@raymondtay are we still looking to make this change? Defining the mapper 
may be just as likely to confuse new Flink users.

Also, when updating future PRs you want to rebase to master rather than 
merging master.


---


[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268728#comment-16268728
 ] 

ASF GitHub Bot commented on FLINK-7613:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4666
  
@raymondtay are we still looking to make this change? Defining the mapper 
may be just as likely to confuse new Flink users.

Also, when updating future PRs you want to rebase to master rather than 
merging master.


> Fix documentation error in QuickStart
> -
>
> Key: FLINK-7613
> URL: https://issues.apache.org/jira/browse/FLINK-7613
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Raymond Tay
>Priority: Minor
>
> In the `QuickStart => Run The Example` section, there's a typographical error 
> which points the reader to `*jobmanager*` but it should be `*taskmanager*` in 
> Apache Flink 1.4.x. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268727#comment-16268727
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153492995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

Yes, this can be avoided by mapping local state to checkpoint IDs. In fact, 
it can also be avoided by mapping local state to handle IDs, but that needs 
some additional code. 
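
A sketch of the "map local state to checkpoint IDs" idea mentioned here 
(types simplified to strings; the real code would key by StateHandleID and 
store local file references):

    import java.util.HashMap;
    import java.util.Map;

    class LocalStateIndexSketch {
        // Purely TaskManager-local index: nothing cache-specific has to be
        // serialized into the savepoint/checkpoint metadata itself.
        private final Map<Long, Map<String, String>> localFilesPerCheckpoint = new HashMap<>();

        void register(long checkpointId, String handleId, String localPath) {
            localFilesPerCheckpoint
                .computeIfAbsent(checkpointId, id -> new HashMap<>())
                .put(handleId, localPath);
        }

        // On restore, try the local copy first and fall back to the remote handle if absent.
        String lookup(long checkpointId, String handleId) {
            Map<String, String> files = localFilesPerCheckpoint.get(checkpointId);
            return files == null ? null : files.get(handleId);
        }
    }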


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big 
> (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up 
> all network bandwidth and badly hurt the cluster. So I propose caching the 
> checkpoint data locally and reading it from the local cache whenever we 
> can; we read the data from the remote end only if reading locally fails. 
> The advantage is that if an execution is assigned to the same TaskManager 
> as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal: we dispose of 
> an entry if it hasn't been touched for X time, and once we touch an entry 
> we reset its TTL. This way, all the work is done by the TaskManager and is 
> transparent to the JobManager. The only problem is that we may dispose of 
> an entry that is still useful; in that case we end up reading from the 
> remote end after all, but users can avoid this by setting a proper TTL 
> value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153492995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

Yes, this can be avoided by mapping local state to checkpoint IDs. In fact, 
it can also be avoided by mapping local state to handle IDs, but that needs 
some additional code. 


---


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268724#comment-16268724
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153492082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

@StefanRRichter Thanks very much for reviewing my code, and thanks very much 
for laying out your opinion in such detail; I'm happy that my thinking is 
similar to yours in some places. There are two things I want to explain a bit:

1. About the 1:1 relationship between remote handle and local handle: in 
fact, I think each local state handle corresponds to the smallest storage 
unit of a checkpoint. For example, each backend generates an 
`IncrementalKeyedStateHandle` for every incremental checkpoint, but 
`IncrementalKeyedStateHandle` is a composite handle: it contains a collection 
of sub-StateHandles that store the data (meta & sst & misc). In this case the 
sub-StateHandle is the smallest storage unit, each of them has a 1:1 
relationship with a local state handle, and `IncrementalKeyedStateHandle` has 
a 1:N relationship with local state handles. (Currently, 
CheckpointStateOutputStream.closeAndGetHandle() returns a remote handle, 
which I view as the smallest storage unit.) For incremental checkpoints this 
can indeed be optimized: we could provide a fast path that puts the cache 
entry into the checkpoint cache directly, so the data would not need to be 
written locally while it is transmitted to the remote end. I didn't do that 
because I wanted to provide a unified way to meet all backends' requirements 
without changing the backend code too much.

2. The local handle does not have to be a local file; it can also be stored 
in memory, on another storage medium, or even be just a mock (which may apply 
to the CopyOnWriteStateTableSnapshot problem described above), as long as it 
inherits CachedStateHandle and implements the corresponding classes.

IMO mapping local state to checkpoint IDs can also work, but I have some 
minor questions about that:
1. Can we provide a unified local-state mechanism that meets all of the 
current state backend requirements (with RocksDB optimized, of course)?
2. Since the local state is mapped by checkpoint ID, the key-range check 
needs to be performed locally again, which is a bit repetitive. Can this be 
avoided with the work on the JM?


Although I've expressed my ideas, I think you are more professional than me 
in this area and your approach is probably better than mine. So if you have 
any planned issues, I would like to close this PR and work on those instead; 
even though this PR shares some ideas with yours, it does not seem to be the 
base version you expected. For now we will still use this version of local 
checkpointing in production (it still needs to address some problems from 
your comments), because Flink 1.4 does not have this feature and we need it 
very much (our state is huge). With the 1.5 release we will switch to the 
community version.
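
A rough sketch of the 1:N relationship described in point 1 (shapes heavily 
simplified; the real handle classes live in org.apache.flink.runtime.state):

    import java.util.HashMap;
    import java.util.Map;

    class StreamStateHandleSketch { /* one smallest storage unit (a single file/stream) */ }

    class IncrementalKeyedStateHandleSketch {
        // meta + sst + misc: each entry below is one smallest storage unit, so this
        // composite handle maps 1:N onto local cache entries, while each sub-handle
        // maps 1:1 onto a local state handle.
        final StreamStateHandleSketch metaStateHandle = new StreamStateHandleSketch();
        final Map<String, StreamStateHandleSketch> sharedState = new HashMap<>();  // sst files
        final Map<String, StreamStateHandleSketch> privateState = new HashMap<>(); // misc files
    }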


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big 
> (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up 
> all network bandwidth and badly hurt the cluster. So I propose caching the 
> checkpoint data locally and reading it from the local cache whenever we 
> can; we read the data from the remote end only if reading locally fails. 
> The advantage is that if an execution is assigned to the same TaskManager 
> as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal, 

[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153492082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

@StefanRRichter Thanks very much for reviewing my code, and thanks very much 
for laying out your opinion in such detail; I'm happy that my thinking is 
similar to yours in some places. There are two things I want to explain a bit:

1. About the 1:1 relationship between remote handle and local handle: in 
fact, I think each local state handle corresponds to the smallest storage 
unit of a checkpoint. For example, each backend generates an 
`IncrementalKeyedStateHandle` for every incremental checkpoint, but 
`IncrementalKeyedStateHandle` is a composite handle: it contains a collection 
of sub-StateHandles that store the data (meta & sst & misc). In this case the 
sub-StateHandle is the smallest storage unit, each of them has a 1:1 
relationship with a local state handle, and `IncrementalKeyedStateHandle` has 
a 1:N relationship with local state handles. (Currently, 
CheckpointStateOutputStream.closeAndGetHandle() returns a remote handle, 
which I view as the smallest storage unit.) For incremental checkpoints this 
can indeed be optimized: we could provide a fast path that puts the cache 
entry into the checkpoint cache directly, so the data would not need to be 
written locally while it is transmitted to the remote end. I didn't do that 
because I wanted to provide a unified way to meet all backends' requirements 
without changing the backend code too much.

2. The local handle does not have to be a local file; it can also be stored 
in memory, on another storage medium, or even be just a mock (which may apply 
to the CopyOnWriteStateTableSnapshot problem described above), as long as it 
inherits CachedStateHandle and implements the corresponding classes.

IMO mapping local state to checkpoint IDs can also work, but I have some 
minor questions about that:
1. Can we provide a unified local-state mechanism that meets all of the 
current state backend requirements (with RocksDB optimized, of course)?
2. Since the local state is mapped by checkpoint ID, the key-range check 
needs to be performed locally again, which is a bit repetitive. Can this be 
avoided with the work on the JM?

Although I've expressed my ideas, I think you are more professional than me 
in this area and your approach is probably better than mine. So if you have 
any planned issues, I would like to close this PR and work on those instead; 
even though this PR shares some ideas with yours, it does not seem to be the 
base version you expected. For now we will still use this version of local 
checkpointing in production (it still needs to address some problems from 
your comments), because Flink 1.4 does not have this feature and we need it 
very much (our state is huge). With the 1.5 release we will switch to the 
community version.


---


[jira] [Assigned] (FLINK-7805) Add HA capabilities to YarnResourceManager

2017-11-28 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7805:
---

Assignee: Gary Yao

> Add HA capabilities to YarnResourceManager
> --
>
> Key: FLINK-7805
> URL: https://issues.apache.org/jira/browse/FLINK-7805
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
>
> The new {{YarnResourceManager}} implementation does not retrieve allocated 
> containers from previous attempts in HA mode like the old 
> {{YarnFlinkResourceManager}} did. We should add this functionality in order 
> to properly support long running Yarn applications [1].
> [1] 
> https://de.hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268686#comment-16268686
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153483875
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
 ---
@@ -29,16 +30,16 @@
 /**
  * Json serializer for {@link InstanceID}.
  */
-public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
	private static final long serialVersionUID = 5798852092159615938L;
 
-	protected InstanceIDSerializer() {
-		super(InstanceID.class);
+	protected ResourceIDSerializer() {
+		super(ResourceID.class);
	}
 
	@Override
-	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(value.toString());
--- End diff --

Maybe `value.getResourceIdString()`.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> whose instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.
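
A sketch of the resulting lookup failure (the interface below is a simplified 
stand-in for the real MetricStore, not its actual API):

    interface MetricStoreSketch {
        // Simplified stand-in for MetricStore#getTaskManagerMetricStore(String),
        // which is keyed by the TaskExecutor's ResourceID.
        Object getTaskManagerMetricStore(String taskManagerId);
    }

    class MetricLookupSketch {
        static Object lookup(MetricStoreSketch store, String idFromTaskManagersHandler) {
            // Before the fix, the REST handler exposed the InstanceID here, so this
            // ResourceID-keyed lookup misses and returns null.
            return store.getTaskManagerMetricStore(idFromTaskManagersHandler);
        }
    }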



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153483875
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
 ---
@@ -29,16 +30,16 @@
 /**
  * Json serializer for {@link InstanceID}.
  */
-public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
	private static final long serialVersionUID = 5798852092159615938L;
 
-	protected InstanceIDSerializer() {
-		super(InstanceID.class);
+	protected ResourceIDSerializer() {
+		super(ResourceID.class);
	}
 
	@Override
-	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(value.toString());
--- End diff --

Maybe `value.getResourceIdString()`.


---


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268681#comment-16268681
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153483098
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
 ---
@@ -53,8 +53,8 @@
public static final String FIELD_NAME_HARDWARE = "hardware";
 
@JsonProperty(FIELD_NAME_INSTANCE_ID)
--- End diff --

Should be `FIELD_NAME_RESOURCE_ID`


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> whose instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153483098
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
 ---
@@ -53,8 +53,8 @@
public static final String FIELD_NAME_HARDWARE = "hardware";
 
@JsonProperty(FIELD_NAME_INSTANCE_ID)
--- End diff --

Should be `FIELD_NAME_RESOURCE_ID`


---


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153482920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
 ---
@@ -43,7 +43,7 @@
 
@JsonCreator
public TaskManagerDetailsInfo(
-	@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+	@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
--- End diff --

This should be named `FIELD_NAME_RESOURCE_ID`.


---


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268680#comment-16268680
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5093#discussion_r153482920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
 ---
@@ -43,7 +43,7 @@
 
@JsonCreator
public TaskManagerDetailsInfo(
-	@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+	@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
--- End diff --

This should be named `FIELD_NAME_RESOURCE_ID`.


> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> whose instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
> while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
> returns the TaskManager resource IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4504: [FLINK-7395] [metrics] Count bytesIn/Out without synchron...

2017-11-28 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4504
  
@NicoK The byte counting in the ResultPartition class looks unrelated to 
the metric system; in fact that field isn't read anywhere. We may want to 
remove it, but I wouldn't do that as part of this PR.


---


[jira] [Commented] (FLINK-7395) NumBytesOut metric in RecordWriter call synchronized method

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268656#comment-16268656
 ] 

ASF GitHub Bot commented on FLINK-7395:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4504
  
@NicoK The byte counting in the ResultPartition class looks unrelated to 
the metric system; in fact that field isn't read anywhere. We may want to 
remove it, but I wouldn't do that as part of this PR.


> NumBytesOut metric in RecordWriter call synchronized method
> ---
>
> Key: FLINK-7395
> URL: https://issues.apache.org/jira/browse/FLINK-7395
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.3.3
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268653#comment-16268653
 ] 

ASF GitHub Bot commented on FLINK-8158:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5094

[FLINK-8158] [table] Fix rowtime window inner join emits late data bug


## What is the purpose of the change

This pull request fixes a bug where the rowtime window inner join emits late 
data. When executing the join, the join operator needs to make sure that no 
late data is emitted; however, the window border is not handled correctly.


## Brief change log

  - Set `WatermarkDelay` to `MaxOutputDelay + 1` instead of `MaxOutputDelay`
  - Add tests in `JoinHarnessTest`


## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests in `JoinHarnessTest` to check whether late data is emitted*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8158

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5094.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5094


commit 37d08da40150d26b0c58daf1bc85b69675bf5d64
Author: 军长 
Date:   2017-11-28T10:58:52Z

[FLINK-8158] [table] Fix Rowtime window inner join emits late data bug




> Rowtime window inner join emits late data
> -
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
> Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
> val sqlQuery =
>   """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
>   t1.key = t2.key AND
>   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left side 
> later receives another record ("A", "LEFT1", 1000L), the join will output a 
> record with timestamp 1000, which equals the previous watermark and is 
> therefore late.
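
A minimal sketch of the off-by-one, using the values from this example 
(variable names are illustrative):

    class WatermarkDelaySketch {
        static void example() {
            // With t1.rt BETWEEN t2.rt - 5s AND t2.rt + 1s, a left row with rowtime
            // 1000 can still join a right row with rowtime 6000, so outputs may lag
            // the input watermark by up to 5000 ms.
            long maxOutputDelay = 5000L;
            long inputWatermark = 6000L;

            long buggy = inputWatermark - maxOutputDelay;       // 1000: a later output with
                                                                // timestamp 1000 equals it -> late
            long fixed = inputWatermark - (maxOutputDelay + 1); // 999: timestamp 1000 is still
                                                                // ahead of the watermark
        }
    }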



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5094: [FLINK-8158] [table] Fix rowtime window inner join...

2017-11-28 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5094

[FLINK-8158] [table] Fix rowtime window inner join emits late data bug


## What is the purpose of the change

This pull request fixes a bug where the rowtime window inner join emits late 
data. When executing the join, the join operator needs to make sure that no 
late data is emitted; however, the window border is not handled correctly.


## Brief change log

  - Set `WatermarkDelay` to `MaxOutputDelay + 1` instead of `MaxOutputDelay`
  - Add tests in `JoinHarnessTest`


## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests in `JoinHarnessTest` to check whether late data is emitted*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8158

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5094.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5094


commit 37d08da40150d26b0c58daf1bc85b69675bf5d64
Author: 军长 
Date:   2017-11-28T10:58:52Z

[FLINK-8158] [table] Fix Rowtime window inner join emits late data bug




---


[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268623#comment-16268623
 ] 

ASF GitHub Bot commented on FLINK-8150:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5093

[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

## What is the purpose of the change

Before, the TaskExecutor's InstanceID was exposed as the TaskExecutor id. This 
was wrong since the InstanceID is bound to the registration of a TaskExecutor, 
whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it 
is better to identify the TaskExecutor by its ResourceID, which does not 
change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.
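
A toy model of the two lifetimes (hypothetical classes, not Flink's actual 
runtime code):

```scala
import java.util.UUID

final case class ResourceID(value: String)
final case class InstanceID(value: String)

final class TaskExecutorModel {
  // Fixed for the whole lifetime of the TaskExecutor process.
  val resourceId: ResourceID = ResourceID(UUID.randomUUID().toString)

  // A fresh InstanceID is minted on every (re)registration at the
  // ResourceManager, e.g. after a ResourceManager failover.
  def register(): InstanceID = InstanceID(UUID.randomUUID().toString)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val te = new TaskExecutorModel
    val idBeforeReconnect = te.resourceId
    val reg1 = te.register()
    val reg2 = te.register() // re-registration after a reconnect
    println(reg1 == reg2)                      // false: InstanceID is not stable
    println(idBeforeReconnect == te.resourceId) // true: ResourceID is stable
  }
}
```

Because re-registration mints a fresh `InstanceID`, any external identifier 
built on it silently changes across reconnects, while the `ResourceID` stays 
valid.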

## Brief change log


## Verifying this change

- Added `ResourceManagerTest#testRequestTaskManagerInfo` to ensure that we 
can obtain the `TaskManagerInfo` for registered `TaskExecutors`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskManagerId

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5093.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5093


commit 415e3da9b6bc8dd6071ca98966779f73b682b658
Author: Till Rohrmann 
Date:   2017-11-28T11:43:39Z

[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

Before, the TaskExecutor's InstanceID was exposed as the TaskExecutor id. This 
was wrong since the InstanceID is bound to the registration of a TaskExecutor, 
whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it 
is better to identify the TaskExecutor by its ResourceID, which does not 
change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.




> WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
> -
>
> Key: FLINK-8150
> URL: https://issues.apache.org/jira/browse/FLINK-8150
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> TaskManager IDs exposed by 
> {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
> cannot be used as input to query TaskManager metrics with method 
> {{MetricStore#getTaskManagerMetricStore(String)}}.
> *Reason*
> {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
> where the instance IDs are set to the IDs of the 
> {{TaskExecutorConnection}}s, while 
> {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} returns 
> the TaskManager resource IDs.
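
The practical effect, as a minimal sketch (hypothetical data and method names, 
not the actual {{MetricStore}} implementation): the metric store is keyed by 
TaskManager resource IDs, so a lookup with an instance ID finds nothing.

{code:scala}
object IdMismatchSketch {

  // Metrics keyed by the TaskExecutor's resource ID, as populated via the
  // metric query service paths.
  val metricsByResourceId: Map[String, Map[String, String]] =
    Map("resource-42" -> Map("Status.JVM.Memory.Heap.Used" -> "123"))

  def getTaskManagerMetricStore(id: String): Option[Map[String, String]] =
    metricsByResourceId.get(id)

  def main(args: Array[String]): Unit = {
    // An instance ID from the TaskManagers handler is in the wrong key space.
    println(getTaskManagerMetricStore("instance-7"))  // None
    // After the fix, the handler exposes the resource ID, so the lookup works.
    println(getTaskManagerMetricStore("resource-42")) // Some(Map(...))
  }
}
{code}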



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...

2017-11-28 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5093

[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

## What is the purpose of the change

Before, the TaskExecutor's InstanceID was exposed as the TaskExecutor id. This 
was wrong since the InstanceID is bound to the registration of a TaskExecutor, 
whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it 
is better to identify the TaskExecutor by its ResourceID, which does not 
change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.

## Brief change log


## Verifying this change

- Added `ResourceManagerTest#testRequestTaskManagerInfo` to ensure that we 
can obtain the `TaskManagerInfo` for registered `TaskExecutors`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskManagerId

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5093.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5093


commit 415e3da9b6bc8dd6071ca98966779f73b682b658
Author: Till Rohrmann 
Date:   2017-11-28T11:43:39Z

[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

Before, the TaskExecutor's InstanceID was exposed as the TaskExecutor id. This 
was wrong since the InstanceID is bound to the registration of a TaskExecutor, 
whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it 
is better to identify the TaskExecutor by its ResourceID, which does not 
change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.




---
