[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2017-10-20 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7795:
--
Description: 
http://errorprone.info/ is a tool which detects common coding mistakes.

We should incorporate it into the Flink build.

  was:
http://errorprone.info/ is a tool which detects common coding mistakes.
We should incorporate into Flink build.


> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build.
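As a concrete illustration of the kind of mistake error-prone catches (a hypothetical example, not taken from the Flink codebase), consider its ReferenceEquality check: the sketch below compiles cleanly under plain javac, yet the first method compares strings by identity rather than by value.

```java
// Hypothetical illustration of a bug class that error-prone's ReferenceEquality
// check flags: comparing objects with == when .equals() was intended.
class ReferenceEqualityDemo {
    static boolean sameId(String a, String b) {
        return a == b;          // error-prone would flag this: identity, not value, comparison
    }

    static boolean sameIdFixed(String a, String b) {
        return a.equals(b);     // value comparison, what was almost certainly meant
    }

    public static void main(String[] args) {
        String x = new String("job-42");
        String y = new String("job-42");
        System.out.println(sameId(x, y));      // false: distinct objects
        System.out.println(sameIdFixed(x, y)); // true: equal values
    }
}
```

Wiring error-prone into a Maven build is typically done by pointing the maven-compiler-plugin at the error-prone compiler; the exact configuration depends on the error-prone and Maven versions in use.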



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4874: [hotfix] [docs] Update the description of time-windowed j...

2017-10-20 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4874
  
Thanks for reviewing this @fhueske. I've updated the doc.
Thanks, Xingcan


---


[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

2017-10-20 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213380#comment-16213380
 ] 

Chesnay Schepler commented on FLINK-7883:
-

There is already a {{StoppableFunction}} for this exact purpose. It's just a 
matter of implementing it for all sources, and introducing a dedicated 
savepoint command (which should effectively subsume savepoint).
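A minimal sketch of the stop-vs-cancel idea, using simplified stand-in interfaces rather than Flink's actual SourceFunction/StoppableFunction types: stopping flips a volatile flag so the source ends its fetch loop gracefully before a savepoint is taken.

```java
// Simplified stand-ins for Flink's SourceFunction / StoppableFunction contracts
// (the real interfaces live in the Flink runtime; this only sketches the idea
// that a "stop" gracefully ends emission so a final savepoint sees no
// in-flight records fetched after the stop).
interface DemoSourceContract { void run(java.util.List<Integer> out); }
interface DemoStoppable { void stop(); }

class CountingSource implements DemoSourceContract, DemoStoppable {
    private volatile boolean running = true;
    private final int limit;

    CountingSource(int limit) { this.limit = limit; }

    @Override
    public void run(java.util.List<Integer> out) {
        int i = 0;
        while (running && i < limit) {
            out.add(i++);   // emit; ends promptly once stop() flips the flag
        }
    }

    @Override
    public void stop() { running = false; }  // graceful stop: no more records fetched
}
```

In a real job the JobManager would invoke stop() on all stoppable sources, wait for them to drain, and only then trigger the savepoint, so no records fetched after the stop can leak past the snapshot.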

> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before triggering 
> the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property to the 
> {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.

2017-10-20 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858
  
Thanks for the review @xccui! 
I've updated the PR.
Cheers, Fabian


---


[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213326#comment-16213326
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4858
  
Thanks for the review @xccui! 
I've updated the PR.
Cheers, Fabian


> Null values are not correctly handled by batch inner and outer joins
> 
>
> Key: FLINK-7755
> URL: https://issues.apache.org/jira/browse/FLINK-7755
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Join predicates of batch joins are not correctly evaluated according to 
> three-valued logic.
> This affects inner as well as outer joins.
> The problem is that some equality predicates are only evaluated by the 
> internal join algorithms of Flink, which are based on {{TypeComparator}}. The 
> field {{TypeComparator}}s for {{Row}} are implemented such that {{null == 
> null}} results in {{TRUE}} to ensure correct ordering and grouping. However, 
> three-valued logic requires that {{null == null}} results in {{UNKNOWN}} (or 
> null). The code generator implements this logic correctly, but for equality 
> predicates, no code is generated.
> For outer joins, the problem is a bit trickier because these do not support 
> code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a 
> solution for this issue.
> We also need to extend several of the existing tests and add null values to 
> ensure that the join logic is correctly implemented. 
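The comparator-vs-SQL mismatch described above can be sketched as follows (a simplified model, not Flink's actual TypeComparator or code-generated predicates): comparators need a total equality where null equals null so that sorting and grouping work, while SQL's three-valued equality must yield UNKNOWN whenever an operand is null.

```java
// Simplified illustration of the mismatch: comparator equality vs. SQL
// three-valued equality, with UNKNOWN modelled as a Boolean null.
class NullSemantics {
    // Comparator-style equality: total, null == null is TRUE (needed for grouping).
    static boolean comparatorEquals(Integer a, Integer b) {
        if (a == null && b == null) return true;
        if (a == null || b == null) return false;
        return a.equals(b);
    }

    // SQL three-valued equality: any null operand yields UNKNOWN (Boolean null).
    static Boolean sqlEquals(Integer a, Integer b) {
        if (a == null || b == null) return null;   // UNKNOWN
        return a.equals(b);
    }

    // A join predicate keeps a row pair only if the predicate is definitely TRUE;
    // UNKNOWN behaves like FALSE for filtering.
    static boolean joinMatches(Integer leftKey, Integer rightKey) {
        return Boolean.TRUE.equals(sqlEquals(leftKey, rightKey));
    }
}
```

Because UNKNOWN filters like FALSE, a join that delegates {{null == null}} to the grouping comparator keeps row pairs that three-valued logic would reject.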



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213322#comment-16213322
 ] 

ASF GitHub Bot commented on FLINK-7755:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146076115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
 ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
 val joinInfo = join.analyzeCondition
 
 // joins require an equi-condition or a conjunctive predicate with at 
least one equi-condition
-// and disable outer joins with non-equality predicates(see FLINK-5520)
-!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == 
JoinRelType.INNER)
+!joinInfo.pairs().isEmpty
--- End diff --

Yes, that is true, but the rules are also applied in different contexts. 
`FlinkLogicalJoin` is used for the initial translation of batch and stream 
programs and `DataSetJoinRule` only for batch. I think it's OK to have these 
checks as a safety net.


> Null values are not correctly handled by batch inner and outer joins
> 
>
> Key: FLINK-7755
> URL: https://issues.apache.org/jira/browse/FLINK-7755
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Join predicates of batch joins are not correctly evaluated according to 
> three-valued logic.
> This affects inner as well as outer joins.
> The problem is that some equality predicates are only evaluated by the 
> internal join algorithms of Flink, which are based on {{TypeComparator}}. The 
> field {{TypeComparator}}s for {{Row}} are implemented such that {{null == 
> null}} results in {{TRUE}} to ensure correct ordering and grouping. However, 
> three-valued logic requires that {{null == null}} results in {{UNKNOWN}} (or 
> null). The code generator implements this logic correctly, but for equality 
> predicates, no code is generated.
> For outer joins, the problem is a bit trickier because these do not support 
> code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a 
> solution for this issue.
> We also need to extend several of the existing tests and add null values to 
> ensure that the join logic is correctly implemented. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

2017-10-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4858#discussion_r146076115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
 ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
 val joinInfo = join.analyzeCondition
 
 // joins require an equi-condition or a conjunctive predicate with at 
least one equi-condition
-// and disable outer joins with non-equality predicates(see FLINK-5520)
-!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == 
JoinRelType.INNER)
+!joinInfo.pairs().isEmpty
--- End diff --

Yes, that is true, but the rules are also applied in different contexts. 
`FlinkLogicalJoin` is used for the initial translation of batch and stream 
programs and `DataSetJoinRule` only for batch. I think it's OK to have these 
checks as a safety net.


---


[GitHub] flink issue #4874: [hotfix] [docs] Update the description of time-windowed j...

2017-10-20 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4874
  
Thanks for the PR @xccui. One minor comment. Otherwise, the PR is good to 
merge.


---


[GitHub] flink pull request #4874: [hotfix] [docs] Update the description of time-win...

2017-10-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4874#discussion_r146069345
  
--- Diff: docs/dev/table/sql.md ---
@@ -400,15 +400,14 @@ FROM Orders LEFT JOIN Product ON Orders.productId = 
Product.id
   
 Note: Time-windowed joins are a subset of regular joins 
that can be processed in a streaming fashion.
 
-A time-windowed join requires a special join condition that 
bounds the time on both sides. This can be done by either two appropriate range 
predicates (<, <=, >=, >) or a BETWEEN 
predicate that compares the time 
attributes of both input tables. The following rules apply for time 
predicates:
+A time-windowed join requires an equi-join predicate and a 
special join condition that bounds the time on both sides. Such a condition can 
be defined by two appropriate range predicates (<, <=, >=, 
>) that compare the time 
attributes of both input tables. The following rules apply for time 
predicates:
--- End diff --

two range predicates or a `BETWEEN` predicate. `BETWEEN` is not available 
in the Table API yet.


---


[jira] [Resolved] (FLINK-6838) RescalingITCase fails in master branch

2017-10-20 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-6838.
---
Resolution: Cannot Reproduce

> RescalingITCase fails in master branch
> --
>
> Key: FLINK-6838
> URL: https://issues.apache.org/jira/browse/FLINK-6838
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
>
> {code}
> Tests in error:
>   RescalingITCase.testSavepointRescalingInKeyedState[1] » JobExecution Job 
> execu...
>   RescalingITCase.testSavepointRescalingWithKeyedAndNonPartitionedState[1] » 
> JobExecution
> {code}
> Both failed with similar cause:
> {code}
> testSavepointRescalingInKeyedState[1](org.apache.flink.test.checkpointing.RescalingITCase)
>   Time elapsed: 4.813 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 4 for operator Flat Map 
> -> Sink: Unnamed (1/2).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 4 for 
> operator Flat Map -> Sink: Unnamed (1/2).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Cannot register Closeable, registry is already closed. Closing argument.
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot register Closeable, registry is 
> already closed. Closing argument.
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:66)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.openCheckpointStream(RocksDBKeyedStateBackend.java:495)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:394)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:390)
>   at 
> 

[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213174#comment-16213174
 ] 

Piotr Nowojski commented on FLINK-7737:
---

[~vijikarthi] am I missing something? If there is no file system failure, there 
should be no difference between hsync and hflush. If there is some file system 
failure, not calling hsync during a snapshot may cause data loss. I thought that 
this is the whole point of this issue.

Imagine that just after the operators completed a snapshot, there is a complete power 
loss on every machine, both in the Flink cluster and the file system cluster. If 
this snapshot was marked as completed 0.1ms before the failure, after restart it 
will be chosen for recovery. In that case, if {{hsync}} was not called in 
{{snapshotState()}}, we would have data loss.
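The failure scenario can be sketched with a toy durability model (purely didactic; Hadoop's real hflush/hsync semantics are more involved): data that was only flushed lives in volatile state and vanishes on power loss, while synced data survives.

```java
// Toy model of the hflush()/hsync() durability gap. Writes go to an in-memory
// buffer; hflush() moves them to a volatile "OS cache", hsync() moves them all
// the way to durable "disk". A power loss wipes everything volatile.
class ToyHdfsStream {
    private final java.util.List<String> buffer = new java.util.ArrayList<>();
    private final java.util.List<String> osCache = new java.util.ArrayList<>(); // volatile
    private final java.util.List<String> disk = new java.util.ArrayList<>();    // durable

    void write(String record) { buffer.add(record); }

    void hflush() {            // transferred and visible, but not durable
        osCache.addAll(buffer);
        buffer.clear();
    }

    void hsync() {             // durable: survives power loss
        hflush();
        disk.addAll(osCache);
        osCache.clear();
    }

    void powerLoss() {         // everything volatile is gone
        buffer.clear();
        osCache.clear();
    }

    java.util.List<String> recoverable() { return disk; }
}
```

If snapshotState() only calls hflush(), a snapshot marked complete just before a power loss recovers fewer records than it claims to contain, which is the data-loss scenario discussed above.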

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems, where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>   try {
>     // At this point the refHflushOrSync cannot be null,
>     // since register method would have thrown if it was.
>     this.refHflushOrSync.invoke(os);
>     if (os instanceof HdfsDataOutputStream) {
>       ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     }
>   } catch (InvocationTargetException e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e.getCause());
>     Throwable cause = e.getCause();
>     if (cause != null && cause instanceof IOException) {
>       throw (IOException) cause;
>     }
>     throw new RuntimeException(msg, e);
>   } catch (Exception e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e);
>     throw new RuntimeException(msg, e);
>   }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>   os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4878: [FLINK-7895][hotfix][docs]Fix error in example in ...

2017-10-20 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4878

[FLINK-7895][hotfix][docs]Fix error in example in get late message in 
window doc

* The getSideOutput API is only available in the SingleOutputOperator class, and is 
not part of the base class
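The type-level issue behind this doc fix can be sketched with stand-in classes (not Flink's real API): the side-output getter exists only on the concrete operator type, so an example that types the variable as the base class cannot call it.

```java
// Stand-in hierarchy illustrating why the documented example failed to compile:
// the side-output getter is declared only on the concrete subtype, mirroring
// how getSideOutput is available on the concrete operator class but not on the
// base stream type.
class BaseStream {
    final java.util.List<Integer> main = new java.util.ArrayList<>();
}

class SingleOutputOperatorStream extends BaseStream {
    final java.util.List<Integer> late = new java.util.ArrayList<>();

    // Only available on the subtype; a variable typed BaseStream cannot call it.
    java.util.List<Integer> getSideOutput() { return late; }
}
```

Declaring the variable as `BaseStream` and calling `getSideOutput()` would not compile, which is the equivalent of the error in the documented example.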





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7895

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4878.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4878


commit ce09fb63138df9fe00af1281e8ee41cc0ad5bb45
Author: minwenjun 
Date:   2017-10-20T20:11:58Z

fix error in example in get late message in window doc




---


[jira] [Commented] (FLINK-7895) Error in the example in SideOutput usage example that show how to get late message of window

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213169#comment-16213169
 ] 

ASF GitHub Bot commented on FLINK-7895:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4878

[FLINK-7895][hotfix][docs]Fix error in example in get late message in 
window doc

* The getSideOutput API is only available in the SingleOutputOperator class, and is 
not part of the base class





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7895

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4878.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4878


commit ce09fb63138df9fe00af1281e8ee41cc0ad5bb45
Author: minwenjun 
Date:   2017-10-20T20:11:58Z

fix error in example in get late message in window doc




> Error in the example in SideOutput usage example that show how to get late 
> message of window
> 
>
> Key: FLINK-7895
> URL: https://issues.apache.org/jira/browse/FLINK-7895
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: aitozi
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212857#comment-16212857
 ] 

Piotr Nowojski edited comment on FLINK-7737 at 10/20/17 8:10 PM:
-

{{StreamWriterBase#close}} issues a {{flush()}} call, so the {{syncOnFlush}} flag 
will also be honoured when closing. What "create()" do you have in mind?


was (Author: pnowojski):
What do you mean? {{StreamWriterBase#close}} issues a {{flush()}} call, so 
{{syncOnFlush}} flag will be honoured also when closing. What "create()" do you 
have in mind?

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems, where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>   try {
>     // At this point the refHflushOrSync cannot be null,
>     // since register method would have thrown if it was.
>     this.refHflushOrSync.invoke(os);
>     if (os instanceof HdfsDataOutputStream) {
>       ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     }
>   } catch (InvocationTargetException e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e.getCause());
>     Throwable cause = e.getCause();
>     if (cause != null && cause instanceof IOException) {
>       throw (IOException) cause;
>     }
>     throw new RuntimeException(msg, e);
>   } catch (Exception e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e);
>     throw new RuntimeException(msg, e);
>   }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>   os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...

2017-10-20 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @fhueske, the PR has been updated according to your comments and has also 
been rebased onto the latest master. The PR mainly includes the following 
changes:
1. Refactor `UpdatingPlanChecker`: change the unique key extraction logic of 
join and use the lexicographically smallest attribute as the common group id.
2. Support time indicator attributes. In fact, we do not need to add any 
special processing logic; the time indicator will be forwarded downstream.
3. Add more test cases.
4. Refactor the stream join function.


---


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213148#comment-16213148
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @fhueske, the PR has been updated according to your comments and has also 
been rebased onto the latest master. The PR mainly includes the following 
changes:
1. Refactor `UpdatingPlanChecker`: change the unique key extraction logic of 
join and use the lexicographically smallest attribute as the common group id.
2. Support time indicator attributes. In fact, we do not need to add any 
special processing logic; the time indicator will be forwarded downstream.
3. Add more test cases.
4. Refactor the stream join function.


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1. Implement stream-stream proctime non-window inner join
> 2. Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7895) Error in the example in SideOutput usage example that show how to get late message of window

2017-10-20 Thread aitozi (JIRA)
aitozi created FLINK-7895:
-

 Summary: Error in the example in SideOutput usage example that 
show how to get late message of window
 Key: FLINK-7895
 URL: https://issues.apache.org/jira/browse/FLINK-7895
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.0
Reporter: aitozi






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread Vijay Srinivasaraghavan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213008#comment-16213008
 ] 

Vijay Srinivasaraghavan commented on FLINK-7737:


[~pnowojski] Thanks for the fix. It addresses the FS sink operation when any of 
the writer implementations is used (AvroKVSinkWriter, SequenceFileWriter, 
StringWriter). However, the BucketingSink calls flush() when a snapshot is 
being taken (see snapshotState()), which causes sync() to happen frequently. 
Essentially, we need to call sync() only while closing the current part 
file. Instead of calling sync() during flush(), having a separate API call to 
handle sync() might help.

[~ryanehobbs] create() is not modified since the SYNC_BLOCK flag is not used. This 
fix addresses the writer implementation directly, and hence upon writer close, 
sync() will be invoked when the flag is set (as part of the flush() API call).

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems, where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>   try {
>     // At this point the refHflushOrSync cannot be null,
>     // since register method would have thrown if it was.
>     this.refHflushOrSync.invoke(os);
>     if (os instanceof HdfsDataOutputStream) {
>       ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     }
>   } catch (InvocationTargetException e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e.getCause());
>     Throwable cause = e.getCause();
>     if (cause != null && cause instanceof IOException) {
>       throw (IOException) cause;
>     }
>     throw new RuntimeException(msg, e);
>   } catch (Exception e) {
>     String msg = "Error while trying to hflushOrSync!";
>     LOG.error(msg + " " + e);
>     throw new RuntimeException(msg, e);
>   }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>   os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212985#comment-16212985
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user zhenzhongxu commented on the issue:

https://github.com/apache/flink/pull/4844
  
@tillrohrmann created https://issues.apache.org/jira/browse/FLINK-7894


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180 that dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the individual 
> recovery happened correctly and the workload was re-dispatched to a new 
> available task manager instance, the checkpoint would take 10 mins 
> to eventually time out, causing all other task managers to be unable to commit 
> checkpoints. In a worst-case scenario, if the job got restarted for other reasons 
> (i.e. job manager termination), that would cause more messages to be 
> re-processed/duplicated compared to the job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> initiated by the old task manager and was thus taking a long time to time 
> out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that when the Job Manager realizes that a Task Manager is 
> gone and the workload is redispatched, it still needs to wait for the checkpoint 
> initiated by the old task manager?
> Checkpoint screenshot in attachments.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7894) Improve metrics around fine-grained recovery and associated checkpointing behaviors

2017-10-20 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-7894:
---

 Summary: Improve metrics around fine-grained recovery and 
associated checkpointing behaviors
 Key: FLINK-7894
 URL: https://issues.apache.org/jira/browse/FLINK-7894
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2, 1.4.0
Reporter: Zhenzhong Xu


Currently, the only metric around fine-grained recovery is "task_failures". 
It's a very high-level metric; it would be nice to have the following 
improvements:

* Allow slicing and dicing into which tasks were restarted. 
* Recovery duration.
* Recovery-associated checkpoint behaviors: cancellations, failures, etc.





[GitHub] flink issue #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoin...

2017-10-20 Thread zhenzhongxu
Github user zhenzhongxu commented on the issue:

https://github.com/apache/flink/pull/4844
  
@tillrohrmann created https://issues.apache.org/jira/browse/FLINK-7894


---


[GitHub] flink pull request #4877: [Flink] About SourceFunction extends Serializable

2017-10-20 Thread vim-wj
GitHub user vim-wj opened a pull request:

https://github.com/apache/flink/pull/4877

[Flink] About SourceFunction extends Serializable

SourceFunction extends Function and Serializable, but Function already 
extends Serializable, so SourceFunction needn't extend Serializable again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vim-wj/flink flink-sf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4877.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4877


commit 939eb45f2a27c154284896aab206d571ee2176cb
Author: vim-wj <381025...@qq.com>
Date:   2017-10-20T15:08:18Z

modify SourceFunction




---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212868#comment-16212868
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146014273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

I also put this comment and the above comment in last commit and rebase the 
code today.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-20 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146014273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

I also put this comment and the above comment in last commit and rebase the 
code today.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-20 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146013922
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

Thanks for sharing your idea. 
I think I can understand your consideration and agree with that to some 
extent.

In my newly added tests, I only want to verify the handler's behavior for 
released and un-released channels. So it is easy to just control the 
`isReleased` property in a mocked `InputChannel`, without concerning 
ourselves with the details of `InputChannel`.

As you said, if someone adds more interactions with `InputChannel` in the 
future, the previous tests with a mocked `InputChannel` may bring potential 
risks. Considering this, I am willing to modify the newly added tests to use 
a real `InputChannel`. :)


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212866#comment-16212866
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146013922
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

Thanks for sharing your idea. 
I think I can understand your consideration and agree with that to some 
extent.

In my newly added tests, I only want to verify the handler's behavior for 
released and un-released channels. So it is easy to just control the 
`isReleased` property in a mocked `InputChannel`, without concerning 
ourselves with the details of `InputChannel`.

As you said, if someone adds more interactions with `InputChannel` in the 
future, the previous tests with a mocked `InputChannel` may bring potential 
risks. Considering this, I am willing to modify the newly added tests to use 
a real `InputChannel`. :)


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212857#comment-16212857
 ] 

Piotr Nowojski commented on FLINK-7737:
---

What do you mean? {{StreamWriterBase#close}} issues a {{flush()}} call, so the 
{{syncOnFlush}} flag will also be honoured when closing. Which "create()" do you 
have in mind?

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212854#comment-16212854
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146011321
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = 
inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write 
callbacks that are executed
+   // after each write, and there is also no need to notify 
credit for a released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

First, if `notifyCreditAvailable` is called 4 times, enqueuing 4 
`InputChannel`s, `writeAndFlushNextMessageIfPossible` will be called only 
once, not 4 times, because it is only triggered when the current queue is 
empty before enqueuing the `InputChannel`; so only the first `InputChannel` 
triggers it.

Second, once `writeAndFlushNextMessageIfPossible` is called, it loops over 
all the `InputChannel`s in the queue until the channel becomes un-writable. 
The remaining `InputChannel`s in the queue are then picked up after the 
channel becomes writable again.
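The two points above can be illustrated with a minimal, hypothetical sketch (not Flink's actual code): enqueuing triggers a write only when the queue was empty beforehand, and the drain loop runs until the channel becomes un-writable. In the real handler the drain is driven by Netty write-completion callbacks; here it is synchronous to keep the sketch self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the triggering scheme; names are illustrative.
class WriteSchedulerSketch {

    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private final List<String> written = new ArrayList<>();
    private boolean writable = true;

    void setWritable(boolean nowWritable) {
        writable = nowWritable;
        if (nowWritable) {
            // Resume draining the messages left over from when the channel
            // was un-writable.
            writeNext();
        }
    }

    void enqueue(String message) {
        boolean wasEmpty = queue.isEmpty();
        queue.add(message);
        if (wasEmpty) {
            // Only the first enqueue triggers writing; later enqueues find a
            // non-empty queue and rely on the ongoing drain.
            writeNext();
        }
    }

    private void writeNext() {
        // Loop over queued messages until the channel becomes un-writable.
        while (writable && !queue.isEmpty()) {
            written.add(queue.poll());
        }
    }

    List<String> written() {
        return written;
    }
}
```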


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-20 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146011321
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = 
inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write 
callbacks that are executed
+   // after each write, and there is also no need to notify 
credit for a released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

First, if `notifyCreditAvailable` is called 4 times, enqueuing 4 
`InputChannel`s, `writeAndFlushNextMessageIfPossible` will be called only 
once, not 4 times, because it is only triggered when the current queue is 
empty before enqueuing the `InputChannel`; so only the first `InputChannel` 
triggers it.

Second, once `writeAndFlushNextMessageIfPossible` is called, it loops over 
all the `InputChannel`s in the queue until the channel becomes un-writable. 
The remaining `InputChannel`s in the queue are then picked up after the 
channel becomes writable again.


---


[jira] [Updated] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7784:

Fix Version/s: 1.4.0

> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread Ryan Hobbs (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212816#comment-16212816
 ] 

Ryan Hobbs commented on FLINK-7737:
---

[~pnowojski] will the flag be issued on create() or when the close() 
operation is executed?

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}





[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212811#comment-16212811
 ] 

ASF GitHub Bot commented on FLINK-7737:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4876

[FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

## What is the purpose of the change

Whether to call hsync or hflush depends on the underlying file system and 
user preferences. Normally hflush is enough to protect against single-machine 
HDFS failures and against TaskManager failures. However, if the user is 
using an S3-like file system, or wants to protect against a whole HDFS rack 
power loss, hsync must be used instead.

This is a stop-gap solution until the proper fix tracked in 
https://issues.apache.org/jira/browse/FLINK-5789
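The intended flag can be pictured with a minimal, hypothetical sketch. The `Sync` interface below stands in for Hadoop's output stream API purely for illustration; the names are not the actual Flink or Hadoop classes.

```java
// Hypothetical sketch of a syncOnFlush flag choosing the flush semantics.
class SyncOnFlushSketch {

    interface Sync {
        void hflush(); // flushes to datanode memory: survives single-node failures
        void hsync();  // forces data to disk: survives rack-wide power loss
    }

    private final boolean syncOnFlush;

    SyncOnFlushSketch(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    // The flag decides which durability guarantee each flush provides.
    void flush(Sync stream) {
        if (syncOnFlush) {
            stream.hsync();
        } else {
            stream.hflush();
        }
    }
}
```

The trade-off is durability versus throughput: hsync is considerably more expensive, which is why it makes sense as an opt-in flag rather than the default.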

## Verifying this change

This change is hard to test :( One could think about writing a unit test 
using mocks, but that would only copy the implementation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7737

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4876


commit 8d1c34197d1098cbd5a56ec882da17f42410c1f4
Author: Piotr Nowojski 
Date:   2017-10-20T14:47:52Z

[FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

Whether to call hsync or hflush depends on the underlying file system
and user preferences. Normally hflush is enough to protect against
single-machine HDFS failures and against TaskManager failures. However,
if the user is using an S3-like file system, or wants to protect against
a whole HDFS rack power loss, hsync must be used instead.




> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
>   

[GitHub] flink pull request #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to S...

2017-10-20 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4876

[FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

## What is the purpose of the change

Whether to call hsync or hflush depends on the underlying file system and 
user preferences. Normally hflush is enough to protect against single-machine 
HDFS failures and against TaskManager failures. However, if the user is 
using an S3-like file system, or wants to protect against a whole HDFS rack 
power loss, hsync must be used instead.

This is a stop-gap solution until the proper fix tracked in 
https://issues.apache.org/jira/browse/FLINK-5789

## Verifying this change

This change is hard to test :( One could think about writing a unit test 
using mocks, but that would only copy the implementation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7737

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4876


commit 8d1c34197d1098cbd5a56ec882da17f42410c1f4
Author: Piotr Nowojski 
Date:   2017-10-20T14:47:52Z

[FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

Whether to call hsync or hflush depends on the underlying file system
and user preferences. Normally hflush is enough to protect against
single-machine HDFS failures and against TaskManager failures. However,
if the user is using an S3-like file system, or wants to protect against
a whole HDFS rack power loss, hsync must be used instead.




---


[jira] [Commented] (FLINK-7889) Enable dependency convergence for flink-streaming-java

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212807#comment-16212807
 ] 

ASF GitHub Bot commented on FLINK-7889:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4875
  
Doesn't this require fixing/enabling convergence in flink-shaded-hadoop and 
flink-runtime?


> Enable dependency convergence for flink-streaming-java
> --
>
> Key: FLINK-7889
> URL: https://issues.apache.org/jira/browse/FLINK-7889
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Hai Zhou UTC+8
>






[GitHub] flink issue #4875: [FLINK-7889] Enable dependency convergence in flink-strea...

2017-10-20 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4875
  
Doesn't this require fixing/enabling convergence in flink-shaded-hadoop and 
flink-runtime?


---


[jira] [Commented] (FLINK-7889) Enable dependency convergence for flink-streaming-java

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212773#comment-16212773
 ] 

ASF GitHub Bot commented on FLINK-7889:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4875

[FLINK-7889] Enable dependency convergence in flink-streaming-java

Based on #4777

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink 
FLINK-7765-for-flink-streaming-java

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4875


commit 508ffb368ccd4097f4a46e9514ee1696cd0f0e87
Author: yew1eb 
Date:   2017-10-20T15:11:19Z

Enable dependency convergence in flink-streaming-java




> Enable dependency convergence for flink-streaming-java
> --
>
> Key: FLINK-7889
> URL: https://issues.apache.org/jira/browse/FLINK-7889
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Hai Zhou UTC+8
>






[GitHub] flink pull request #4875: [FLINK-7889] Enable dependency convergence in flin...

2017-10-20 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4875

[FLINK-7889] Enable dependency convergence in flink-streaming-java

Based on #4777

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink 
FLINK-7765-for-flink-streaming-java

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4875


commit 508ffb368ccd4097f4a46e9514ee1696cd0f0e87
Author: yew1eb 
Date:   2017-10-20T15:11:19Z

Enable dependency convergence in flink-streaming-java




---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212756#comment-16212756
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4777
  
@pnowojski 
Multiple commits may refer to the same issue if the issue is fixed in 
multiple steps. 
```
[FLINK-1234] [runtime] Runtime support some cool new thing
[FLINK-1234] [java api] Add hook for cool thing to java api
[FLINK-1234] [scala api] Add hook for that thing to scala api
[FLINK-1234] [optimizer] Make optimizer aware that it can exploit this thing
```
from [Apache Flink development 
guidelines](https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+development+guidelines).


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739





[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-10-20 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4777
  
@pnowojski 
Multiple commits may refer to the same issue if the issue is fixed in 
multiple steps. 
```
[FLINK-1234] [runtime] Runtime support some cool new thing
[FLINK-1234] [java api] Add hook for cool thing to java api
[FLINK-1234] [scala api] Add hook for that thing to scala api
[FLINK-1234] [optimizer] Make optimizer aware that it can exploit this thing
```
from [Apache Flink development 
guidelines](https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+development+guidelines).


---


[jira] [Commented] (FLINK-4868) Insertion sort could avoid the swaps

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212755#comment-16212755
 ] 

ASF GitHub Bot commented on FLINK-4868:
---

Github user vim-wj closed the pull request at:

https://github.com/apache/flink/pull/4868


> Insertion sort could avoid the swaps
> 
>
> Key: FLINK-4868
> URL: https://issues.apache.org/jira/browse/FLINK-4868
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This is about the fallback to insertion sort at the beginning of 
> {{QuickSort.sortInternal}}. This is quite hot code, as it runs every time we 
> reach the bottom of the quick sort recursion tree.
> The inner loop does a series of swaps on adjacent elements for moving a block 
> of several elements one slot to the right and inserting the ith element at 
> the hole. However, it would be faster to first copy the ith element to a temp 
> location, and then move the block of elements to the right without swaps, and 
> then copy the original ith element to the hole.
> Moving the block of elements without swaps could be achieved by calling 
> {{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
> calls in {{MemorySegment.swap}} currently being done).
> (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
> memcpy, so I'm not sure if we can do the entire block of elements with maybe 
> even one {{UNSAFE.copyMemory}}.)
> Note that the threshold for switching to the insertion sort could probably be 
> increased after this.
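The copy-shift-place scheme the issue describes can be sketched on a plain `int[]` (a hedged illustration only; Flink's actual sort operates on serialized records in memory segments, so this is not the real implementation):

```java
import java.util.Arrays;

// Insertion sort without per-element swaps: copy the ith element to a temp
// slot once, shift the larger block right with single copies, then drop the
// saved element into the hole.
public class InsertionSortSketch {

    static void insertionSort(int[] a, int lo, int hi) {
        for (int i = lo + 1; i < hi; i++) {
            int tmp = a[i];          // one copy to a temp location
            int j = i;
            while (j > lo && a[j - 1] > tmp) {
                a[j] = a[j - 1];     // one copy per shifted element, no 3-step swap
                j--;
            }
            a[j] = tmp;              // place the saved element into the hole
        }
    }

    public static void main(String[] args) {
        int[] a = {5, 2, 4, 6, 1, 3};
        insertionSort(a, 0, a.length);
        System.out.println(Arrays.toString(a)); // [1, 2, 3, 4, 5, 6]
    }
}
```

Each shift here is one copy, which is the analogue of replacing the three `UNSAFE.copyMemory` calls of a `MemorySegment.swap` with a single copy per element.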





[GitHub] flink pull request #4868: [FLINK-4868]About SourceFunction extends Serializa...

2017-10-20 Thread vim-wj
Github user vim-wj closed the pull request at:

https://github.com/apache/flink/pull/4868


---


[GitHub] flink pull request #4874: [hotfix] [docs] Update the description of time-win...

2017-10-20 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4874

[hotfix] [docs] Update the description of time-windowed join for SQL doc

## What is the purpose of the change

This PR updates the description of time-windowed join in the SQL doc page.

## Brief change log

  - Synchronize the description of time-windowed join from the tableApi doc 
page.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (*no*)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (*no*)
  - The serializers: (*no*)
  - The runtime per-record code paths (performance sensitive): (*no*)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (*no*)

## Documentation

  - Does this pull request introduce a new feature? (*no*)
  - If yes, how is the feature documented? (*docs*)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink tempDoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4874


commit 4abb0bb46cf14dea5e17614bd7bc1d2864c67e68
Author: Xingcan Cui 
Date:   2017-10-20T14:52:52Z

[hotfix] [docs] Synchronize the description of time-windowed join from 
tableApi




---


[jira] [Assigned] (FLINK-7889) Enable dependency convergence for flink-streaming-java

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 reassigned FLINK-7889:
-

Assignee: Hai Zhou UTC+8

> Enable dependency convergence for flink-streaming-java
> --
>
> Key: FLINK-7889
> URL: https://issues.apache.org/jira/browse/FLINK-7889
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Hai Zhou UTC+8
>






[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

2017-10-20 Thread Antoine Philippot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212748#comment-16212748
 ] 

Antoine Philippot commented on FLINK-7883:
--

I made a commit to illustrate the solution 
https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6

If someone can validate the idea or comment on it, I would be delighted to 
submit a pull request.

> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel with savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property in the
> {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.
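A minimal sketch of what the proposal could look like, assuming the names from the description (`StoppableFetchingSourceFunction` and `stopFetching` are the issue's proposal, not an existing Flink API, and a real source would flip the flag from the checkpoint path rather than from inside the loop):

```java
// Hypothetical StoppableFetchingSourceFunction per the JIRA proposal, with a
// source whose poll loop exits once stopFetching() is called.
public class StoppableSourceSketch {

    interface StoppableFetchingSourceFunction {
        void stopFetching();
    }

    static class PollingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true;
        private int emitted;

        void run() {
            while (fetching) {
                emitted++;            // stand-in for polling one record from Kafka
                if (emitted == 10) {
                    stopFetching();   // e.g. requested before taking a savepoint
                }
            }
        }

        @Override
        public void stopFetching() {
            fetching = false;         // no records past this point enter the job
        }

        int emittedCount() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        PollingSource source = new PollingSource();
        source.run();
        System.out.println(source.emittedCount()); // 10
    }
}
```

The `volatile` flag matters here: the stop request typically arrives from another thread, and the source loop must observe it promptly.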





[jira] [Updated] (FLINK-7893) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7893:
--
Description: Port *CheckpointStatsDetailsSubtasksHandler* to new REST 
endpoint  (was: Port *CheckpointStatsDetailsHandler* to new REST endpoint)

> Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint
> ---
>
> Key: FLINK-7893
> URL: https://issues.apache.org/jira/browse/FLINK-7893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsDetailsSubtasksHandler* to new REST endpoint





[jira] [Assigned] (FLINK-7893) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 reassigned FLINK-7893:
-

Assignee: (was: Hai Zhou UTC+8)

> Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint
> ---
>
> Key: FLINK-7893
> URL: https://issues.apache.org/jira/browse/FLINK-7893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsDetailsHandler* to new REST endpoint





[jira] [Created] (FLINK-7893) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7893:
-

 Summary: Port CheckpointStatsDetailsSubtasksHandler to new REST 
endpoint
 Key: FLINK-7893
 URL: https://issues.apache.org/jira/browse/FLINK-7893
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination, REST
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.5.0


Port *CheckpointStatsDetailsHandler* to new REST endpoint





[jira] [Updated] (FLINK-7892) Port CheckpointStatsHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7892:
--
Description: Port *CheckpointStatsHandler* to new REST endpoint  (was: Port 
*CheckpointStatsDetailsHandler* to new REST endpoint)

> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7892
> URL: https://issues.apache.org/jira/browse/FLINK-7892
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsHandler* to new REST endpoint





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212726#comment-16212726
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145978661
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write callbacks that are
+   // executed after each write; there is also no need to notify credit for
+   // a released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

So what is the point of having this `while (true)` if it always terminates 
after the first iteration? 

I still think this return is a mistake. Let's say

1. `notifyCreditAvailable` is called 4 times, enqueuing 4 `InputChannel`s 
and calling `writeAndFlushNextMessageIfPossible()` 4 times. However, because 
`channel.isWritable()` returned false, nothing was executed and 
`inputChannelsWithCredit` holds 4 `inputChannels`.
2. Channel writability changes, `writeAndFlushNextMessageIfPossible` is 
called once, this loop rotates only once, only one `inputChannel` is 
processed, and `inputChannelsWithCredit` still has 3 elements, which are 
dangling indefinitely?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
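The "write one message, continue from the write callback" scheme described above can be sketched without Netty as follows; `Channel` here is a hypothetical stand-in for Netty's channel interface, and writability checks are elided for brevity:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of the credit-announcement pattern: at most one message is in flight
// per call; the write-complete callback triggers the next write, so the queue
// drains as fast as the channel accepts messages.
public class CreditAnnouncerSketch {

    interface Channel {
        void writeAndFlush(String msg, Runnable onWriteComplete);
    }

    private final Queue<String> channelsWithCredit = new ArrayDeque<>();
    private final Channel channel;

    CreditAnnouncerSketch(Channel channel) {
        this.channel = channel;
    }

    void notifyCreditAvailable(String inputChannelId) {
        channelsWithCredit.add(inputChannelId);
        writeNextIfPossible();
    }

    private void writeNextIfPossible() {
        String next = channelsWithCredit.poll();
        if (next == null) {
            return;   // nothing pending
        }
        // Send one AddCredit-like message and continue from the callback
        // rather than looping here.
        channel.writeAndFlush("AddCredit(" + next + ")", this::writeNextIfPossible);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // A synchronous channel that completes each write immediately.
        CreditAnnouncerSketch announcer = new CreditAnnouncerSketch((msg, done) -> {
            sent.add(msg);
            done.run();
        });
        announcer.notifyCreditAvailable("ch-1");
        announcer.notifyCreditAvailable("ch-2");
        System.out.println(sent.size()); // 2
    }
}
```

Because the continuation runs from the write callback, pending entries can only dangle if the callback never fires or never re-enters the drain method, which is exactly the concern raised in the review below.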





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212725#comment-16212725
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145980535
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline,
+* and an {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
--- End diff --

I know that mocking has been used extensively so far, but we have had quite a 
bad experience with it.

I don't agree that it is not interacting, because it is, for example, 
checking `inputChannel.isReleased()`, which you need to mock in one test below. 
Now imagine that during some refactoring, or while implementing a new feature, 
someone modifies the handler's code by adding an additional interaction or 
sanity check on `inputChannel`. With mocks, that will lead to thousands of 
changes in all of the mocked unit tests.
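A hand-written fake along the lines the review suggests could look like this sketch (the interface is illustrative and much smaller than Flink's actual `RemoteInputChannel`):

```java
// A fake input channel with genuine state: if the handler under test gains a
// new interaction (e.g. an extra sanity check), the fake still behaves
// sensibly, whereas an unstubbed mock silently returns default values.
public class FakeInputChannelSketch {

    interface InputChannel {
        boolean isReleased();
        int getAndResetCredit();
    }

    static class FakeInputChannel implements InputChannel {
        private int credit;
        private boolean released;

        void addCredit(int amount) {
            credit += amount;
        }

        void release() {
            released = true;
        }

        @Override
        public boolean isReleased() {
            return released;
        }

        @Override
        public int getAndResetCredit() {
            int c = credit;
            credit = 0;   // real reset semantics, not a stubbed return value
            return c;
        }
    }

    public static void main(String[] args) {
        FakeInputChannel ch = new FakeInputChannel();
        ch.addCredit(3);
        System.out.println(ch.getAndResetCredit()); // 3
        System.out.println(ch.getAndResetCredit()); // 0
    }
}
```

The trade-off is more hand-written code per test, in exchange for tests that survive refactorings without re-stubbing every interaction.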


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[jira] [Created] (FLINK-7892) Port CheckpointStatsHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7892:
-

 Summary: Port CheckpointStatsHandler to new REST endpoint
 Key: FLINK-7892
 URL: https://issues.apache.org/jira/browse/FLINK-7892
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination, REST
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.5.0


Port *CheckpointStatsDetailsHandler* to new REST endpoint





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145980535
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline,
+* and an {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
--- End diff --

I know that mocking has been used extensively so far, but we have had quite a 
bad experience with it.

I don't agree that it is not interacting, because it is, for example, 
checking `inputChannel.isReleased()`, which you need to mock in one test below. 
Now imagine that during some refactoring, or while implementing a new feature, 
someone modifies the handler's code by adding an additional interaction or 
sanity check on `inputChannel`. With mocks, that will lead to thousands of 
changes in all of the mocked unit tests.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145978661
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write callbacks that are
+   // executed after each write; there is also no need to notify credit for
+   // a released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

So what is the point of having this `while (true)` if it always terminates 
after the first iteration? 

I still think this return is a mistake. Let's say

1. `notifyCreditAvailable` is called 4 times, enqueuing 4 `InputChannel`s 
and calling `writeAndFlushNextMessageIfPossible()` 4 times. However, because 
`channel.isWritable()` returned false, nothing was executed and 
`inputChannelsWithCredit` holds 4 `inputChannels`.
2. Channel writability changes, `writeAndFlushNextMessageIfPossible` is 
called once, this loop rotates only once, only one `inputChannel` is 
processed, and `inputChannelsWithCredit` still has 3 elements, which are 
dangling indefinitely?


---


[jira] [Updated] (FLINK-7885) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7885:
--
Description: Port *CheckpointStatsDetailsHandler* to new REST endpoint  
(was: Port *ClusterConfigHandler* to new REST endpoint)
Summary: Port CheckpointStatsDetailsHandler to new REST endpoint  (was: 
Port ClusterConfigHandler to new REST endpoint)

> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7885
> URL: https://issues.apache.org/jira/browse/FLINK-7885
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsDetailsHandler* to new REST endpoint





[jira] [Updated] (FLINK-7884) Port CheckpointConfigHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7884:
--
Description: Port *CheckpointConfigHandler* to new REST endpoint  (was: 
Port *ClusterOverviewHandler* to new REST endpoint)
Summary: Port CheckpointConfigHandler to new REST endpoint  (was: Port 
ClusterOverviewHandler to new REST endpoint)

> Port CheckpointConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7884
> URL: https://issues.apache.org/jira/browse/FLINK-7884
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointConfigHandler* to new REST endpoint





[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-10-20 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
I created subtasks in JIRA for the follow-ups; however, I have bundled them 
pretty arbitrarily (creating a subtask per module would be tiresome).


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212696#comment-16212696
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
I created subtasks in JIRA for the follow-ups; however, I have bundled them 
pretty arbitrarily (creating a subtask per module would be tiresome).


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739





[jira] [Created] (FLINK-7891) Enable dependency convergence for other modules (please split this subtask up if necessary)

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7891:
-

 Summary: Enable dependency convergence for other modules (please 
split this subtask up if necessary)
 Key: FLINK-7891
 URL: https://issues.apache.org/jira/browse/FLINK-7891
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.3.2
Reporter: Piotr Nowojski








[jira] [Created] (FLINK-7890) Enable dependency convergence for flink-connector-kafka-base

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7890:
-

 Summary: Enable dependency convergence for 
flink-connector-kafka-base
 Key: FLINK-7890
 URL: https://issues.apache.org/jira/browse/FLINK-7890
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Reporter: Piotr Nowojski








[jira] [Created] (FLINK-7889) Enable dependency convergence for flink-streaming-java

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7889:
-

 Summary: Enable dependency convergence for flink-streaming-java
 Key: FLINK-7889
 URL: https://issues.apache.org/jira/browse/FLINK-7889
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.3.2
Reporter: Piotr Nowojski








[jira] [Created] (FLINK-7887) Enable dependency convergence for flink-shaded-hadoop

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7887:
-

 Summary: Enable dependency convergence for flink-shaded-hadoop
 Key: FLINK-7887
 URL: https://issues.apache.org/jira/browse/FLINK-7887
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.3.2
Reporter: Piotr Nowojski








[jira] [Created] (FLINK-7888) Enable dependency convergence for flink-runtime

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7888:
-

 Summary: Enable dependency convergence for flink-runtime
 Key: FLINK-7888
 URL: https://issues.apache.org/jira/browse/FLINK-7888
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.3.2
Reporter: Piotr Nowojski








[jira] [Created] (FLINK-7886) Enable dependency convergence for flink-core

2017-10-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7886:
-

 Summary: Enable dependency convergence for flink-core
 Key: FLINK-7886
 URL: https://issues.apache.org/jira/browse/FLINK-7886
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.3.2
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.4.0








[jira] [Created] (FLINK-7885) Port ClusterConfigHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7885:
-

 Summary: Port ClusterConfigHandler to new REST endpoint
 Key: FLINK-7885
 URL: https://issues.apache.org/jira/browse/FLINK-7885
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination, REST
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.5.0


Port *ClusterConfigHandler* to new REST endpoint





[jira] [Created] (FLINK-7884) Port ClusterOverviewHandler to new REST endpoint

2017-10-20 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7884:
-

 Summary: Port ClusterOverviewHandler to new REST endpoint
 Key: FLINK-7884
 URL: https://issues.apache.org/jira/browse/FLINK-7884
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination, REST
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.5.0


Port *ClusterOverviewHandler* to new REST endpoint





[jira] [Commented] (FLINK-7502) PrometheusReporter improvements

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212675#comment-16212675
 ] 

ASF GitHub Bot commented on FLINK-7502:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
I implemented your comments and assembled a [small 
setup](https://github.com/mbode/flink-prometheus-example) to test the reporter 
again. 

It currently clones *master* and builds the reporter from there. In order to 
test another revision, you can just build 
*flink-metrics-prometheus-1.4-SNAPSHOT.jar* locally, put it into the *flink* 
subdirectory of the project, and add `COPY 
flink-metrics-prometheus-1.4-SNAPSHOT.jar /opt/flink/lib` to *flink/Dockerfile*.


> PrometheusReporter improvements
> ---
>
> Key: FLINK-7502
> URL: https://issues.apache.org/jira/browse/FLINK-7502
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
> Fix For: 1.4.0
>
>
> * do not throw exceptions when a metric is registered a second time
> * allow port ranges for setups where multiple reporters are on the same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore; there is now a minimal http server included 
> in the [Prometheus JVM client|https://github.com/prometheus/client_java]





[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter

2017-10-20 Thread mbode
Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
I implemented your comments and assembled a [small 
setup](https://github.com/mbode/flink-prometheus-example) to test the reporter 
again. 

It currently clones *master* and builds the reporter from there. In order to 
test another revision, you can just build 
*flink-metrics-prometheus-1.4-SNAPSHOT.jar* locally, put it into the *flink* 
subdirectory of the project, and add `COPY 
flink-metrics-prometheus-1.4-SNAPSHOT.jar /opt/flink/lib` to *flink/Dockerfile*.


---


[jira] [Commented] (FLINK-7732) Test instability in Kafka end-to-end test (invalid Kafka offset)

2017-10-20 Thread Pankaj (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212639#comment-16212639
 ] 

Pankaj commented on FLINK-7732:
---

Hi all, we also faced the same issue when using Flink with the Kafka consumer. 
Please find the error trace below. Is a fix available?

java.lang.IllegalArgumentException: Invalid offset: -915623761772
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
 ~[kafka-clients-.

> Test instability in Kafka end-to-end test (invalid Kafka offset)
> 
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> In a test run with unrelated changes in the network stack, the Kafa 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that revealed this:
> In 
> org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 
> this is being sent:
> {{long offsetToCommit = 
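The quoted analysis is cut off here. As a hedged illustration of the failure mode only (the constant and names below are invented, not Flink's actual code), the commit path can guard against negative sentinel offsets before handing them to Kafka, since Kafka rejects negative committed offsets with exactly the `IllegalArgumentException: Invalid offset` seen in the log above:

```java
public class OffsetCommitGuardSketch {

    // Illustrative sentinel for "no offset read yet"; not Flink's actual constant.
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    /** Kafka only accepts committed offsets >= 0, so filter everything else out. */
    static boolean isCommittable(long offset) {
        return offset >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isCommittable(42L));            // true
        System.out.println(isCommittable(OFFSET_NOT_SET)); // false
        System.out.println(isCommittable(-915623761772L)); // false
    }
}
```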

[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212627#comment-16212627
 ] 

ASF GitHub Bot commented on FLINK-7832:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4823#discussion_r145958985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
 
-   public void setAllocationId(AllocationID allocationId) {
-   this.allocationId = allocationId;
-   }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
 
-   public void setAssignedSlotRequest(PendingSlotRequest 
assignedSlotRequest) {
-   this.assignedSlotRequest = assignedSlotRequest;
-   }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
 
+   public void freeSlot() {
+   Preconditions.checkState(state == State.ALLOCATED, "Slot must 
be allocated before freeing it.");
+
+   state = State.FREE;
+   allocationId = null;
+   }
+
+   public void clearPendingSlotRequest() {
+   Preconditions.checkState(state == State.PENDING, "No slot 
request to clear.");
+
+   state = State.FREE;
+   assignedSlotRequest = null;
+   }
+
+   public void assignPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
+   Preconditions.checkState(state == State.FREE, "Slot must be 
free to be assigned a slot request.");
+
+   state = State.PENDING;
+   assignedSlotRequest = 
Preconditions.checkNotNull(pendingSlotRequest);
+   }
+
+   public void completeAllocation(AllocationID allocationId) {
+   Preconditions.checkState(state == State.PENDING, "In order to 
complete an allocation, the slot has to be allocated.");
+   Preconditions.checkState(Objects.equals(allocationId, 
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the 
pending slot request.");
+
+   state = State.ALLOCATED;
+   this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --

this wasn't about correctness but clarity, so it's good that you changed it 
:)
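The diff above replaces plain setters with state-checked transition methods. A simplified, self-contained sketch of that FREE -> PENDING -> ALLOCATED state machine (strings stand in for `AllocationID`/`PendingSlotRequest`, and `IllegalStateException` stands in for Flink's `Preconditions`):

```java
public class SlotStateSketch {
    public enum State { FREE, PENDING, ALLOCATED }

    private State state = State.FREE;
    private String allocationId;     // stand-in for AllocationID
    private String pendingRequestId; // stand-in for the PendingSlotRequest's allocation id

    public State getState() { return state; }

    public void assignPendingSlotRequest(String requestId) {
        check(state == State.FREE, "Slot must be free to be assigned a slot request.");
        state = State.PENDING;
        pendingRequestId = requestId;
    }

    public void completeAllocation(String allocationId) {
        check(state == State.PENDING, "In order to complete an allocation, the slot has to be pending.");
        check(allocationId.equals(pendingRequestId), "Allocation id must match the pending slot request.");
        state = State.ALLOCATED;
        this.allocationId = allocationId;
    }

    public void freeSlot() {
        check(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
        state = State.FREE;
        allocationId = null;
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        SlotStateSketch slot = new SlotStateSketch();
        slot.assignPendingSlotRequest("alloc-1");
        slot.completeAllocation("alloc-1");
        slot.freeSlot();
        System.out.println(slot.getState()); // FREE
    }
}
```

Encoding the legal transitions this way means an out-of-order call (e.g. freeing a slot that was never allocated) fails fast instead of silently corrupting the slot's bookkeeping.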


> SlotManager should return number of registered slots
> 
>
> Key: FLINK-7832
> URL: https://issues.apache.org/jira/browse/FLINK-7832
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{SlotManager}} should provide information about the number of registered 
> slots for a {{TaskExecutor}} and how many of these slots are still free.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

2017-10-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4823#discussion_r145958985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
 
-   public void setAllocationId(AllocationID allocationId) {
-   this.allocationId = allocationId;
-   }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
 
-   public void setAssignedSlotRequest(PendingSlotRequest 
assignedSlotRequest) {
-   this.assignedSlotRequest = assignedSlotRequest;
-   }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
 
+   public void freeSlot() {
+   Preconditions.checkState(state == State.ALLOCATED, "Slot must 
be allocated before freeing it.");
+
+   state = State.FREE;
+   allocationId = null;
+   }
+
+   public void clearPendingSlotRequest() {
+   Preconditions.checkState(state == State.PENDING, "No slot 
request to clear.");
+
+   state = State.FREE;
+   assignedSlotRequest = null;
+   }
+
+   public void assignPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
+   Preconditions.checkState(state == State.FREE, "Slot must be 
free to be assigned a slot request.");
+
+   state = State.PENDING;
+   assignedSlotRequest = 
Preconditions.checkNotNull(pendingSlotRequest);
+   }
+
+   public void completeAllocation(AllocationID allocationId) {
+   Preconditions.checkState(state == State.PENDING, "In order to 
complete an allocation, the slot has to be allocated.");
+   Preconditions.checkState(Objects.equals(allocationId, 
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the 
pending slot request.");
+
+   state = State.ALLOCATED;
+   this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --

this wasn't about correctness but clarity, so it's good that you changed it 
:)


---


[jira] [Commented] (FLINK-7855) Port JobVertexAccumulatorsHandler to REST endpoint

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212626#comment-16212626
 ] 

ASF GitHub Bot commented on FLINK-7855:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4857


> Port JobVertexAccumulatorsHandler to REST endpoint
> --
>
> Key: FLINK-7855
> URL: https://issues.apache.org/jira/browse/FLINK-7855
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Port JobVertexAccumulatorsHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4857: [FLINK-7855] Port JobVertexAccumulatorsHandler to ...

2017-10-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4857


---


[jira] [Resolved] (FLINK-7855) Port JobVertexAccumulatorsHandler to REST endpoint

2017-10-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7855.
--
   Resolution: Done
Fix Version/s: 1.4.0

Added via ebc3bc1f9ce9dcbd69fedc2ec79ab03d94d99cef

> Port JobVertexAccumulatorsHandler to REST endpoint
> --
>
> Key: FLINK-7855
> URL: https://issues.apache.org/jira/browse/FLINK-7855
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Port JobVertexAccumulatorsHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7875) StaticFileServer should not reuse the tmp dir

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212597#comment-16212597
 ] 

ASF GitHub Bot commented on FLINK-7875:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4861


> StaticFileServer should not reuse the tmp dir
> -
>
> Key: FLINK-7875
> URL: https://issues.apache.org/jira/browse/FLINK-7875
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The flip-6 {{DispatcherRestEndpoint}} always starts the {{StaticFileServer}} 
> with the same temporary directory. This should be changed, because otherwise 
> multiple rest endpoints might interfere with each other.
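A minimal sketch of the idea (class and method names invented): giving each endpoint instance its own randomly suffixed temp directory via the standard library, so two endpoints can never collide even with an identical prefix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RestEndpointTmpDir {

    /** Create a per-endpoint temp dir; the random suffix guarantees uniqueness. */
    public static Path createEndpointTmpDir(String endpointId) {
        try {
            return Files.createTempDirectory("flink-web-" + endpointId + "-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two endpoints with the same id still get distinct directories.
        Path a = createEndpointTmpDir("rest-1");
        Path b = createEndpointTmpDir("rest-1");
        System.out.println(!a.equals(b)); // true
    }
}
```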



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4861: [FLINK-7875] [flip6] Start StaticFileServerHandler...

2017-10-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4861


---


[jira] [Closed] (FLINK-7875) StaticFileServer should not reuse the tmp dir

2017-10-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7875.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 142ff78c3851e9393d9c4d5ef1c65798c3dbdd21

> StaticFileServer should not reuse the tmp dir
> -
>
> Key: FLINK-7875
> URL: https://issues.apache.org/jira/browse/FLINK-7875
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The flip-6 {{DispatcherRestEndpoint}} always starts the {{StaticFileServer}} 
> with the same temporary directory. This should be changed, because otherwise 
> multiple rest endpoints might interfere with each other.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212592#comment-16212592
 ] 

ASF GitHub Bot commented on FLINK-7051:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/4873

[FLINK-7051] [table] Bump Calcite version to 1.14.

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Upgrade to Calcite 1.14 to unblock a number of changes in the Table APIs.

## Brief change log

* Minor changes to upgrade to Calcite 1.14.
* Lower `GROUPING()`, `GROUPING_ID()` directly into constants based on the 
specification at 
https://docs.microsoft.com/en-us/sql/t-sql/functions/grouping-id-transact-sql


## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-7051

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4873


commit 53102847436119d2764cdc0b347602de9b3b650f
Author: Haohui Mai 
Date:   2017-10-20T12:32:34Z

[FLINK-7051] [table] Bump Calcite version to 1.14.




> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is 

[jira] [Commented] (FLINK-7875) StaticFileServer should not reuse the tmp dir

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212589#comment-16212589
 ] 

ASF GitHub Bot commented on FLINK-7875:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4861
  
Travis passed. Merging this PR.


> StaticFileServer should not reuse the tmp dir
> -
>
> Key: FLINK-7875
> URL: https://issues.apache.org/jira/browse/FLINK-7875
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The flip-6 {{DispatcherRestEndpoint}} always starts the {{StaticFileServer}} 
> with the same temporary directory. This should be changed, because otherwise 
> multiple rest endpoints might interfere with each other.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7067) Cancel with savepoint does not restart checkpoint scheduler on failure

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212593#comment-16212593
 ] 

ASF GitHub Bot commented on FLINK-7067:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/4254
  
Cool! I'll rebase this and merge after Travis gives the green light.


> Cancel with savepoint does not restart checkpoint scheduler on failure
> --
>
> Key: FLINK-7067
> URL: https://issues.apache.org/jira/browse/FLINK-7067
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.1
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> The `CancelWithSavepoint` action of the JobManager first stops the checkpoint 
> scheduler, then triggers a savepoint, and cancels the job after the savepoint 
> completes.
> If the savepoint fails, the command should not have any side effects and we 
> don't cancel the job. The issue is that the checkpoint scheduler is not 
> restarted though.
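The control flow described above can be sketched as follows (all names invented, and the asynchronous savepoint reduced to a boolean, so this is an illustration of the fix rather than the JobManager's actual code): the side effect of stopping the scheduler must be undone when the savepoint fails.

```java
public class CancelWithSavepointSketch {

    private boolean schedulerRunning = true;

    public boolean isSchedulerRunning() { return schedulerRunning; }

    /** Returns true if the job was cancelled, false if the savepoint failed. */
    public boolean cancelWithSavepoint(boolean savepointSucceeds) {
        schedulerRunning = false; // no periodic checkpoints during the savepoint
        if (savepointSucceeds) {
            return true;          // job gets cancelled; scheduler stays stopped
        }
        // The fix: a failed savepoint must not leave this side effect behind.
        schedulerRunning = true;
        return false;
    }

    public static void main(String[] args) {
        CancelWithSavepointSketch jm = new CancelWithSavepointSketch();
        System.out.println(jm.cancelWithSavepoint(false)); // false: savepoint failed
        System.out.println(jm.isSchedulerRunning());       // true: scheduler restarted
    }
}
```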



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7874) Add environment logging to cluster entrypoints

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212591#comment-16212591
 ] 

ASF GitHub Bot commented on FLINK-7874:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4860


> Add environment logging to cluster entrypoints
> --
>
> Key: FLINK-7874
> URL: https://issues.apache.org/jira/browse/FLINK-7874
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Add environment logging to all {{ClusterEntrypoints}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7874) Add environment logging to cluster entrypoints

2017-10-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7874.

   Resolution: Done
Fix Version/s: 1.4.0

Added via a5801d058dc955200d726131f98a6f0f883ed4dc

> Add environment logging to cluster entrypoints
> --
>
> Key: FLINK-7874
> URL: https://issues.apache.org/jira/browse/FLINK-7874
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Add environment logging to all {{ClusterEntrypoints}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4873: [FLINK-7051] [table] Bump Calcite version to 1.14.

2017-10-20 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/4873

[FLINK-7051] [table] Bump Calcite version to 1.14.

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Upgrade to Calcite 1.14 to unblock a number of changes in the Table APIs.

## Brief change log

* Minor changes to upgrade to Calcite 1.14.
* Lower `GROUPING()`, `GROUPING_ID()` directly into constants based on the 
specification at 
https://docs.microsoft.com/en-us/sql/t-sql/functions/grouping-id-transact-sql


## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-7051

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4873


commit 53102847436119d2764cdc0b347602de9b3b650f
Author: Haohui Mai 
Date:   2017-10-20T12:32:34Z

[FLINK-7051] [table] Bump Calcite version to 1.14.




---


[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

2017-10-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4254
  
Cool! I'll rebase this and merge after Travis gives the green light.


---


[GitHub] flink pull request #4860: [FLINK-7874] Add logging to the ClusterEntrypoints

2017-10-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4860


---


[GitHub] flink issue #4861: [FLINK-7875] [flip6] Start StaticFileServerHandler with r...

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4861
  
Travis passed. Merging this PR.


---


[jira] [Commented] (FLINK-7861) Suppress ActorKilledExceptions

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212584#comment-16212584
 ] 

ASF GitHub Bot commented on FLINK-7861:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4845


> Suppress ActorKilledExceptions
> --
>
> Key: FLINK-7861
> URL: https://issues.apache.org/jira/browse/FLINK-7861
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> When stopping a {{RpcEndpoint}}, the {{AkkaRpcService}} sends a {{Kill}} 
> message which causes an {{ActorKilledException}} to be thrown. This exception 
> is logged by the {{StoppingSupervisorStrategy}}. This is not necessary 
> because we voluntarily stopped the {{RpcEndpoint}}. In order to clean up 
> logs, I think we should not log this kind of exception.
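The suppression decision can be sketched without Akka (the exception class below is a plain-Java stand-in for `akka.actor.ActorKilledException`, and the method is invented for illustration): the supervisor's logging predicate simply treats the kill exception as expected.

```java
public class SupervisorLoggingSketch {

    /** Stand-in for akka.actor.ActorKilledException. */
    static class ActorKilledException extends RuntimeException {}

    /** An ActorKilledException is expected when we deliberately Kill an endpoint. */
    static boolean shouldLog(Throwable t) {
        return !(t instanceof ActorKilledException);
    }

    public static void main(String[] args) {
        System.out.println(shouldLog(new ActorKilledException())); // false
        System.out.println(shouldLog(new RuntimeException()));     // true
    }
}
```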



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7874) Add environment logging to cluster entrypoints

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212585#comment-16212585
 ] 

ASF GitHub Bot commented on FLINK-7874:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4860
  
Thanks for the review @zentol. Failing test is unrelated. Merging this PR.


> Add environment logging to cluster entrypoints
> --
>
> Key: FLINK-7874
> URL: https://issues.apache.org/jira/browse/FLINK-7874
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Add environment logging to all {{ClusterEntrypoints}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4860: [FLINK-7874] Add logging to the ClusterEntrypoints

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4860
  
Thanks for the review @zentol. Failing test is unrelated. Merging this PR.


---


[GitHub] flink pull request #4845: [FLINK-7861] [flip6] Suppress ActorKilledException...

2017-10-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4845


---


[jira] [Closed] (FLINK-7861) Suppress ActorKilledExceptions

2017-10-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7861.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 33497024852c521472c40040cc023526426835af

> Suppress ActorKilledExceptions
> --
>
> Key: FLINK-7861
> URL: https://issues.apache.org/jira/browse/FLINK-7861
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> When stopping a {{RpcEndpoint}}, the {{AkkaRpcService}} sends a {{Kill}} 
> message which causes an {{ActorKilledException}} to be thrown. This exception 
> is logged by the {{StoppingSupervisorStrategy}}. This is not necessary 
> because we voluntarily stopped the {{RpcEndpoint}}. In order to clean up 
> logs, I think we should not log this kind of exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7883) Stop fetching source before a cancel with savepoint

2017-10-20 Thread Antoine Philippot (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antoine Philippot updated FLINK-7883:
-
Description: 
For a cancel-with-savepoint command, the JobManager triggers the cancel call 
once the savepoint is finished, but during the savepoint execution the Kafka 
source continues to poll new messages, which will not be part of the savepoint 
and will be replayed on the next application start.

A solution could be to stop fetching from the source stream task before 
triggering the savepoint.

I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method 
{{stopFetching}} that existing SourceFunction implementations could implement.

We can add a {{stopFetchingSource}} property to the 
{{CheckpointOptions}} class to pass the desired behaviour from 
{{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
{{SourceStreamTask.triggerCheckpoint}}
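The interface names {{StoppableFetchingSourceFunction}} and {{stopFetching}} come from the proposal above; everything else in this sketch (the polling source, its loop body) is invented to illustrate how a source could honour the flag:

```java
import java.util.concurrent.atomic.AtomicBoolean;

interface StoppableFetchingSourceFunction {
    /** Ask the source to stop pulling new records; records already fetched may still be emitted. */
    void stopFetching();
}

public class PollingSourceSketch implements StoppableFetchingSourceFunction {

    private final AtomicBoolean fetching = new AtomicBoolean(true);
    private int emitted;

    @Override
    public void stopFetching() {
        fetching.set(false);
    }

    /** One iteration of the source loop: poll only while fetching is allowed. */
    public boolean pollOnce() {
        if (!fetching.get()) {
            return false; // savepoint in progress: no new records enter the pipeline
        }
        emitted++;
        return true;
    }

    public int emittedCount() { return emitted; }

    public static void main(String[] args) {
        PollingSourceSketch source = new PollingSourceSketch();
        source.pollOnce();
        source.pollOnce();
        source.stopFetching(); // what the JobManager would trigger before the savepoint
        source.pollOnce();     // ignored: nothing new is fetched
        System.out.println(source.emittedCount()); // 2
    }
}
```

Records polled after {{stopFetching}} would otherwise sit beyond the savepoint's offsets and be replayed on restart, which is exactly the duplication the issue describes.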


  was:
For a cancel with savepoint command, the JobManager trigger the cancel call 
once the savepoint is finished, but during the savepoint execution, kafka 
source continue to poll new messages which will not be part of the savepoint 
and will be replayed on the next application start.

A solution could be to stop fetching the source stream task before triggering 
the savepoint.

I suggest to add an interface {{StoppableFetchingSourceFunction}} with a method 
{{stopFetching}} that existant SourceFunction implementations could implement.

We can add a {{stopFetchingSource` property in {{CheckpointProperties}} and 
 {{CheckpointOptions}} class to pass the desired behaviour from 
{{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
{{SourceStreamTask.triggerCheckpoint}}



> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property to the 
>  {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4845: [FLINK-7861] [flip6] Suppress ActorKilledException loggin...

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4845
  
Travis passed. Merging this PR.


---


[jira] [Commented] (FLINK-7861) Suppress ActorKilledExceptions

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212578#comment-16212578
 ] 

ASF GitHub Bot commented on FLINK-7861:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4845
  
Travis passed. Merging this PR.


> Suppress ActorKilledExceptions
> --
>
> Key: FLINK-7861
> URL: https://issues.apache.org/jira/browse/FLINK-7861
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> When stopping a {{RpcEndpoint}}, the {{AkkaRpcService}} sends a {{Kill}} 
> message which causes an {{ActorKilledException}} to be thrown. This exception 
> is logged by the {{StoppingSupervisorStrategy}}. This is not necessary 
> because we voluntarily stopped the {{RpcEndpoint}}. In order to clean up 
> logs, I think we should not log this kind of exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212577#comment-16212577
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4844
  
@zhenzhongxu what kind of metrics would you be interested in? I think we 
should open a dedicated issue for that.


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180 that dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed after one task manager termination, even though the individual 
> recovery happens correctly, that the workload was re-dispatched to a new 
> available task manager instance. However, the checkpoint would take 10 mins 
> to eventually timeout, causing all other task managers not able to commit 
> checkpoints. In a worst-case scenario, if job got restarted for other reasons 
> (i.e. job manager termination), that would cause more messages to be 
> re-processed/duplicates compared to the job without fine-grained recovery 
> enabled.
> I am suspecting that uber checkpoint was waiting for a previous checkpoint 
> that initiated by the old task manager and thus taking a long time to time 
> out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that when Job Manager realizes that Task Manager is 
> gone and workload is redispatched, it still need to wait for the checkpoint 
> initiated by the old task manager?
> Checkpoint screenshot in attachments.
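On question 1: the checkpoint timeout is configurable per job via {{CheckpointConfig}}, and its default of 10 minutes matches the observed timeout. A hedged configuration sketch, assuming the standard DataStream API (the concrete interval and timeout values below are illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);
        // Checkpoints that do not complete within 2 minutes are discarded;
        // the default is 10 minutes, which matches the timeout observed above.
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
    }
}
```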



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoin...

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4844
  
@zhenzhongxu what kind of metrics would you be interested in? I think we 
should open a dedicated issue for that.


---


[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212573#comment-16212573
 ] 

ASF GitHub Bot commented on FLINK-7648:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4824
  
Thanks for your review @zentol. Once the preceding PR are merged, I'll 
merge this one as well.


> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4824: [FLINK-7648] [flip6] Add TaskManagersHandler

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4824
  
Thanks for your review @zentol. Once the preceding PR are merged, I'll 
merge this one as well.


---


[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212566#comment-16212566
 ] 

ASF GitHub Bot commented on FLINK-7832:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4823
  
Thanks for your review @zentol. I've addressed your comments and hope to 
have clarified the `updateAllocation` call for the state transition `FREE` -> 
`ALLOCATED`. 

If there are no other objections, then I would like to merge this PR once 
Travis gives green light.


> SlotManager should return number of registered slots
> 
>
> Key: FLINK-7832
> URL: https://issues.apache.org/jira/browse/FLINK-7832
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{SlotManager}} should provide information about the number of registered 
> slots for a {{TaskExecutor}} and how many of these slots are still free.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4823: [FLINK-7832] [flip6] Extend SlotManager to report free sl...

2017-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4823
  
Thanks for your review @zentol. I've addressed your comments and hope to 
have clarified the `updateAllocation` call for the state transition `FREE` -> 
`ALLOCATED`. 

If there are no other objections, then I would like to merge this PR once 
Travis gives green light.


---


[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212565#comment-16212565
 ] 

ASF GitHub Bot commented on FLINK-7832:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4823#discussion_r145950011
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
 
-   public void setAllocationId(AllocationID allocationId) {
-   this.allocationId = allocationId;
-   }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
 
-   public void setAssignedSlotRequest(PendingSlotRequest 
assignedSlotRequest) {
-   this.assignedSlotRequest = assignedSlotRequest;
-   }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
 
+   public void freeSlot() {
+   Preconditions.checkState(state == State.ALLOCATED, "Slot must 
be allocated before freeing it.");
+
+   state = State.FREE;
+   allocationId = null;
+   }
+
+   public void clearPendingSlotRequest() {
+   Preconditions.checkState(state == State.PENDING, "No slot 
request to clear.");
+
+   state = State.FREE;
+   assignedSlotRequest = null;
+   }
+
+   public void assignPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
+   Preconditions.checkState(state == State.FREE, "Slot must be 
free to be assigned a slot request.");
+
+   state = State.PENDING;
+   assignedSlotRequest = 
Preconditions.checkNotNull(pendingSlotRequest);
+   }
+
+   public void completeAllocation(AllocationID allocationId) {
+   Preconditions.checkState(state == State.PENDING, "In order to 
complete an allocation, the slot has to be allocated.");
+   Preconditions.checkState(Objects.equals(allocationId, 
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the 
pending slot request.");
+
+   state = State.ALLOCATED;
+   this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --

But will move it to the top to make it clearer.


> SlotManager should return number of registered slots
> 
>
> Key: FLINK-7832
> URL: https://issues.apache.org/jira/browse/FLINK-7832
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{SlotManager}} should provide information about the number of registered 
> slots for a {{TaskExecutor}} and how many of these slots are still free.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

