[jira] [Commented] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319856#comment-16319856
 ] 

ASF GitHub Bot commented on FLINK-8399:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5271

[FLINK-8399] [runtime] use independent configurations for the different 
timeouts in slot manager


## What is the purpose of the change

*This pull request separates the slot manager timeouts for slot requests to 
task managers, for discarding pending slot requests, and for releasing idle 
task managers into three different configuration options.*
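As a sketch, the separation could surface as three distinct keys in `flink-conf.yaml`. The key names and defaults below are illustrative placeholders, not necessarily the options introduced by this PR:

```yaml
# Hypothetical, independently tunable timeouts replacing the single
# AkkaOptions.ASK_TIMEOUT value currently used for all three:
slotmanager.request-timeout: 10 s               # slot request sent to a task manager
slotmanager.request-discard-timeout: 5 min      # pending slot request kept before discard
slotmanager.taskmanager-release-timeout: 30 s   # idle task manager kept before release
```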


## Brief change log

  - *Introduced three independent configuration options in the slot manager: 
one for the timeout of a slot request to a task manager, one for discarding a 
pending slot request, and one for releasing an idle task manager, instead of 
deriving all three from `AkkaOptions.ASK_TIMEOUT`*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8399

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5271


commit f7024439ead5e3848c705659bfe221b8ce50f154
Author: shuai.xus 
Date:   2018-01-10T07:43:20Z

[FLINK-8399] [runtime] use independent configurations for the different 
timeouts in slot manager




> Use independent configurations for the different timeouts in slot manager
> -
>
> Key: FLINK-8399
> URL: https://issues.apache.org/jira/browse/FLINK-8399
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> There are three parameters in the slot manager to indicate the timeouts for 
> slot requests to task managers, for slot requests to be discarded, and for 
> task managers to be released. But now they all come from the value of 
> AkkaOptions.ASK_TIMEOUT, so independent configurations are needed for them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319846#comment-16319846
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160607315
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
 ---
@@ -27,6 +28,7 @@
  * @deprecated Use {@link FlinkKafkaConsumer08}
  */
 @Deprecated
+@PublicEvolving
--- End diff --

I don't think this annotation is needed here, since the class is deprecated 
already.


> Annotation for Kafka connector
> --
>
> Key: FLINK-8276
> URL: https://issues.apache.org/jira/browse/FLINK-8276
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.5.0
>
>
> See parent issue.





[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319852#comment-16319852
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160607865
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
 ---
@@ -17,12 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.Serializable;
 
 /**
  * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records
  * across partitions of multiple Kafka topics.
  */
+@Internal
--- End diff --

`FlinkKafkaPartitioner` should be `@PublicEvolving`.
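For readers following along: Flink marks API stability with class-level annotations, so the suggestion above is a one-line annotation swap. Below is a self-contained sketch; the annotation definitions are simplified stand-ins for Flink's real `org.apache.flink.annotation` markers, and the partitioner signature is abbreviated:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationSketch {
    // Simplified stand-ins for Flink's org.apache.flink.annotation.* markers.
    @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
    @interface PublicEvolving {}

    @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
    @interface Internal {}

    // A user-facing extension point: users subclass it to pick target
    // partitions, so per the review it belongs to the evolving public API
    // rather than being marked @Internal.
    @PublicEvolving
    abstract static class FlinkKafkaPartitioner<T> implements java.io.Serializable {
        public abstract int partition(T record, int[] partitions);
    }

    public static void main(String[] args) {
        // The marker is visible via reflection because of RUNTIME retention.
        System.out.println(
            FlinkKafkaPartitioner.class.isAnnotationPresent(PublicEvolving.class));
    }
}
```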
  




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319851#comment-16319851
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160607372
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 ---
@@ -31,6 +32,7 @@
  * @deprecated Use {@link FlinkKafkaProducer08}.
  */
 @Deprecated
+@PublicEvolving
--- End diff --

Same here.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319847#comment-16319847
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608176
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
 ---
@@ -37,6 +38,7 @@
  * Result byte[] messages can be deserialized using
  * {@link JsonRowDeserializationSchema}.
  */
+@Internal
--- End diff --

Same here.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319854#comment-16319854
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608074
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
 ---
@@ -37,6 +38,7 @@
  *
  * Failure during deserialization are forwarded as wrapped IOExceptions.
  */
+@Internal
--- End diff --

This could also be `@PublicEvolving`, I don't think there's a problem with 
the user using it directly.






[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319850#comment-16319850
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608063
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 ---
@@ -37,6 +38,7 @@
  * Metadata fields can be accessed by calling 
objectNode.get("metadata").get(name>).as(type>) and include
  * the "offset" (long), "topic" (String) and "partition" (int).
  */
+@Internal
--- End diff --

This could also be `@PublicEvolving`, I don't think there's a problem with 
the user using it directly.






[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319853#comment-16319853
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608223
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ---
@@ -29,6 +30,7 @@
  *
  * @param  The type created by the keyed deserialization schema.
  */
+@Internal
--- End diff --

`KeyedDeserializationSchema` should be `@PublicEvolving`.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319848#comment-16319848
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608269
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ---
@@ -26,6 +28,7 @@
  *
  * @param  The type to be serialized.
  */
+@Internal
--- End diff --

`KeyedSerializationSchema` should be `@PublicEvolving`.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319844#comment-16319844
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160607814
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -50,6 +51,7 @@
  * To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner (note that this will
  * cause a lot of network connections between all the Flink instances and 
all the Kafka brokers).
  */
+@Internal
--- End diff --

Not sure here. This could be `@PublicEvolving`.

It could very well be that the user simply instantiates a 
`FlinkFixedPartitioner` as the provided custom partitioner.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319845#comment-16319845
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160608041
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
 ---
@@ -29,6 +30,7 @@
  *
  * Fields can be accessed by calling 
objectNode.get(name>).as(type>)
  */
+@Internal
--- End diff --

This could also be `@PublicEvolving`, I don't think there's a problem with 
the user using it directly.




[jira] [Commented] (FLINK-8276) Annotation for Kafka connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319849#comment-16319849
 ] 

ASF GitHub Bot commented on FLINK-8276:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5173#discussion_r160607359
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
 ---
@@ -27,6 +28,7 @@
  * @deprecated Use {@link FlinkKafkaConsumer08}
  */
 @Deprecated
+@PublicEvolving
--- End diff --

Same here.





[jira] [Commented] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319834#comment-16319834
 ] 

ASF GitHub Bot commented on FLINK-8324:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5214
  
Good idea. I think we should also update the metrics documentation to cover 
this addition.


> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.5.0
>
>
> The {{current-offsets}} and {{committed-offsets}} metrics currently embed 
> the topic name and partition id in the metric identity.
> This is inconvenient in Prometheus, because users usually group metrics with 
> the same meaning under a single metric name and use tags to select an 
> individual metric.
> For example, I would prefer to access the {{current-offsets}} metric group 
> and use a {{partition-x}} tag to get the offset of partition x, instead of 
> reading the metric directly from {{current-offsets-partition-x}}.
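To illustrate the difference between the two schemes (this is a toy sketch, not Flink's metrics API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Not Flink's metrics API: a self-contained illustration of the two metric
// naming schemes the issue contrasts, using plain strings and a tag map.
public class OffsetMetricsSketch {
    // Old scheme: the partition id is fused into the metric identity, so
    // every partition yields a differently named metric.
    static String identifierStyle(String metric, int partition) {
        return metric + "-partition-" + partition;
    }

    // Proposed scheme: one shared metric name plus a user-defined variable
    // ("tag"), which Prometheus-style backends can group and filter on.
    static String taggedStyle(String metric, int partition) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("partition", String.valueOf(partition));
        return metric + tags;
    }

    public static void main(String[] args) {
        System.out.println(identifierStyle("current-offsets", 3)); // current-offsets-partition-3
        System.out.println(taggedStyle("current-offsets", 3));     // current-offsets{partition=3}
    }
}
```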







[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8399:
---

 Summary: Use independent configurations for the different timeouts 
in slot manager
 Key: FLINK-8399
 URL: https://issues.apache.org/jira/browse/FLINK-8399
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


There are three parameters in the slot manager to indicate the timeouts for 
slot requests to task managers, for slot requests to be discarded, and for 
task managers to be released. But now they all come from the value of 
AkkaOptions.ASK_TIMEOUT, so independent configurations are needed for them.





[jira] [Updated] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8399:

Description: There are three parameters in the slot manager to indicate the 
timeouts for slot requests to task managers, for slot requests to be 
discarded, and for task managers to be released. But now they all come from 
the value of AkkaOptions.ASK_TIMEOUT, so independent configurations are needed 
for them.  
(was: There are three parameter in slot manager to indicate the timeout for 
slot request to task manager, slot request to be discarded and task manager to 
be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, 
need to use independent configurations for them.)





[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319816#comment-16319816
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
Regarding the race condition you mentioned:
hmm, I can't seem to exactly nail down the "and only first 
`successfulCommits.inc()` can be omitted because of that" case you mentioned, 
could you elaborate on that a bit more?

But yes, it seems like there certainly is a race condition here, and can 
even cause an NPE on:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L496

Seems like the `nextOffsetsToCommit` and its callback instance should be 
bundled as a single `AtomicReference` here ...

Would you like to open a PR for that? I wanted to ask because you 
discovered it in the first place; if not I'll open a fix for it.
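The bundling idea mentioned above can be sketched as follows; the class and method names are illustrative placeholders, not the actual `KafkaConsumerThread` code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the fix direction discussed above: bundle the pending offsets and
// the commit callback into one AtomicReference so both are published and
// claimed atomically, closing the race between the two separate fields.
public class CommitStateSketch {
    // Hypothetical stand-in for Flink's KafkaCommitCallback.
    interface CommitCallback { void onSuccess(); }

    // An immutable pair: a reader can never observe the new offsets combined
    // with the old (or null) callback.
    static final class OffsetsAndCallback {
        final Map<String, Long> offsets;
        final CommitCallback callback;
        OffsetsAndCallback(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<OffsetsAndCallback> nextOffsetsToCommit =
            new AtomicReference<>();

    void setOffsetsToCommit(Map<String, Long> offsets, CommitCallback cb) {
        // One atomic swap publishes both values together.
        nextOffsetsToCommit.set(new OffsetsAndCallback(offsets, cb));
    }

    OffsetsAndCallback takeOffsetsToCommit() {
        // getAndSet(null) atomically claims the pending commit, if any.
        return nextOffsetsToCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        CommitStateSketch s = new CommitStateSketch();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        s.setOffsetsToCommit(offsets, () -> {});
        OffsetsAndCallback taken = s.takeOffsetsToCommit();
        System.out.println(taken != null && taken.callback != null); // true
        System.out.println(s.takeOffsetsToCommit() == null);         // true
    }
}
```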


> FlinkKafkaConsumerBaseTest has invalid mocks on final methods
> -
>
> Key: FLINK-8306
> URL: https://issues.apache.org/jira/browse/FLINK-8306
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final 
> {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy 
> fix would be to simply make that method non-final, that is not ideal since it 
> would be best that the method is left final to prevent overrides in 
> subclasses.
> This suggests that offset committing functionality is too tightly coupled 
> with the {{AbstractFetcher}}, making it hard to perform concise tests to 
> verify offset committing.
> I suggest that we decouple record fetching and offset committing as separate 
> services behind different interfaces. We should introduce a new interface, 
> say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we 
> can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}.
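
A minimal sketch of the decoupling proposed above: offset committing behind a narrow interface that tests can stub directly, instead of mocking a final method on the fetcher. This is hypothetical illustration only; the partition key is simplified to `String`, whereas Flink's real code would use `KafkaTopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Narrow interface for offset committing; test code can stub this directly
// instead of mocking a final method on AbstractFetcher.
interface KafkaOffsetCommitterSketch {
    void commitInternalOffsetsToKafka(Map<String, Long> offsets) throws Exception;
}

// A trivial test stub: records what was committed, no Kafka client involved.
class RecordingCommitter implements KafkaOffsetCommitterSketch {
    final Map<String, Long> committed = new HashMap<>();

    @Override
    public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
        committed.putAll(offsets);  // remember offsets for later verification
    }
}
```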



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319780#comment-16319780
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
@pnowojski thanks a lot for your insightful review!

Regarding the choice of composition or inheritance: actually, in the end I 
think we should lean towards neither of the two, and let offset committing 
and record fetching live as separate components. I've left more detailed 
comments inline.







[jira] [Commented] (FLINK-8368) Port SubtaskExecutionAttemptDetailsHandler to new REST endpoint

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319778#comment-16319778
 ] 

ASF GitHub Bot commented on FLINK-8368:
---

GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5270

[FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to a new 
REST handler

## What is the purpose of the change

* Migrate 
`org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler`
 to flip-6 `WebMonitorEndpoint`.

## Brief change log

* Introduce abstractions for `JobVertexHandler` and `SubtaskAttemptHandler`.
* Add `SubtaskExecutionAttemptDetailsHandler` to the flip-6 REST framework.
* Rename the inner class `JobVertexMetrics` to the public class `IOMetricsInfo` 
to make it more reusable.

## Verifying this change

* This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-8368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5270


commit 29182f04255f77288e207aef9e7015862d3e9a8c
Author: biao.liub 
Date:   2018-01-10T06:25:07Z

[FLINK-8368] Migrate 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
 to a new REST handler registered in the WebMonitorEndpoint




> Port SubtaskExecutionAttemptDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-8368
> URL: https://issues.apache.org/jira/browse/FLINK-8368
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Reporter: Biao Liu
>Assignee: Biao Liu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Migrate 
> org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
>  to a new REST handler registered in the WebMonitorEndpoint.





[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
@pnowojski thanks a lot for your insightful review!

Regarding the choice of composition or inheritance: actually, in the end I 
think we should lean towards neither of the two, and let offset committing 
and record fetching live as separate components. I've left more detailed 
comments inline.


---


[GitHub] flink pull request #5270: [FLINK-8368] [REST] Migrate SubtaskExecutionAttemp...

2018-01-09 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/5270

[FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to a new 
REST handler

## What is the purpose of the change

* Migrate 
`org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler`
 to flip-6 `WebMonitorEndpoint`.

## Brief change log

* Introduce abstractions for `JobVertexHandler` and `SubtaskAttemptHandler`.
* Add `SubtaskExecutionAttemptDetailsHandler` to the flip-6 REST framework.
* Rename the inner class `JobVertexMetrics` to the public class `IOMetricsInfo` 
to make it more reusable.

## Verifying this change

* This change added unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink FLINK-8368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5270


commit 29182f04255f77288e207aef9e7015862d3e9a8c
Author: biao.liub 
Date:   2018-01-10T06:25:07Z

[FLINK-8368] Migrate 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler
 to a new REST handler registered in the WebMonitorEndpoint




---


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319766#comment-16319766
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160599433
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

The result could also very well be that we should use this opportunity to 
refactor the vague dependencies between the fetcher, consumer thread, and 
consumer base, and include that in this PR. I would not be against that.







[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160599433
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

The result could also very well be that we should use this opportunity to 
refactor the vague dependencies between the fetcher, consumer thread, and 
consumer base, and include that in this PR. I would not be against that.


---


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319763#comment-16319763
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598939
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
@@ -89,30 +90,44 @@
@SuppressWarnings("unchecked")
public void testEitherWatermarkExtractor() {
try {
-   new 
DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)

.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
try {
-   new 
DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)

.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
final AssignerWithPeriodicWatermarks periodicAssigner = 
mock(AssignerWithPeriodicWatermarks.class);
final AssignerWithPunctuatedWatermarks 
punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-   DummyFlinkKafkaConsumer c1 =
-   new 
DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false);
+   DummyFlinkKafkaConsumer c1 = new 
DummyFlinkKafkaConsumer<>(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
--- End diff --

Will do.







[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598939
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
@@ -89,30 +90,44 @@
@SuppressWarnings("unchecked")
public void testEitherWatermarkExtractor() {
try {
-   new 
DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)

.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
try {
-   new 
DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)

.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
final AssignerWithPeriodicWatermarks periodicAssigner = 
mock(AssignerWithPeriodicWatermarks.class);
final AssignerWithPunctuatedWatermarks 
punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-   DummyFlinkKafkaConsumer c1 =
-   new 
DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), 
mock(AbstractPartitionDiscoverer.class), false);
+   DummyFlinkKafkaConsumer c1 = new 
DummyFlinkKafkaConsumer<>(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
--- End diff --

Will do.


---


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319762#comment-16319762
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r16059
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
//the fetchers 'snapshotCurrentState()' method 
return at least
//the restored offsets
this.kafkaFetcher = fetcher;
+   this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

This is a very valid argument. Will address this with a factory perhaps, as 
you suggested.
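
An illustrative sketch (all names hypothetical) of the factory idea mentioned above: the consumer receives a committer factory, so production code keeps creating the committer lazily while a test can inject a stub.

```java
// Hypothetical names; the point is injecting creation rather than hard-wiring it.
interface OffsetCommitter {
    void commit(long offset);
}

interface OffsetCommitterFactory {
    OffsetCommitter create();
}

class ConsumerBaseSketch {
    private final OffsetCommitterFactory committerFactory;
    private OffsetCommitter committer;

    ConsumerBaseSketch(OffsetCommitterFactory committerFactory) {
        this.committerFactory = committerFactory;
    }

    // Mirrors the run()/open() step where the consumer wires up its components:
    // the committer is still created lazily, but through an injectable factory.
    void open() {
        this.committer = committerFactory.create();
    }

    OffsetCommitter committer() {
        return committer;
    }
}
```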







[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319761#comment-16319761
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598772
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

I agree that composition suits better here, or maybe even neither of the two. 
However, the reality is that the offset committing logic is currently 
implemented tightly as part of the `AbstractFetcher`, sharing the same Kafka 
client for both fetching records and committing offsets. Decoupling that would 
require further refactoring, which I think is a bit out of scope for the 
issue at hand.

I have been thinking that we should simply have two separate service 
implementations for offset committing and record fetching. If that happens, 
then neither composition nor inheritance is required; offset committing and 
record fetching would simply live as two separate services.

What do you think?







[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r16059
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
//the fetchers 'snapshotCurrentState()' method 
return at least
//the restored offsets
this.kafkaFetcher = fetcher;
+   this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

This is a very valid argument. Will address this with a factory perhaps, as 
you suggested.


---


[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598772
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

I agree that composition suits better here, or maybe even neither of the two. 
However, the reality is that the offset committing logic is currently 
implemented tightly as part of the `AbstractFetcher`, sharing the same Kafka 
client for both fetching records and committing offsets. Decoupling that would 
require further refactoring, which I think is a bit out of scope for the 
issue at hand.

I have been thinking that we should simply have two separate service 
implementations for offset committing and record fetching. If that happens, 
then neither composition nor inheritance is required; offset committing and 
record fetching would simply live as two separate services.

What do you think?


---


[jira] [Commented] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319754#comment-16319754
 ] 

ASF GitHub Bot commented on FLINK-6004:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5269

[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip 
non-deserializable records

## What is the purpose of the change

This PR is based on #5268, which includes fixes to harden Kinesis unit 
tests. Only the last commit is relevant.

In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka 
records which cannot be deserialized. In real-world pipelines, it is entirely 
normal for this to happen.

This PR adds the same functionality to the Flink Kinesis Consumer.

## Brief change log

- Clarify in Javadoc of `KinesisDeserializationSchema` that `null` can be 
returned if a message cannot be deserialized.
- If `record` is `null` in 
`KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output 
for the record.
- Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature.

## Verifying this change

The additional `KinesisDataFetcherTest::testSkipCorruptedRecord()` test 
verifies this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5269


commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T02:11:31Z

[FLINK-8398] [kinesis, tests] Cleanup confusing implementations in 
KinesisDataFetcherTest and related classes

The previous implementation of the TestableKinesisDataFetcher was
confusing in various ways, making it hard to re-use for other
tests. This commit contains the following cleanups:

- Remove confusing mocks of source context and checkpoint lock. We now
  allow users of the TestableKinesisDataFetcher to provide a source
  context, which should provide the checkpoint lock.
- Remove the override of emitRecordAndUpdateState(). Strictly speaking, that
  method should be final. It was previously overridden to allow
  verifying how many records were output by the fetcher. That
  verification would be better implemented within a mock source context.
- Properly parameterize the output type for the
  TestableKinesisDataFetcher.
- Remove use of PowerMockito in KinesisDataFetcherTest.
- Use CheckedThreads to properly capture any exceptions in fetcher /
  consumer threads in unit tests.
- Use assertEquals / assertNull instead of assertTrue wherever
  appropriate.
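
The "verify via a mock source context" point above can be sketched as follows. This is an illustrative shape only, not Flink's actual `SourceContext` interface: the test hands this context to the fetcher and then counts what was collected, instead of overriding emitRecordAndUpdateState().

```java
import java.util.ArrayList;
import java.util.List;

// Minimal counting sink standing in for a mocked source context in tests.
class CountingSourceContext<T> {
    private final List<T> collected = new ArrayList<>();

    public void collect(T element) {
        collected.add(element);  // the fetcher's emitted records land here
    }

    public int numCollected() {
        return collected.size();
    }
}
```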

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T05:41:49Z

[FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

Prior to this commit, several unit tests in KinesisDataFetcherTest
relied on sleeps to wait until a certain operation happened, in order for
the test to pass.

This commit removes those sleeps and replaces the test behaviours with
OneShotLatches.

commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T06:04:10Z

[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records

This commit acknowledges that null can be returned from the
deserialization schema, if the message cannot be deserialized. If null
is returned for a Kinesis record, no output is produced for that record,
while the sequence number in the shard state is still advanced so that
the record is effectively accounted as processed.
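
The behavior described in this commit message can be sketched as follows, with purely illustrative names (this is not the actual `KinesisDataFetcher` code): a null record from the deserialization schema produces no output, but the shard's sequence number still advances so the record counts as processed.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of "skip corrupted record but advance shard state".
class ShardStateSketch {
    long lastSequenceNumber = -1L;
    final List<String> output = new ArrayList<>();

    void emitRecordAndUpdateState(String record, long seqNum) {
        if (record != null) {
            output.add(record);       // emit only deserializable records
        }
        lastSequenceNumber = seqNum;  // advance even when the record is skipped
    }
}
```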




> Allow FlinkKinesisConsumer to skip corrupted messages
> 

[GitHub] flink pull request #5269: [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer ...

2018-01-09 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5269

[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip 
non-deserializable records

## What is the purpose of the change

This PR is based on #5268, which includes fixes to harden Kinesis unit 
tests. Only the last commit is relevant.

In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka 
records which cannot be deserialized. In real-world pipelines, it is entirely 
normal for this to happen.

This PR adds the same functionality to the Flink Kinesis Consumer.

## Brief change log

- Clarify in Javadoc of `KinesisDeserializationSchema` that `null` can be 
returned if a message cannot be deserialized.
- If `record` is `null` in 
`KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output 
for the record.
- Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature.

## Verifying this change

The additional `KinesisDataFetcherTest::testSkipCorruptedRecord()` test 
verifies this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5269


commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T02:11:31Z

[FLINK-8398] [kinesis, tests] Cleanup confusing implementations in 
KinesisDataFetcherTest and related classes

The previous implementation of the TestableKinesisDataFetcher was
confusing in various ways, making it hard to re-use for other
tests. This commit contains the following cleanups:

- Remove confusing mocks of source context and checkpoint lock. We now
  allow users of the TestableKinesisDataFetcher to provide a source
  context, which should provide the checkpoint lock.
- Remove the override of emitRecordAndUpdateState(). Strictly speaking, that
  method should be final. It was previously overridden to allow
  verifying how many records were output by the fetcher. That
  verification would be better implemented within a mock source context.
- Properly parameterize the output type for the
  TestableKinesisDataFetcher.
- Remove use of PowerMockito in KinesisDataFetcherTest.
- Use CheckedThreads to properly capture any exceptions in fetcher /
  consumer threads in unit tests.
- Use assertEquals / assertNull instead of assertTrue wherever
  appropriate.

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T05:41:49Z

[FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

Prior to this commit, several unit tests in KinesisDataFetcherTest
relied on sleeps to wait until a certain operation happened, in order for
the test to pass.

This commit removes those sleeps and replaces the test behaviours with
OneShotLatches.

commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T06:04:10Z

[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records

This commit acknowledges that null can be returned from the
deserialization schema, if the message cannot be deserialized. If null
is returned for a Kinesis record, no output is produced for that record,
while the sequence number in the shard state is still advanced so that
the record is effectively accounted as processed.
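The skip-on-null contract described in this commit message can be sketched as follows. The types and method names below are illustrative stand-ins, not the connector's actual classes:

```java
import java.util.List;

// Sketch of the skip-on-null behaviour: a null from the deserialization
// schema produces no output, but the sequence number still advances so
// the corrupt record is accounted as processed.
final class RecordSkippingSketch {
    interface DeserializationSchema<T> {
        T deserialize(byte[] record);  // may return null for corrupt input
    }

    static <T> long processRecords(
            List<byte[]> records,
            DeserializationSchema<T> schema,
            List<T> out,
            long sequenceNumber) {
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (value != null) {
                out.add(value);  // emit only deserializable records
            }
            sequenceNumber++;    // advance shard state either way
        }
        return sequenceNumber;
    }
}
```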




---


[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319741#comment-16319741
 ] 

ASF GitHub Bot commented on FLINK-8398:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5268

[FLINK-8398] [kinesis, tests] Harden KinesisDataFetcherTest unit tests

## What is the purpose of the change

Prior to this PR, many of the `KinesisDataFetcherTest` unit tests relied on 
thread sleeps to wait until a certain operation occurs to allow the test to 
pass. This test behaviour is very flaky, and should be replaced with 
`OneShotLatch`.

## Brief change log

- 94b4591: Several minor cleanups of confusing implementations / code 
smells in the `KinesisDataFetcherTest` and related test classes. The commit 
message explains what exactly was changed.
- 547d19f: Remove thread sleeps in unit tests, and replace them with 
`OneShotLatch`.


## Verifying this change

No test coverage should have been affected by this change.
The existing tests in `KinesisDataFetcherTest` verify this.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / *no*)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / *no*)
  - The serializers: (yes / *no* / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / *no* 
/ don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
  - The S3 file system connector: (yes / *no* / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / *no*)
  - If yes, how is the feature documented? (*not applicable* / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5268


commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T02:11:31Z

[FLINK-8398] [kinesis, tests] Cleanup confusing implementations in 
KinesisDataFetcherTest and related classes

The previous implementation of the TestableKinesisDataFetcher was
confusing in various ways, making it hard to reuse for other
tests. This commit contains the following cleanups:

- Remove confusing mocks of source context and checkpoint lock. We now
  allow users of the TestableKinesisDataFetcher to provide a source
  context, which should provide the checkpoint lock.
- Remove override of emitRecordAndUpdateState(). Strictly speaking, that
  method should be final. It was previously overridden to allow
  verifying how many records were output by the fetcher. That
  verification would be better implemented within a mock source context.
- Properly parameterize the output type for the
  TestableKinesisDataFetcher.
- Remove use of PowerMockito in KinesisDataFetcherTest.
- Use CheckedThreads to properly capture any exceptions in fetcher /
  consumer threads in unit tests.
- Use assertEquals / assertNull instead of assertTrue wherever
  appropriate.

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T05:41:49Z

[FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

Prior to this commit, several unit tests in KinesisDataFetcherTest
relied on sleeps to wait until a certain operation happens, in order for
the test to pass.

This commit removes those sleeps and replaces the test behaviours with
OneShotLatches.




> Stabilize flaky KinesisDataFetcherTests
> ---
>
> Key: FLINK-8398
> URL: https://issues.apache.org/jira/browse/FLINK-8398
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0, 1.4.1
>
>
> The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations. 
> They rely on thread sleeps to wait for a certain operation to happen, 
> which can easily miss and cause tests to fail.
> Although there haven't been any reports of consistent failures in these 
> tests yet (as far as I am aware), they could easily surface in the future.



[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319691#comment-16319691
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4150
  
Merging to `release-1.3` ..


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319690#comment-16319690
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4150
  
@casidiablo thanks for the info!
I'll merge this for `release-1.3` then, and will keep an extra eye on 
whether the problem still occurs for 1.4 / 1.5.


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.







[jira] [Created] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8398:
--

 Summary: Stabilize flaky KinesisDataFetcherTests
 Key: FLINK-8398
 URL: https://issues.apache.org/jira/browse/FLINK-8398
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector, Tests
Affects Versions: 1.4.0, 1.5.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.5.0, 1.4.1


The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations. 
They rely on thread sleeps to wait for a certain operation to happen, which 
can easily miss and cause tests to fail.

Although there haven't been any reports of consistent failures in these tests yet 
(as far as I am aware), they could easily surface in the future.





[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-01-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299208#comment-16299208
 ] 

Ted Yu edited comment on FLINK-7795 at 1/10/18 4:08 AM:


https://github.com/google/error-prone/releases/tag/v2.2.0 was the latest 
release.


was (Author: yuzhih...@gmail.com):
https://github.com/google/error-prone/releases/tag/v2.1.3 was the latest 
release.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate into Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
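As a hedged illustration of the kind of mistake error-prone flags at compile time, consider its `SelfEquals` bug pattern. The class below is a made-up example, not code from Flink:

```java
// error-prone's SelfEquals check flags a.equals(a): the comparison is
// always true, and the author almost certainly meant a.equals(b).
final class SelfEqualsExample {
    static boolean isSame(String a, String b) {
        // error-prone would report: [SelfEquals] — likely a typo for a.equals(b)
        return a.equals(a);
    }
}
```

Because the bug is detected during compilation rather than at runtime, wiring error-prone into the Maven build (as the dependencies above suggest) catches such mistakes before they ship.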





[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319632#comment-16319632
 ] 

ASF GitHub Bot commented on FLINK-4816:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/4828
  
Hi @StephanEwen 

Let me summarize your comment and clarify some questions in my mind.
1. The original design treated all failures during DEPLOYING as restore failures. 
That is not accurate, because a failed restore is only one of the possible reasons.
2. Using `last restored checkpoint ID` to record the latest id is not the proper 
way. Maybe I need to put it in the state object instead. Am I right?
3. A better solution might be to track all failures in the TaskManager, and 
only report those related to restore as restore failures, wrapping them 
with the current checkpoint id and sending them back to the JobManager.

Do I misunderstand something? Or is there anything else that I didn't 
mention?


> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.
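The wrapping rule described in the issue can be sketched as below. The exception class names come from the issue text, but their constructors and the surrounding helper are assumptions for illustration:

```java
// Sketch of the failure-wrapping rule: failures from DEPLOYING are wrapped
// differently depending on whether a checkpoint was restored, and the
// restored case records the checkpoint id for better error reporting.
final class FailureWrappingSketch {
    static class DeployTaskException extends Exception {
        DeployTaskException(Throwable cause) {
            super(cause);
        }
    }

    static class RestoreTaskException extends Exception {
        final long checkpointId;

        RestoreTaskException(long checkpointId, Throwable cause) {
            super("restore of checkpoint " + checkpointId + " failed", cause);
            this.checkpointId = checkpointId;
        }
    }

    // restoredCheckpointId is null when the execution started from scratch
    static Exception wrapDeployingFailure(Throwable cause, Long restoredCheckpointId) {
        if (restoredCheckpointId == null) {
            return new DeployTaskException(cause);
        }
        return new RestoreTaskException(restoredCheckpointId, cause);
    }
}
```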







[jira] [Assigned] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2018-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-6004:
--

Assignee: Tzu-Li (Gordon) Tai  (was: ChungChe Lai)

> Allow FlinkKinesisConsumer to skip corrupted messages
> -
>
> Key: FLINK-6004
> URL: https://issues.apache.org/jira/browse/FLINK-6004
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> It is quite clear from the fix of FLINK-3679 that in reality, users might 
> encounter corrupted messages from Kafka / Kinesis / generally external 
> sources when deserializing them.
> The consumers should support simply skipping those messages, by letting the 
> deserialization schema return {{null}}, and checking {{null}} values within 
> the consumer.
> This has been done for the Kafka consumer already. This ticket tracks the 
> improvement for the Kinesis consumer.





[jira] [Commented] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2018-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319572#comment-16319572
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6004:


It has been quite a while since this issue was picked up, so I assume that 
it is currently inactive. I will pick it up.

> Allow FlinkKinesisConsumer to skip corrupted messages
> -
>
> Key: FLINK-6004
> URL: https://issues.apache.org/jira/browse/FLINK-6004
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: ChungChe Lai
>
> It is quite clear from the fix of FLINK-3679 that in reality, users might 
> encounter corrupted messages from Kafka / Kinesis / generally external 
> sources when deserializing them.
> The consumers should support simply skipping those messages, by letting the 
> deserialization schema return {{null}}, and checking {{null}} values within 
> the consumer.
> This has been done for the Kafka consumer already. This ticket tracks the 
> improvement for the Kinesis consumer.





[jira] [Closed] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei closed FLINK-5982.
--
   Resolution: Done
Fix Version/s: 1.5.0

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
> Fix For: 1.5.0
>
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That makes it 
> difficult to do eager initialization on the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make 
> {{AbstractInvokable}} have a two-argument constructor with {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor 
> and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update all 
> the tests.
> Then, we can simplify the logic of running an invokable to the constructor and 
> {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.
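The refactoring direction from the issue can be sketched as below. The types are simplified stand-ins for Flink's `Environment` and `TaskStateHandles`, and the class bodies are illustrative, not the actual runtime code:

```java
// Sketch of the two-argument-constructor design: environment and restored
// state arrive at construction time, so eager initialization can happen in
// the constructor instead of via setEnvironment()/setInitialState() calls.
final class InvokableRefactorSketch {
    static class Environment { }
    static class TaskStateHandles { }

    abstract static class AbstractInvokable {
        protected final Environment environment;
        protected final TaskStateHandles initialState;  // may be null

        protected AbstractInvokable(Environment env, TaskStateHandles state) {
            this.environment = env;
            this.initialState = state;  // eager init can run right here
        }

        public abstract void invoke() throws Exception;
    }

    // batch tasks are (for now) expected to be stateless
    static class BatchTask extends AbstractInvokable {
        BatchTask(Environment env, TaskStateHandles state) {
            super(env, state);
            if (state != null) {
                throw new IllegalStateException("BatchTask does not support state");
            }
        }

        @Override
        public void invoke() { }
    }
}
```

With this shape, running an invokable reduces to calling the constructor and then `invoke()`, which is exactly the simplification the issue describes.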





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319532#comment-16319532
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks a lot for doing this work. Nice to see this PR merged. =)


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That makes it 
> difficult to do eager initialization on the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make 
> {{AbstractInvokable}} have a two-argument constructor with {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor 
> and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update all 
> the tests.
> Then, we can simplify the logic of running an invokable to the constructor and 
> {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.







[jira] [Created] (FLINK-8397) Support ROW type in CassandraOutputFormat

2018-01-09 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8397:
-

 Summary: Support ROW type in CassandraOutputFormat
 Key: FLINK-8397
 URL: https://issues.apache.org/jira/browse/FLINK-8397
 Project: Flink
  Issue Type: Improvement
 Environment: Currently, only tuple is supported.
Reporter: Shuyi Chen
Assignee: Shuyi Chen








[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319445#comment-16319445
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/4150
  
Since EMR only supports Flink 1.3 I had to checkout release-1.3 and compile 
the connector from there. Then I was getting this `Socket not created by this 
factory` error.

I then patched 1.3 to include these changes, and that fixed it.

The current master version can't be used because so many things have 
changed since then, and it does not seem to be compatible with EMR right now.


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.







[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319441#comment-16319441
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4150
  
@casidiablo did you mean that without applying this PR's patch, the current 
master worked for you? Or you had to apply this patch in order for it to work?


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.







[jira] [Updated] (FLINK-8396) Create (derived) duplicate Buffer class

2018-01-09 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8396:
---
Description: In order to pass a single buffer to netty multiple times, we 
require a duplicate Buffer instance with a shared {{MemorySegment}} and 
reference counting but separate indices.  This should be read-only.  (was: In 
order to pass a single buffer to netty multiple times, we require a duplicate 
Buffer instance with a shared {{MemorySegment}} and reference counting but 
separate indices.)

> Create (derived) duplicate Buffer class
> ---
>
> Key: FLINK-8396
> URL: https://issues.apache.org/jira/browse/FLINK-8396
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to pass a single buffer to netty multiple times, we require a 
> duplicate Buffer instance with a shared {{MemorySegment}} and reference 
> counting but separate indices.  This should be read-only.
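The duplicated-Buffer idea (shared backing memory and shared reference count, but independent indices and read-only access) can be sketched as follows. A `byte[]` stands in for Flink's `MemorySegment`, and all names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a read-only duplicate buffer: duplicates share the memory and
// the reference count with the original, but each keeps its own reader
// index, so the same data can be handed to netty multiple times safely.
final class DuplicateBufferSketch {
    private final byte[] memory;            // shared with all duplicates
    private final AtomicInteger refCount;   // shared with all duplicates
    private int readerIndex;                // private per instance, starts at 0
    private final boolean readOnly;

    DuplicateBufferSketch(int size) {
        this.memory = new byte[size];
        this.refCount = new AtomicInteger(1);
        this.readOnly = false;
    }

    private DuplicateBufferSketch(DuplicateBufferSketch parent) {
        this.memory = parent.memory;        // no copy: same backing memory
        this.refCount = parent.refCount;    // shared counting
        this.readOnly = true;               // duplicates must not mutate
    }

    DuplicateBufferSketch duplicate() {
        refCount.incrementAndGet();
        return new DuplicateBufferSketch(this);
    }

    byte read() {
        return memory[readerIndex++];       // advances only this instance's index
    }

    void write(int index, byte b) {
        if (readOnly) {
            throw new UnsupportedOperationException("read-only duplicate");
        }
        memory[index] = b;
    }

    int refCount() {
        return refCount.get();
    }
}
```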





[jira] [Created] (FLINK-8396) Create (derived) duplicate Buffer class

2018-01-09 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8396:
--

 Summary: Create (derived) duplicate Buffer class
 Key: FLINK-8396
 URL: https://issues.apache.org/jira/browse/FLINK-8396
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: Nico Kruber
Assignee: Nico Kruber


In order to pass a single buffer to netty multiple times, we require a 
duplicate Buffer instance with a shared {{MemorySegment}} and reference 
counting but separate indices.





[jira] [Updated] (FLINK-8395) Create (derived) sliced Buffer class

2018-01-09 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8395:
---
Description: In order to pass sub-regions of a single buffer separately, we 
require a sliced Buffer instance with a shared {{MemorySegment}} and reference 
counting but separate indices. This should be read-only.  (was: In order to 
pass sub-regions of a single buffer separately, we require a sliced Buffer 
instance.)

> Create (derived) sliced Buffer class
> 
>
> Key: FLINK-8395
> URL: https://issues.apache.org/jira/browse/FLINK-8395
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to pass sub-regions of a single buffer separately, we require a 
> sliced Buffer instance with a shared {{MemorySegment}} and reference counting 
> but separate indices. This should be read-only.





[jira] [Created] (FLINK-8395) Create (derived) sliced Buffer class

2018-01-09 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8395:
--

 Summary: Create (derived) sliced Buffer class
 Key: FLINK-8395
 URL: https://issues.apache.org/jira/browse/FLINK-8395
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: Nico Kruber
Assignee: Nico Kruber


In order to pass sub-regions of a single buffer separately, we require a sliced 
Buffer instance.





[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319170#comment-16319170
 ] 

ASF GitHub Bot commented on FLINK-7511:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4587#discussion_r160524740
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -150,16 +126,13 @@
 */
private boolean nfaChanged;
 
-   public NFA(
-   final TypeSerializer<T> eventSerializer,
+   public NFA(final TypeSerializer<T> eventSerializer,
final long windowTime,
final boolean handleTimeout) {
-
this.eventSerializer = eventSerializer;
--- End diff --

The difference in handling `null` seems strange to me. Could you tell me 
how you checked it?

Anyway, as I already said, I will add the check.


> Remove dead code after dropping backward compatibility with <=1.2
> -
>
> Key: FLINK-7511
> URL: https://issues.apache.org/jira/browse/FLINK-7511
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>







[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319158#comment-16319158
 ] 

ASF GitHub Bot commented on FLINK-7511:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4587#discussion_r160523676
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
 ---
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A filter function which combines two filter functions with a logical 
and. Thus, the filter
- * function only returns true, iff both filters return true.
- *
- * @param <T> Type of the element to filter
- * @deprecated This is only used when migrating from an older Flink 
version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} 
instead.
- */
-@Deprecated
-public class AndFilterFunction<T> implements FilterFunction<T> {
--- End diff --

I am afraid your worries are justified :( 

I analyzed the code once again and indeed it is possible to have a 
checkpoint taken in 1.3.x that has serialized `*FilterFunction` classes 
through the `FilterWrapper` class. It is possible when the job was previously 
restored from a 1.2.x checkpoint. Unfortunately, I will need to restore those 
classes. I will do that tomorrow and I will also add a test for that case.

Thanks for catching that!


> Remove dead code after dropping backward compatibility with <=1.2
> -
>
> Key: FLINK-7511
> URL: https://issues.apache.org/jira/browse/FLINK-7511
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>







[jira] [Updated] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-01-09 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8394:
--
Issue Type: Test  (was: Bug)

> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as done on 
> other methods.





[jira] [Created] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-01-09 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8394:
-

 Summary: Lack of synchronization accessing expectedRecord in 
ReceiverThread#shutdown
 Key: FLINK-8394
 URL: https://issues.apache.org/jira/browse/FLINK-8394
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  public void shutdown() {
running = false;
interrupt();
expectedRecord.complete(0L);
{code}
Access to expectedRecord should be protected by synchronization, as done on 
other methods.





[jira] [Commented] (FLINK-8393) Reconnect to last known JobMaster when connection is lost

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319051#comment-16319051
 ] 

ASF GitHub Bot commented on FLINK-8393:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5267

[FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is 
lost

## What is the purpose of the change

Reconnect to the last known location of a lost `JobMaster` connection.

## Brief change log

- In case of a heartbeat timeout or a disconnect call, the `TaskExecutor` 
tries to reconnect to the last known `JobMaster` location

## Verifying this change

- Added `RegisteredRpcConnection#testReconnect`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
resumeLostJobMasterConnection

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5267.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5267


commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8
Author: Till Rohrmann 
Date:   2018-01-09T16:50:37Z

[FLINK-8392] [rpc] Let termination future be completed by 
AkkaRpcActor#postStop

Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination 
future is now
completed from the AkkaRpcActor#postStop method.

commit 21524394bc37372dc13eb5c3938de051cbd6f03e
Author: Till Rohrmann 
Date:   2018-01-08T17:23:27Z

[FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor

This commit introduces the JobExecutor interface which abstracts the actual 
mini cluster
from the Test(Stream)Environment. By letting the Flip-6 MiniCluster as well 
as the
FlinkMiniCluster implement this interface, we can run all test base jobs 
either on the
Flip-6 mini cluster or on the current mini cluster.

This closes #4897.

commit 5f1bdc1a8546e24e079753f92f22a397ccba24de
Author: Till Rohrmann 
Date:   2017-12-01T14:02:09Z

[FLINK-8389] [flip6] Release all slots upon closing of JobManager connection

commit 141f21d85f0047e4e5c0776e70ac6d83a03e5943
Author: Till Rohrmann 
Date:   2018-01-09T13:11:20Z

[hotfix] Add retrieval of key sets to DualKeyMap

commit d46ba9c5f6c1320d73b4d0e65462bcf2c45ff28f
Author: Till Rohrmann 
Date:   2017-12-01T14:10:46Z

[hotfix] Enable checkpointing RPC calls

commit 737676e5912a2b43dd195ab2a940bc15af6630fb
Author: Till Rohrmann 
Date:   2018-01-09T08:28:34Z

[hotfix] Add JavaDocs to OnCompletionActions

commit f456248a2e75da4947cea7f2d863db129f0efc5f
Author: Till Rohrmann 
Date:   2018-01-09T15:44:59Z

[hotfix] Refactor JobMasterTest to avoid using Mockito

commit fa7b667a196980e05194b88ba352a12f330b5ad0
Author: Till Rohrmann 
Date:   2018-01-09T19:37:08Z

[FLINK-8393] Reconnect to last known JobMaster when connection is lost




> Reconnect to last known JobMaster when connection is lost
> -
>
> Key: FLINK-8393
> URL: https://issues.apache.org/jira/browse/FLINK-8393
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In case of a connection loss to the {{JobMaster}}, e.g. due to a heartbeat 
> timeout or a disconnect call, then the {{TaskExecutor}} should try to 
> reconnect to the last known {{JobMaster}} location in case that the timeout 
> was a false positive.






[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319049#comment-16319049
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5239
  
Thanks for going through the general design @StephanEwen! As we discussed, 
I agree with your first point. For the second point about RocksDB, this PR 
already contains an optimized way to deal with incremental local checkpoints 
that we did not discuss in our review, because I thought it was too much of a 
low-level detail.
It does not work with duplicating streams. Instead, I introduced a state 
handle type for a local directory. In fact, I mapped the previous incremental 
recovery from DFS state also to this new handle type: DFS state is first 
downloaded and then simply becomes a local directory state handle. From 
there, both incremental recovery paths are identical.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.
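The restore preference described above is simple to sketch. The handle types are illustrative strings, not Flink's state-handle classes: try the secondary local copy first, and fall back to the primary copy in DFS.

```java
import java.util.Optional;

// Sketch of task-local recovery: prefer the local secondary copy of the
// checkpointed state (saves network bandwidth), else read the primary
// copy from DFS.
class LocalRecovery {
    static String restore(Optional<String> localCopy, String dfsCopy) {
        // works only when the task was rescheduled to the same slot,
        // hence the requirement that task-to-slot assignment be sticky
        return localCopy.orElse(dfsCopy);
    }

    public static void main(String[] args) {
        System.out.println(restore(Optional.of("local-chk-17"), "dfs-chk-17"));
        System.out.println(restore(Optional.empty(), "dfs-chk-17"));
    }
}
```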






[jira] [Created] (FLINK-8393) Reconnect to last known JobMaster when connection is lost

2018-01-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8393:


 Summary: Reconnect to last known JobMaster when connection is lost
 Key: FLINK-8393
 URL: https://issues.apache.org/jira/browse/FLINK-8393
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In case of a connection loss to the {{JobMaster}}, e.g. due to a heartbeat 
timeout or a disconnect call, then the {{TaskExecutor}} should try to reconnect 
to the last known {{JobMaster}} location in case that the timeout was a false 
positive.
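The reconnect behavior can be sketched as follows. Class and method names are hypothetical (this is not the actual `TaskExecutor` RPC machinery): the point is to remember the last known address and retry it when the heartbeat times out, since the timeout may be a false positive.

```java
// Illustrative sketch of "reconnect to last known JobMaster": keep the
// last known address and retry it on a heartbeat timeout or disconnect.
class JobMasterConnection {
    private String lastKnownAddress;
    private boolean connected;

    boolean connect(String address) {
        // in reality this would be an RPC registration attempt;
        // here we just record the address and report success
        this.lastKnownAddress = address;
        this.connected = true;
        return true;
    }

    void onHeartbeatTimeout() {
        connected = false;
        if (lastKnownAddress != null) {
            // the timeout may be a false positive: retry the last
            // known JobMaster location instead of giving up
            connect(lastKnownAddress);
        }
    }

    static boolean demo() {
        JobMasterConnection c = new JobMasterConnection();
        c.connect("akka://flink/user/jobmaster-1");  // hypothetical address
        c.onHeartbeatTimeout();
        return c.connected;                          // reconnected
    }

    public static void main(String[] args) {
        System.out.println(demo());                  // prints true
    }
}
```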





[jira] [Closed] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-6951.
---
Resolution: Invalid

Not a problem anymore

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319021#comment-16319021
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4150
  
Thanks for confirming. I'll close this ticket and PR


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.






[jira] [Closed] (FLINK-8284) Custom metrics not being exposed for Prometheus

2018-01-09 Thread Julio Biason (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julio Biason closed FLINK-8284.
---
Resolution: Invalid

> Custom metrics not being exposed for Prometheus
> ---
>
> Key: FLINK-8284
> URL: https://issues.apache.org/jira/browse/FLINK-8284
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.4.0
> Environment: Linux/CentOS 7
>Reporter: Julio Biason
>
> Following the documentation, we changed our filter that removes events with 
> missing fields to a RichFilterFunction, so we can capture metrics about such 
> events:
> {code:java}
> public class MissingClientFilter extends RichFilterFunction<LineData> {
>   private transient Counter counter;
>   @Override
>   public void open(Configuration config) {
>   this.counter = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("events")
>   .counter("missingClient");
>   }
>   @Override
>   public boolean filter(LineData line) {
>   String client = line.get("client").toString();
>   boolean missing = client.trim().equals("");
>   if (!missing) {
>   this.count();
>   }
>   return !missing;
>   }
>   private void count() {
>   if (this.counter != null) {
>   this.counter.inc();
>   }
>   }
> }
> {code}
> We also added Prometheus as our reporter:
> {noformat}
> metrics.reporters: prom
> metrics.reporter.prom.port: 9105
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> {noformat}
> The problem is that accessing port 9105 displays all Flink metrics, but not ours.





[jira] [Commented] (FLINK-8284) Custom metrics not being exposed for Prometheus

2018-01-09 Thread Julio Biason (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318999#comment-16318999
 ] 

Julio Biason commented on FLINK-8284:
-

I swear to ${deity} that it didn't work. Restarted the project to check logs 
and suddenly the metrics are there.

Closing this as invalid. Will reopen if I find any reason for it not working in 
the first place.

> Custom metrics not being exposed for Prometheus
> ---
>
> Key: FLINK-8284
> URL: https://issues.apache.org/jira/browse/FLINK-8284
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.4.0
> Environment: Linux/CentOS 7
>Reporter: Julio Biason
>
> Following the documentation, we changed our filter that removes events with 
> missing fields to a RichFilterFunction, so we can capture metrics about such 
> events:
> {code:java}
> public class MissingClientFilter extends RichFilterFunction<LineData> {
>   private transient Counter counter;
>   @Override
>   public void open(Configuration config) {
>   this.counter = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("events")
>   .counter("missingClient");
>   }
>   @Override
>   public boolean filter(LineData line) {
>   String client = line.get("client").toString();
>   boolean missing = client.trim().equals("");
>   if (!missing) {
>   this.count();
>   }
>   return !missing;
>   }
>   private void count() {
>   if (this.counter != null) {
>   this.counter.inc();
>   }
>   }
> }
> {code}
> We also added Prometheus as our reporter:
> {noformat}
> metrics.reporters: prom
> metrics.reporter.prom.port: 9105
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> {noformat}
> The problem is that accessing port 9105 displays all Flink metrics, but not ours.





[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318974#comment-16318974
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5239
  
I did a peer review and walked through the code.
Overall, the design looks good, +1 for that!

Some comments:
  - I would argue to change the way that these recovery options are 
configured. Currently, this goes through methods on the state backend objects, 
i.e., *configuration in code*. Because the recovery aspect is really an 
"ops-related" aspect of running a Flink job (or a broader streaming platform), 
it should not be configured in code, but in the config. I found it helpful to 
think of settings in code as being for what concerns the application 
developers, and settings in the config for what concerns the people that run 
Flink. They may be the same person in the end, but even then it is helpful 
because they are frequently in different stages of application development and 
deployment. Configurations are also easier to "standardize on", like "we want 
all applications in that group to enable local recovery".

  - One thing I am not yet 100% sure of is how this will interact in the 
future with RocksDB's optimized local recovery. I assume that checkpoints will 
in the future always use incremental snapshots. For those, there is no stream 
of bytes to store locally in addition. The files are already local and 
immutable. Here, the RocksDB snapshot should probably go directly through the 
local recovery directory, and the diff files would be persisted from there 
(the complete snapshot, which consists only of hardlinks to the files that are 
also in the work directory, would be retained though). Is the assumption that 
this is a "retain data structure" style mechanism, bespoke for each state 
backend, similar to retaining the heap copy-on-write table for the Heap State 
Backend?

Now, since this PR is already complicated and needs a heavy rebase, I would 
be okay with doing that in another PR, if there is commitment to do this soon 
(before the 1.5 release branch is cut).

Slightly off topic: this code has a very distinct style of using many 
`@Nonnull` annotations. Other newer parts of the code (the ones that use 
annotations) follow the contract "non-null unless annotated with `@Nullable`". 
I don't ask to change this. It would be good to actually have a discussion 
and come up with a recommended style to agree on for the future.



> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.





[GitHub] flink issue #5239: [FLINK-8360] Implement task-local state recovery

2018-01-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5239
  
I did a peer review and walk through the code.
Overall, the design look good, +1 for that!

Some comments:
  - I would argue to change the way that these recovery options are 
configured. Currently, this goes through methods on one state backend objects, 
i.e., *configuration in code*. Because that recovery aspect is a really an 
"ops-related" aspect of running a Flink job (or a broader streaming platform), 
it should not be configured in code, but in the config. I found it helpful to 
thing that settings in code are for what concerns the application developers, 
settings in the config for what concerns the people that run Flink. They may be 
the same person in the end, but even then it is helpful because they are 
frequently are in different stages of the application development and 
deployment. Configurations are more easy to "standardize on", like "we want all 
applications in that group to enable local recovery".

  - One thing I am not yet 100% sure of is how this will interact in the 
future with RocksDB's optimized local recovery. I assume that checkpoints will 
in the future always use incremental snapshots. For such, there is no stream of 
bytes to store locally in addition. The files are already local and immutable. 
Here, the RocksDB snapshot should probably directly go through the local 
recovery directory, and the diff files would be persisted from there (the 
complete snapshot, which consists only of hardlinks to the files that are also 
in the work directory, would be retained though). Is the assumption that this 
is a "retain data structure" style mechanism, bespoke for each state backend, 
similar as retaining the heap copy-on-write table for the Heap State Backend?

Now, since this PR is already complicated and needs a heavy rebase, I would 
be okay with doing that in another PR, if there is commitment to do this soon 
(before the 1.5 release branch is cut).

Slightly off topic: This code has a very distinct style of using many 
`@Nonnull` annotations. Other newer parts of the code (the ones that use 
annotations) follow the contract "non-null unless annotated with `@Nullable`". 
I am not asking to change this here, but it would be good to have a discussion 
and agree on a recommended style for the future.
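The code-vs-config point above can be illustrated with a minimal sketch. All names here are hypothetical stand-ins, not Flink's actual API: an ops-related flag like local recovery is read from the cluster configuration map rather than set via a method on the state backend in application code.

```java
import java.util.Map;

// Hypothetical sketch: an ops-related setting resolved from the parsed
// cluster configuration instead of from a setter on the state backend.
public class LocalRecoveryConfig {

    // Hypothetical config key; not an actual Flink option name.
    static final String KEY = "state.backend.local-recovery";

    // Application code never toggles this; operators set it once in the
    // cluster-wide configuration file, so it is easy to standardize on.
    public static boolean isEnabled(Map<String, String> config) {
        return Boolean.parseBoolean(config.getOrDefault(KEY, "false"));
    }
}
```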



---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318913#comment-16318913
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r160492829
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
 ---
@@ -43,9 +44,9 @@ CheckpointStateOutputStream 
createCheckpointStateOutputStream(
 * Closes the stream factory, releasing all internal resources, but 
does not delete any
 * persistent checkpoint data.
 *
-* @throws Exception Exceptions can be forwarded and will be logged by 
the system
+* @throws IOException Exceptions can be forwarded and will be logged 
by the system
 */
-   void close() throws Exception;
+   void close() throws IOException;
--- End diff --

I think this method is actually removed in the latest master.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment of tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed state and 
> can easily extend it to other state types (e.g. operator state) later.







[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318906#comment-16318906
 ] 

ASF GitHub Bot commented on FLINK-8082:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5262
  
Fails only with the now fixed "duplicate entry" error in `flink-mesos`, so 
looks like this is fine.


> Bump version compatibility check to 1.4
> ---
>
> Key: FLINK-8082
> URL: https://issues.apache.org/jira/browse/FLINK-8082
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>
> Similar to FLINK-7977, we must bump the version of the compatibility check to 
> compare 1.5 against 1.4, once it is released.







[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318879#comment-16318879
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3633


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it additionally needs to call {{setInitialState(state)}}. 
> That makes it difficult to do eager initialization of the invokable during 
> the {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one common constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make all subclasses of {{AbstractInvokable}} provide a two-argument 
> constructor and call the corresponding constructor of {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic for running an invokable to the constructor 
> plus {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.
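The direction of the refactoring above can be sketched roughly as follows. The types here are reduced to placeholders standing in for Flink's `Environment` and `TaskStateHandles`; this is not the actual Flink code.

```java
// Sketch: state is handed over via the constructor instead of through
// setEnvironment()/setInitialState(), enabling eager initialization.
public class InvokableSketch {

    static class Environment {}
    static class TaskStateHandles {}

    public abstract static class AbstractInvokable {
        protected final Environment environment;
        protected final TaskStateHandles initialState; // may be null

        protected AbstractInvokable(Environment env, TaskStateHandles state) {
            this.environment = env;
            this.initialState = state;
        }

        public abstract void invoke() throws Exception;
    }

    // A stateless batch-style task that rejects restored state, as proposed.
    public static class BatchTask extends AbstractInvokable {
        public BatchTask(Environment env, TaskStateHandles state) {
            super(env, state);
            if (state != null) {
                throw new IllegalStateException("BatchTask does not support state");
            }
        }

        @Override
        public void invoke() {
            // task logic would run here
        }
    }
}
```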







[jira] [Commented] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components

2018-01-09 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318834#comment-16318834
 ] 

Chesnay Schepler commented on FLINK-8350:
-

I'm not so sure about making {{taskmanager.tmp.dirs}} a deprecated key that is 
shared across all components. That would mean that an outdated configuration 
may suddenly affect the jobmanager.

> replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components
> 
>
> Key: FLINK-8350
> URL: https://issues.apache.org/jira/browse/FLINK-8350
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter 
> which (if unset) is set to YARN/Mesos' application environment paths (the 
> latter not quite yet). With FLINK-8279, we also used this as a fall-back for 
> the BLOB caches and would like to use it for the BLOB server as well. The 
> BLOB server, however, does not reside on the TaskManager, and it only makes 
> sense to have a single temporary directory configuration parameter (if 
> desired, this could be extended).
> I propose to change this to a more generic {{env.io.tmp.dirs}} used by all 
> components, i.e. JobManager, JobMaster, Dispatcher, and all the 
> TaskManager-related instances for both YARN and Mesos.
> 
> TODO: set this value to the appropriate folders for the JobManager code 
> paths
> during cluster deployment (this exists for the TaskManager only for now)





[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318827#comment-16318827
 ] 

ASF GitHub Bot commented on FLINK-4816:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4828
  
I think this approach is not yet sufficient. There can be various reasons 
why a failure in DEPLOY happens; a failed checkpoint restore is only one of 
them.

This also adds some coupling between execution graph state and the checkpoint 
coordinator (last restored checkpoint ID), which breaks the design and the 
separation of responsibilities.

A proper solution here is probably a bit more comprehensive and needs a 
bit more thinking, probably a bigger design document. My first thought would be 
to report a proper RestoreException from the TaskManager, keep a history of 
exceptions that triggered recovery, use that to evaluate fallback, etc.


> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.
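The wrapping described in the issue could look roughly like the sketch below. `DeployTaskException` and `RestoreTaskException` are the names from the issue; everything else (the helper, the sentinel for "no checkpoint restored") is a placeholder, not the actual Flink change.

```java
// Sketch: wrap a DEPLOYING-stage failure depending on whether a checkpoint
// restore was attempted, recording the checkpoint id in the restore case.
public class DeployFailureSketch {

    public static class DeployTaskException extends Exception {
        public DeployTaskException(Throwable cause) { super(cause); }
    }

    public static class RestoreTaskException extends Exception {
        private final long checkpointId;

        public RestoreTaskException(long checkpointId, Throwable cause) {
            super("restore of checkpoint " + checkpointId + " failed", cause);
            this.checkpointId = checkpointId;
        }

        public long getCheckpointId() { return checkpointId; }
    }

    // Convention in this sketch: restoredCheckpointId < 0 means no
    // checkpoint was restored before the deployment failed.
    public static Exception wrap(long restoredCheckpointId, Throwable cause) {
        return restoredCheckpointId < 0
            ? new DeployTaskException(cause)
            : new RestoreTaskException(restoredCheckpointId, cause);
    }
}
```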







[jira] [Commented] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components

2018-01-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318819#comment-16318819
 ] 

Stephan Ewen commented on FLINK-8350:
-

{{io.tmp.dirs}} is fine with me. Please keep the {{taskmanager.tmp.dirs}} as a 
deprecated key for that config option.

> replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components
> 
>
> Key: FLINK-8350
> URL: https://issues.apache.org/jira/browse/FLINK-8350
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter 
> which (if unset) is set to YARN/Mesos' application environment paths (the 
> latter not quite yet). With FLINK-8279, we also used this as a fall-back for 
> the BLOB caches and would like to use it for the BLOB server as well. The 
> BLOB server, however, does not reside on the TaskManager, and it only makes 
> sense to have a single temporary directory configuration parameter (if 
> desired, this could be extended).
> I propose to change this to a more generic {{env.io.tmp.dirs}} used by all 
> components, i.e. JobManager, JobMaster, Dispatcher, and all the 
> TaskManager-related instances for both YARN and Mesos.
> 
> TODO: set this value to the appropriate folders for the JobManager code 
> paths
> during cluster deployment (this exists for the TaskManager only for now)
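Keeping the old key as a deprecated alias, as discussed in the comments above, amounts to a lookup with fallback. A minimal sketch (hypothetical helper, not Flink's actual `Configuration` API):

```java
import java.util.Map;

// Sketch: resolve the new "io.tmp.dirs" key, falling back to the
// deprecated "taskmanager.tmp.dirs" key so outdated configurations
// keep working, and finally to a supplied default.
public class TmpDirsResolver {

    public static String resolveTmpDirs(Map<String, String> conf, String def) {
        String value = conf.get("io.tmp.dirs");
        if (value == null) {
            value = conf.get("taskmanager.tmp.dirs"); // deprecated alias
        }
        return value != null ? value : def;
    }
}
```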





[jira] [Commented] (FLINK-7475) support update() in ListState

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318760#comment-16318760
 ] 

ASF GitHub Bot commented on FLINK-7475:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4963
  
@StefanRRichter Thanks! I will


> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, 
> I have to do two steps: 
> listState.clear(); 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why can't I update the state with: 
> listState.update(myList)?
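Semantically, the requested `update()` is equivalent to a clear followed by adding all elements. A minimal sketch against a simplified in-memory list state (placeholder class, not Flink's actual `ListState` interface or backend implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: update() replaces the whole list in one call instead of a
// manual clear() + add() loop on the caller's side.
public class ListStateSketch<T> {
    private final List<T> elements = new ArrayList<>();

    public void add(T value) { elements.add(value); }
    public void clear() { elements.clear(); }
    public List<T> get() { return new ArrayList<>(elements); }

    // The proposed convenience: equivalent to clear() then add() per element.
    public void update(List<T> values) {
        elements.clear();
        elements.addAll(values);
    }
}
```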







[jira] [Commented] (FLINK-8392) Simplify termination future completion

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318754#comment-16318754
 ] 

ASF GitHub Bot commented on FLINK-8392:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5266

[FLINK-8392] [rpc] Let termination future be completed by 
AkkaRpcActor#postStop

## What is the purpose of the change

Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination 
future is now completed from the AkkaRpcActor#postStop method. This allows an 
`RpcEndpoint` to wait in its `postStop` method for the termination of 
logically nested `RpcEndpoint`s.

## Verifying this change

- Covered by existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
simplifyRpcTerminationFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5266


commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8
Author: Till Rohrmann 
Date:   2018-01-09T16:50:37Z

[FLINK-8392] [rpc] Let termination future be completed by 
AkkaRpcActor#postStop

Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination 
future is now
completed from the AkkaRpcActor#postStop method.




> Simplify termination future completion
> --
>
> Key: FLINK-8392
> URL: https://issues.apache.org/jira/browse/FLINK-8392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> With FLINK-7754, we tried to complete the termination future after an 
> {{Actor}} has been completely stopped and has been removed from the 
> {{ActorSystem}}. This, however, is not possible. Furthermore, this change 
> made it impossible for a {{RpcEndpoint}} to wait for the termination of 
> another {{RpcEndpoint}} in its {{RpcEndpoint#postStop}} method. Therefore, I 
> propose to revert the changes done by FLINK-7754.
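The mechanism can be sketched with a plain `CompletableFuture` (placeholder class, not the actual AkkaRpcActor): the termination future is completed from the actor's postStop hook, so another endpoint can wait on it during its own shutdown.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: the termination future is only completed once postStop runs,
// i.e. as the last step of stopping the actor.
public class TerminationSketch {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    // Called by the actor framework when the actor is being stopped.
    public void postStop() {
        terminationFuture.complete(null);
    }
}
```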






