[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327147478
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1485,6 +1525,20 @@ else if (!isRestartable && 
transitionState(currentState, JobStatus.FAILED, failu
}
}
 
+   public void failJob(Throwable cause) {
+   if (state == JobStatus.FAILING || 
state.isGloballyTerminalState()) {
+   return;
+   }
+
+   transitionState(JobStatus.FAILING);
+   initFailureCause(cause);
+
+   cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+   transitionState(JobStatus.FAILED);
 
 Review comment:
   Oh I see. I think we can do that for transition to `FAILING` as well.
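The guard in the diff above (skip `failJob` when the job is already `FAILING` or globally terminal) can be sketched in isolation. This is a simplified, self-contained model of that pattern; the enum values and method names below are hypothetical stand-ins, not Flink's actual `ExecutionGraph` API.

```java
import java.util.EnumSet;

class JobStateMachine {
    enum Status { RUNNING, FAILING, FAILED, CANCELED, FINISHED }

    private static final EnumSet<Status> GLOBALLY_TERMINAL =
            EnumSet.of(Status.FAILED, Status.CANCELED, Status.FINISHED);

    private Status state = Status.RUNNING;

    /** Returns true if the transition was applied, false if it was ignored. */
    boolean tryTransitionToFailing() {
        // Guard: a job that is already failing or terminally done must not
        // be failed again; the call becomes a harmless no-op.
        if (state == Status.FAILING || GLOBALLY_TERMINAL.contains(state)) {
            return false;
        }
        state = Status.FAILING;
        return true;
    }

    Status getState() {
        return state;
    }
}
```

Making the transition idempotent this way lets callers invoke it without first checking the current state themselves.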


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on issue #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
tillrohrmann commented on issue #9158: [FLINK-10052][coordination] Tolerate 
temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158#issuecomment-534127456
 
 
   I'd prefer to split the version bump and the functional change into two PRs 
@lamber-ken.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327150696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Operations on the ExecutionVertex.
+ */
+interface ExecutionVertexOperations {
+
+   void deploy(ExecutionVertex executionVertex) throws JobException;
 
 Review comment:
   Ok.




[GitHub] [flink] flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure CheckpointException can always be deserialized on JobManager

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure 
CheckpointException can always be deserialized on JobManager
URL: https://github.com/apache/flink/pull/9742#issuecomment-533957111
 
 
   
   ## CI report:
   
   * b1c025876fff8d6c7807e55613aabefef891ed59 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687104)
   * 0b7c90578d0745548eba38593d4f320a7f33669e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687968)
   * 2def508db5b57c2a387e395ccf605a2af4c886d1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128688964)
   * 2844e47e44e5ab556c3848bf1156a0d02032ac09 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128695177)
   * 4e31d0ee9093b5e69934e6035d81392a878dbe2a : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128758376)
   




[jira] [Updated] (FLINK-14172) Implement KubeClient with official Java client library for kubernetes

2019-09-23 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-14172:
--
Parent: FLINK-9953
Issue Type: Sub-task  (was: New Feature)

> Implement KubeClient with official Java client library for kubernetes
> -
>
> Key: FLINK-14172
> URL: https://issues.apache.org/jira/browse/FLINK-14172
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yang Wang
>Priority: Major
>
> The official Java client library for Kubernetes is becoming more and more 
> active. Its new features (such as leader election) and some client 
> implementations (informer, lister, cache) are better, so we should use the 
> official Java client for Kubernetes in Flink.
> https://github.com/kubernetes-client/java



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9720: [FLINK-13025] Elasticsearch 7.x support

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9720: [FLINK-13025] Elasticsearch 7.x 
support
URL: https://github.com/apache/flink/pull/9720#issuecomment-533110106
 
 
   
   ## CI report:
   
   * d9c1dd529ef235649909d067cc78099179656e62 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128334151)
   * 603db694488096f1491b5ccb068d9e783636a8c8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128375005)
   * 9bc5949275f7997eadd03e2ec1fe8937ee2e689f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128453921)
   * 53bfd624b1258c5f1c269952d155f2c981476769 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128462077)
   * 566cbfff4439557cb6cdd767db501f8c49e5caf6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128467326)
   * bbae03349addd55bb69328c64f07209b7aa3190e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128751101)
   




[jira] [Reopened] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-09-23 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson reopened FLINK-12576:


To reproduce,


git clone --branch backpressure-with-2-TMs 
https://github.com/alpinegizmo/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

You will find a job with these 5 operators

(1) kafka -> (2) timestamps / watermarks -> (3) keyBy + backpressure map -> (4) 
keyBy + window -> (5) kafka

where #3, the backpressure map, causes severe backpressure every other minute. 
The job is running with parallelism of 2 throughout; up until the first keyBy 
all the traffic is on the subtasks with 0 as their index. 

In this backpressure-with-2-TMs branch there are two TMs each with one slot. 
You will observe that all of the output metrics for the 0-index watermarking 
subtask rise to 1 during the even-numbered minutes, and fall to 0 during the 
odd numbered minutes, as expected. 

If I run this with one TM with 2 slots, all of the input metrics for the 
backpressure operator are always zero. 

In this backpressure-with-2-TMs branch there are 2 single-slot TMs, and here 
the input metrics for subtask 0 of the backpressure operator are always 0, but 
the input metrics for subtask 1 of that operator rise and fall every minute, as 
they should. Thus my conclusion that the local input metrics are still broken.



> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Piotr Nowojski
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
> when looking for the causes of back pressure (if a task is back-pressuring 
> the whole Flink job, but there is data skew and only local input channels 
> are being used).





[GitHub] [flink] flinkbot commented on issue #9749: [FLINK-14115][docs-zh] Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread GitBox
flinkbot commented on issue #9749: [FLINK-14115][docs-zh] Translate DataStream 
Code Walkthrough to Chinese
URL: https://github.com/apache/flink/pull/9749#issuecomment-534140505
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 141eb6db531af4b12a9506489f280f180c02a408 (Mon Sep 23 
15:04:27 UTC 2019)
   
✅ no warnings
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor 
StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522602263
 
 
   
   ## CI report:
   
   * 5d079442de5815edf896b85fa7aa3ca599975bb0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123734022)
   * 725be7c2226608382c67d5b3d372886be737fff6 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/124870650)
   * eeb6f54f2316e5a6ef8f1cbed4f002c3ae37107e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/12569)
   * e8d22a8c88d9c4a3d10fe989761c54e2d56ba3bf : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/125557289)
   * 9b8109a8586bcf8095eba8d24c6cd0dbb21d7810 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125559250)
   * ac519615c6a2e28bb6544d2dce2dee43a03f3928 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125831290)
   * ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127164915)
   * b7261f4a827d037e6a0bf825b342688b98effb98 : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state for RetractableTopNFunction

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state 
for RetractableTopNFunction
URL: https://github.com/apache/flink/pull/9741#issuecomment-533791202
 
 
   
   ## CI report:
   
   * 511ae6ef82ec45f34e0270c4c534d64de2856107 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128604445)
   * c2189a7898832df5876f1fef1b7e58fb04b0257c : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128766418)
   * 4fbfe8d806605b3a31c7059b72e8ebe62b0f07e4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128770121)
   




[GitHub] [flink] flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state for RetractableTopNFunction

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state 
for RetractableTopNFunction
URL: https://github.com/apache/flink/pull/9741#issuecomment-533791202
 
 
   
   ## CI report:
   
   * 511ae6ef82ec45f34e0270c4c534d64de2856107 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128604445)
   * c2189a7898832df5876f1fef1b7e58fb04b0257c : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128766418)
   * 4fbfe8d806605b3a31c7059b72e8ebe62b0f07e4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128770121)
   




[GitHub] [flink] flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure CheckpointException can always be deserialized on JobManager

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure 
CheckpointException can always be deserialized on JobManager
URL: https://github.com/apache/flink/pull/9742#issuecomment-533957111
 
 
   
   ## CI report:
   
   * b1c025876fff8d6c7807e55613aabefef891ed59 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687104)
   * 0b7c90578d0745548eba38593d4f320a7f33669e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687968)
   * 2def508db5b57c2a387e395ccf605a2af4c886d1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128688964)
   * 2844e47e44e5ab556c3848bf1156a0d02032ac09 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128695177)
   * 4e31d0ee9093b5e69934e6035d81392a878dbe2a : UNKNOWN
   




[jira] [Commented] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-09-23 Thread John Lonergan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935904#comment-16935904
 ] 

John Lonergan commented on FLINK-14170:
---

Yep, it's unnecessarily restrictive and actually breaks Parquet even though it 
would otherwise work just fine on Hadoop 2.6.

Remove the global check in construction and instead make the code throw a 
"NotImplementedException" +only+ if a sink actually happens to make that call.
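The suggested change can be sketched as follows: instead of rejecting old Hadoop versions at construction time, defer the failure to the one method that actually requires truncate support. The class and method names here are illustrative stand-ins, not the real HadoopRecoverableWriter API.

```java
class RecoverableWriterSketch {
    private final boolean truncateSupported;

    RecoverableWriterSketch(boolean truncateSupported) {
        // No global precondition here: bulk formats that never call
        // truncate() can still be constructed on Hadoop < 2.7.
        this.truncateSupported = truncateSupported;
    }

    void truncate() {
        // Fail lazily, only when truncate support is genuinely needed.
        if (!truncateSupported) {
            throw new UnsupportedOperationException(
                    "truncate() requires Hadoop >= 2.7");
        }
        // ... truncate logic would go here ...
    }
}
```

With this shape, a bulk-format sink that restores files without truncation never hits the exception, while row-format sinks that do call `truncate()` still fail with a clear message.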

> Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder
> -
>
> Key: FLINK-14170
> URL: https://issues.apache.org/jira/browse/FLINK-14170
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0
>Reporter: Bhagavan
>Priority: Major
>
> Currently, StreamingFileSink is supported only with Hadoop >= 2.7, 
> irrespective of the row/bulk format builder. This restriction exists because 
> truncate is not supported in Hadoop < 2.7.
> However, BulkFormatBuilder does not use the truncate method to restore the 
> file, so restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is 
> not necessary.
> The requested improvement is to remove the precondition on 
> HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used on 
> Hadoop 2.6 (most enterprises are still on CDH 5.x).
>  
>  
>  
>  





[GitHub] [flink] gaofeilong198810 opened a new pull request #9749: [FLINK-14115][docs-zh] Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread GitBox
gaofeilong198810 opened a new pull request #9749: [FLINK-14115][docs-zh] 
Translate DataStream Code Walkthrough to Chinese
URL: https://github.com/apache/flink/pull/9749
 
 
   
   
   ## What is the purpose of the change
   
   Translate DataStream Code Walkthrough to Chinese
   
   ## Brief change log
   
   Translate DataStream Code Walkthrough to Chinese
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   




[GitHub] [flink] flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state for RetractableTopNFunction

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state 
for RetractableTopNFunction
URL: https://github.com/apache/flink/pull/9741#issuecomment-533791202
 
 
   
   ## CI report:
   
   * 511ae6ef82ec45f34e0270c4c534d64de2856107 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128604445)
   * c2189a7898832df5876f1fef1b7e58fb04b0257c : UNKNOWN
   




[jira] [Commented] (FLINK-14118) Reduce the unnecessary flushing when there is no data available for flush

2019-09-23 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935947#comment-16935947
 ] 

Yingjie Cao commented on FLINK-14118:
-

[~pnowojski] You are right. High data skew can reproduce the problem. My test 
case uses 1000 output channels and a 1ms flush interval, and writes most of the 
data to channel 0. The throughput results are as follows.

Before the fix:

Benchmark                                                           (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput                1000,1ms  thrpt   30  18240.197 ± 1892.419  ops/ms

After the fix:

Benchmark                                                           (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput                1000,1ms  thrpt   30  24532.313 ± 1118.312  ops/ms

Some other cases are also tested (all with 1ms flushing interval) and no 
evident performance difference is observed. The test results are as follows.

Before the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score     Error   Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                 1000,1ms  thrpt   30  23032.384 ± 871.883  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   1923.863 ±  78.518  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3377.401 ± 216.982  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6091.213 ±  92.658  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9107.194 ± 211.169  ops/ms

After the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score     Error   Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                 1000,1ms  thrpt   30  23985.588 ± 990.037  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   2011.356 ±  40.347  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3440.238 ± 211.906  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6118.888 ±  94.517  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9120.144 ± 252.023  ops/ms

The extra synchronization point does not introduce any regression in the above 
test cases. I guess the reason is that the synchronization point sits in a 
synchronization block which already needs a memory barrier.

Moving the output flushing logic to the mailbox is a good choice, though just 
like what you have mentioned, the main concern is how to efficiently implement 
the "flushAll" mailbox action.

I wonder if the mailbox facility will be introduced in versions 1.8 and 1.9. If 
not, I would suggest backporting the fix to 1.8 and 1.9.
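The idea behind the fix described above can be sketched in miniature: only wake the netty thread when the subpartition actually has data to send, so an idle flush timer becomes a cheap no-op. This is a hedged illustration, not Flink's actual subpartition code; the class and counter are invented for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FlushGate {
    private final AtomicBoolean dataAvailable = new AtomicBoolean(false);
    private int notifications = 0;

    /** Called on the task thread whenever a record is written. */
    void onRecordEmitted() {
        dataAvailable.set(true);
    }

    /** Called by the periodic output flusher. */
    void flush() {
        // Skip the (comparatively expensive) wake-up when nothing was
        // written since the last flush; the CAS also resets the flag.
        if (dataAvailable.compareAndSet(true, false)) {
            notifications++; // stand-in for notifying the netty thread
        }
    }

    int getNotificationCount() {
        return notifications;
    }
}
```

An idle flusher calling `flush()` every millisecond then never reaches the notification path, which is what removes the wasted wake-ups the issue describes.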

> Reduce the unnecessary flushing when there is no data available for flush
> -
>
> Key: FLINK-14118
> URL: https://issues.apache.org/jira/browse/FLINK-14118
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.1, 1.8.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new flush implementation, which works by triggering a netty user event, 
> may cause a performance regression compared to the old synchronization-based 
> one. More specifically, when there is exactly one BufferConsumer in the 
> buffer queue of a subpartition and no new data will be added for a while 
> (perhaps because there is simply no input, or because the operator collects 
> some data for processing and does not emit records immediately), there is 
> no data to send, yet the OutputFlusher will continuously notify data 
> availability and wake up the netty thread, though no data will be returned 
> by the pollBuffer method.
> For some of our production jobs, this will incur 20% to 40% CPU overhead 
> compared to the old implementation. We tried to fix the problem by checking 

[GitHub] [flink] flinkbot edited a comment on issue #9749: [FLINK-14115][docs-zh] Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9749: [FLINK-14115][docs-zh] Translate 
DataStream Code Walkthrough to Chinese
URL: https://github.com/apache/flink/pull/9749#issuecomment-534149758
 
 
   
   ## CI report:
   
   * 141eb6db531af4b12a9506489f280f180c02a408 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128770115)
   




[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536515)
   * 1a81936bf11ca32e564f00e2deb7391126fce03e : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor 
StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522602263
 
 
   
   ## CI report:
   
   * 5d079442de5815edf896b85fa7aa3ca599975bb0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123734022)
   * 725be7c2226608382c67d5b3d372886be737fff6 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/124870650)
   * eeb6f54f2316e5a6ef8f1cbed4f002c3ae37107e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/12569)
   * e8d22a8c88d9c4a3d10fe989761c54e2d56ba3bf : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/125557289)
   * 9b8109a8586bcf8095eba8d24c6cd0dbb21d7810 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125559250)
   * ac519615c6a2e28bb6544d2dce2dee43a03f3928 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125831290)
   * ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127164915)
   * b7261f4a827d037e6a0bf825b342688b98effb98 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128774017)
   




[jira] [Created] (FLINK-14173) ANSI-style JOIN with Temporal Table Function fails

2019-09-23 Thread Jira
Benoît Paris created FLINK-14173:


 Summary: ANSI-style JOIN with Temporal Table Function fails
 Key: FLINK-14173
 URL: https://issues.apache.org/jira/browse/FLINK-14173
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.9.0
 Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached)
Reporter: Benoît Paris
 Attachments: flink-test-temporal-tables-1.9.zip

The planner fails to generate a plan for ANSI-style joins with Temporal Table 
Functions. The Blink planner throws a "Missing conversion is 
LogicalTableFunctionScan[convention: NONE -> LOGICAL]" error (along with some 
very fancy Graphviz output). The old planner fails with "This exception 
indicates that the query uses an unsupported SQL feature."

This fails:
{code:java}
 SELECT 
   o_amount * r_amount AS amount 
 FROM Orders 
 JOIN LATERAL TABLE (Rates(o_proctime)) 
   ON r_currency = o_currency {code}
This works:
{code:java}
 SELECT 
   o_amount * r_amount AS amount 
 FROM Orders 
, LATERAL TABLE (Rates(o_proctime)) 
 WHERE r_currency = o_currency{code}
Reproduction with the attached Java and pom.xml files. Also included: stack 
traces for both Blink and the old planner.

I think this is a regression. I remember using ANSI-style joins with a temporal 
table function successfully in 1.8.





[jira] [Updated] (FLINK-14115) Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14115:
---
Labels: pull-request-available  (was: )

> Translate DataStream Code Walkthrough to Chinese
> 
>
> Key: FLINK-14115
> URL: https://issues.apache.org/jira/browse/FLINK-14115
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.10.0
>Reporter: Fabian Hueske
>Assignee: gaofeilong
>Priority: Major
>  Labels: pull-request-available
>
> The new DataStream Code Walkthrough should be translated to Chinese:
> https://github.com/apache/flink/blob/master/docs/getting-started/walkthroughs/datastream_api.zh.md





[jira] [Commented] (FLINK-14115) Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread gaofeilong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935942#comment-16935942
 ] 

gaofeilong commented on FLINK-14115:


Hi [~fhueske], I have finished this work and opened a PR. Please take a look, thanks.

> Translate DataStream Code Walkthrough to Chinese
> 
>
> Key: FLINK-14115
> URL: https://issues.apache.org/jira/browse/FLINK-14115
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.10.0
>Reporter: Fabian Hueske
>Assignee: gaofeilong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new DataStream Code Walkthrough should be translated to Chinese:
> https://github.com/apache/flink/blob/master/docs/getting-started/walkthroughs/datastream_api.zh.md





[jira] [Comment Edited] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-09-23 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935937#comment-16935937
 ] 

David Anderson edited comment on FLINK-12576 at 9/23/19 3:07 PM:
-

To reproduce,


git clone --branch backpressure-with-2-TMs 
https://github.com/alpinegizmo/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

You will find a job with these 5 operators

(1) kafka -> (2) timestamps / watermarks -> (3) keyBy + backpressure map -> (4) 
keyBy + window -> (5) kafka

where #3, the backpressure map, causes severe backpressure every other minute. 
The job is running with parallelism of 2 throughout; up until the first keyBy 
all the traffic is on the subtasks with 0 as their index. 

In this backpressure-with-2-TMs branch there are two TMs each with one slot. 
You will observe that all of the output metrics for the 0-index watermarking 
subtask rise to 1 during the even-numbered minutes, and fall to 0 during the 
odd numbered minutes, as expected. 

If I run this with one TM with 2 slots, all of the input metrics for the 
backpressure operator are always zero. 

To confirm that the metrics do work in the non-local case, I created this
backpressure-with-2-TMs branch where there are 2 single-slot TMs. In this case 
the input metrics for subtask 0 of the backpressure operator are always 0, but 
the input metrics for subtask 1 of that operator rise and fall every minute, as 
they should. Since subtask 0 is handling 2x as many records as subtask 1, I 
conclude that the local input metrics are still broken.







> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
> when looking for causes of back pressure (If task is back pressuring whole 
> Flink job, but there is a data skew and only local input channels are being 
> used).





[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327175478
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   this.schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   this.executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateSchedulerNgOnInternalTaskFailuresListener(this, 
getJobGraph().getJobID()));
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState 
taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+   final Throwable error = 
taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID 
executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = 
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   
cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), 
getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction 
restartTasksOrHandleError(final Set 
executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set verticesToRestart 
= 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   
schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
 
 Review 

[jira] [Comment Edited] (FLINK-14118) Reduce the unnecessary flushing when there is no data available for flush

2019-09-23 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935947#comment-16935947
 ] 

Yingjie Cao edited comment on FLINK-14118 at 9/23/19 3:17 PM:
--

[~pnowojski] You are right. High data skew can reproduce the problem. My test 
case uses 1000 output channels and a 1ms flushing interval, and writes most of 
the data to channel 0. The throughput results are as follows.

Before the fix:

Benchmark                                                            (channelsFlushTimeout)  Mode   Cnt  Score      Error       Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   18240.197  ± 1892.419  ops/ms

After the fix:

Benchmark                                                            (channelsFlushTimeout)  Mode   Cnt  Score      Error       Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   24532.313  ± 1118.312  ops/ms

Some other cases were also tested (all with a 1ms flushing interval) and no 
evident performance difference was observed. The test results are as follows.

Before the fix:

Benchmark                                                    (channelsFlushTimeout)  Mode   Cnt  Score      Error      Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   23032.384  ± 871.883  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt  30   1923.863   ± 78.518   ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt  30   3377.401   ± 216.982  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt  30   6091.213   ± 92.658   ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt  30   9107.194   ± 211.169  ops/ms

After the fix:

Benchmark                                                    (channelsFlushTimeout)  Mode   Cnt  Score      Error      Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   23985.588  ± 990.037  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt  30   2011.356   ± 40.347   ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt  30   3440.238   ± 211.906  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt  30   6118.888   ± 94.517   ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt  30   9120.144   ± 252.023  ops/ms

The extra synchronization point does not introduce any regression to the above 
test cases. I guess the reason is that the synchronization point sits inside the 
synchronization block, which already requires a memory barrier anyway.

Moving the output flushing logic to the mailbox is a good choice, though just 
like what you have mentioned, the main concern is how to efficiently implement 
the "flushAll" mailbox action.

I wonder if the mailbox facility will be introduced to versions 1.8 and 1.9. If 
not, I would suggest picking the fix to versions 1.8 and 1.9.
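A minimal, self-contained sketch of the idea behind the fix (not Flink's actual 
RecordWriter/flusher code; the class and method names are invented for 
illustration): the flush is skipped when no data has been written since the last 
one, and the availability check lives inside the already-synchronized block, so 
it adds no memory barrier beyond the one the monitor already implies.

```java
public class SkippingFlusher {
    private boolean dataAvailable; // guarded by the monitor of "this"
    private int flushCount;

    public synchronized void write() {
        dataAvailable = true; // new data arrived since the last flush
    }

    public synchronized void maybeFlush() {
        // The check sits inside the synchronized block, piggybacking on the
        // memory barrier the monitor already provides.
        if (!dataAvailable) {
            return; // nothing new to flush: skip the downstream notification
        }
        dataAvailable = false;
        flushCount++; // stands in for notifying the data-availability listener
    }

    public synchronized int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        SkippingFlusher flusher = new SkippingFlusher();
        flusher.maybeFlush();                        // skipped: no data yet
        flusher.write();
        flusher.maybeFlush();                        // flushes once
        flusher.maybeFlush();                        // skipped: nothing new
        System.out.println(flusher.getFlushCount()); // prints "1"
    }
}
```

With a periodic flusher thread calling maybeFlush() on every tick, only ticks 
that follow an actual write do any work, which is what removes the redundant 
flush traffic measured above.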




[GitHub] [flink] flinkbot edited a comment on issue #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#issuecomment-524181331
 
 
   
   ## CI report:
   
   * 512499aae1ab33c5ae96c8a2100016e40d83b654 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/124321949)
   * d6648c7071fc4c17faf8c7514f303e7b3081b5dc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128705829)
   * 231db4c305a95c20701c9ad1b04d422ce0eda939 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128777224)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-14094) Fix OperatorIOMetricGroup repeat register problem

2019-09-23 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-14094.
---
Resolution: Fixed

Fixed via 627e7cb1f265e8cf8e953c17ee492ad957539f2f

> Fix OperatorIOMetricGroup repeat register problem
> -
>
> Key: FLINK-14094
> URL: https://issues.apache.org/jira/browse/FLINK-14094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0, 1.10.0
>Reporter: xymaqingxiang
>Assignee: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There will be OperatorIOMetricGroup duplicate registration in the 
> TaskMetricGroup's getOrAddOperator() method.





[GitHub] [flink] tillrohrmann closed pull request #9697: [FLINK-14094] [runtime] [metric] Fix OperatorIOMetricGroup repeat register problem

2019-09-23 Thread GitBox
tillrohrmann closed pull request #9697: [FLINK-14094] [runtime] [metric] Fix 
OperatorIOMetricGroup repeat register problem
URL: https://github.com/apache/flink/pull/9697
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement 
DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381
 
 
   
   ## CI report:
   
   * 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/126616886)
   * 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126619729)
   * 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127150468)
   * 09b61ef86b135960b2d21c6cf8d5f510684137ad : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127152805)
   * 9441f984cf179d8dc9212ffc59aea4b5ef922350 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127155093)
   * 52720e056437080ecc2906a61d59283b709f61a5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127195882)
   * 8dfe15d8ea104f097077c7aedeae2ea5f49aae60 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127198668)
   * 53b62a899ea4b0f71012a780f674dcd04191ee0d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127217387)
   * a66e3ae1354674d1ebb5987139a501454044e9d8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127800121)
   * 509f2521ae10af3f1b46ec543ce3efd6dee79c5d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127806151)
   * d562afbcf67a6556ee2f33c5c1e5fb196bc62702 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128183983)
   * b6f4111fd191e7706798d118d97ada80df343276 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128728786)
   * 45d0d972b5ac6de24f1c532df06ffe5924d92a2d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128733497)
   * 64de89aa017c370177bb9417bb2817f0864932ac : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128751065)
   




[jira] [Updated] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-09-23 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-12576:
---
Affects Version/s: 1.9.0

> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
> when looking for causes of back pressure (If task is back pressuring whole 
> Flink job, but there is a data skew and only local input channels are being 
> used).





[jira] [Comment Edited] (FLINK-14118) Reduce the unnecessary flushing when there is no data available for flush

2019-09-23 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935947#comment-16935947
 ] 

Yingjie Cao edited comment on FLINK-14118 at 9/23/19 3:13 PM:
--

[~pnowojski] You are right. High data skew can reproduce the problem. My test 
case uses 1000 output channels and a 1ms flushing interval, and writes most of 
the data to channel 0. The throughput results are as follows.

Before the fix:

Benchmark                                                            (channelsFlushTimeout)  Mode   Cnt  Score      Error       Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   18240.197  ± 1892.419  ops/ms

After the fix:

Benchmark                                                            (channelsFlushTimeout)  Mode   Cnt  Score      Error       Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   24532.313  ± 1118.312  ops/ms

Some other cases were also tested (all with a 1ms flushing interval) and no 
evident performance difference was observed. The test results are as follows.

Before the fix:

Benchmark                                                    (channelsFlushTimeout)  Mode   Cnt  Score      Error      Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   23032.384  ± 871.883  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt  30   1923.863   ± 78.518   ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt  30   3377.401   ± 216.982  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt  30   6091.213   ± 92.658   ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt  30   9107.194   ± 211.169  ops/ms

After the fix:

Benchmark                                                    (channelsFlushTimeout)  Mode   Cnt  Score      Error      Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt  30   23985.588  ± 990.037  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt  30   2011.356   ± 40.347   ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt  30   3440.238   ± 211.906  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt  30   6118.888   ± 94.517   ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt  30   9120.144   ± 252.023  ops/ms

The extra synchronization point does not introduce any regression to the above 
test cases. I guess the reason is that the synchronization point sits inside the 
synchronization block, which already requires a memory barrier anyway.

Moving the output flushing logic to the mailbox is a good choice, though just 
like what you have mentioned, the main concern is how to efficiently implement 
the "flushAll" mailbox action.

I wonder if the mailbox facility will be introduced to versions 1.8 and 1.9. If 
not, I would suggest picking the fix to versions 1.8 and 1.9.



[GitHub] [flink] AlecCh0402 commented on a change in pull request #9741: [FLINK-14119][table] Clean idle state for RetractableTopNFunction

2019-09-23 Thread GitBox
AlecCh0402 commented on a change in pull request #9741: [FLINK-14119][table] 
Clean idle state for RetractableTopNFunction
URL: https://github.com/apache/flink/pull/9741#discussion_r327182330
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
 ##
 @@ -219,4 +219,38 @@ public void testDisableGenerateRetraction() throws 
Exception {
.assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
}
 
+   @Test
+   public void testCleanIdleState() throws Exception {
+   AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new ConstantRankRange(1, 2), true,
+   true);
+   OneInputStreamOperatorTestHarness testHarness 
= createTestHarness(func);
+   testHarness.open();
+
+   testHarness.processElement(record("book", 1L, 12));
+   testHarness.processElement(retractRecord("book", 1L, 12));
+   testHarness.processElement(record("fruit", 4L, 33));
+   testHarness.processElement(record("book", 2L, 19));
+   testHarness.processElement(record("fruit", 3L, 22));
+   testHarness.processElement(record("fruit", 5L, 43));
+
+   // cleanup state explicitly
+   func.onTimer(System.currentTimeMillis(), null, null);
 
 Review comment:
   Hi Jark, thanks for the kind and thorough review. I've updated the commit. 
`testHarness.setProcessingTime` is indeed the right way to trigger the timer. It 
may sound a little odd, but to explain my original thinking: it was based on the 
assumption that the timer is always triggered correctly, so the only thing to 
test is that when `onTimer` is called directly, the state is cleaned anyway. And 
since the test itself is based on a keyed stream, the cleanup would only happen 
on the states related to the latest processed partition key. Thanks anyway for 
the review :)
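A toy illustration of the distinction being discussed (plain Java, not Flink's 
test-harness API; the class, fields, and method names are invented): each key 
registers its own cleanup timer, and advancing the processing-time clock fires 
every timer that is due, whereas invoking a single `onTimer` by hand would only 
touch one key's state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimerDemo {
    static final long TTL = 10;
    static final Map<String, Integer> state = new HashMap<>();
    static final TreeMap<Long, List<String>> timers = new TreeMap<>();

    // Processing an element updates the key's state and registers a cleanup timer.
    static void process(String key, int value, long now) {
        state.put(key, value);
        timers.computeIfAbsent(now + TTL, t -> new ArrayList<>()).add(key);
    }

    // Advancing processing time fires *all* timers that are due, similar in
    // spirit to testHarness.setProcessingTime, cleaning up every expired key.
    static void setProcessingTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (String key : timers.pollFirstEntry().getValue()) {
                state.remove(key); // what onTimer would do for this one key
            }
        }
    }

    public static void main(String[] args) {
        process("book", 1, 0);   // cleanup timer due at t=10
        process("fruit", 2, 5);  // cleanup timer due at t=15
        setProcessingTime(12);   // fires "book"'s timer only
        System.out.println(state.keySet()); // prints "[fruit]"
    }
}
```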




[GitHub] [flink] Johnlon commented on a change in pull request #4663: [FLINK-6549] [DataStream API] Improve error message for type mismatches with side outputs

2019-09-23 Thread GitBox
Johnlon commented on a change in pull request #4663: [FLINK-6549] [DataStream 
API] Improve error message for type mismatches with side outputs
URL: https://github.com/apache/flink/pull/4663#discussion_r327226943
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -526,6 +526,16 @@ public void collect(StreamRecord record) {
StreamRecord copy = 
castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
+   } catch (ClassCastException e) {
+   // Enrich error message
+   ClassCastException replace = new 
ClassCastException(
+   String.format("%s. Failed to 
push OutputTag with id '%s' to operator. " +
+   "This can occur 
when multiple OutputTags with different types " +
+   "but identical 
names are being used.",
+   e.getMessage(), 
outputTag.getId()));
+
+   throw new 
ExceptionInChainedOperatorException(replace);
 
 Review comment:
   Swallows the original exception which is never a good thing for someone 
trying to diagnose a problem like this.




[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536515)
   * 1a81936bf11ca32e564f00e2deb7391126fce03e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128774076)
   




[jira] [Created] (FLINK-14174) Don't swallow exception when rethrowing type mismatches with side outputs

2019-09-23 Thread John Lonergan (Jira)
John Lonergan created FLINK-14174:
-

 Summary: Don't swallow exception when rethrowing type mismatches 
with side outputs
 Key: FLINK-14174
 URL: https://issues.apache.org/jira/browse/FLINK-14174
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.9.0, 1.8.1
Reporter: John Lonergan


The change made by https://github.com/apache/flink/pull/4663/files swallows the 
original exception.

Whilst I am in favour of adding additional helpful tips (which was the purpose 
of FLINK-4663) I don't agree with throwing away or masking causal exceptions.

IMHO the correct approach is to add the helpful hint as the first arg to "new 
ExceptionInChainedOperatorException(msg, ex)" and pass the original class cast 
ex as the cause.

I.e. change this: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L672
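A hedged sketch of the approach proposed above. The nested class below is a 
stand-in for Flink's `ExceptionInChainedOperatorException` (an assumption for 
illustration: only that a (String, Throwable) constructor is available, as with 
most wrapper exceptions); the point is that the helpful hint goes into the 
message while the original `ClassCastException` is preserved as the cause.

```java
public class CauseDemo {
    // Hypothetical stand-in for ExceptionInChainedOperatorException.
    static class ChainedOperatorException extends RuntimeException {
        ChainedOperatorException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static boolean causeIsPreserved() {
        try {
            Object value = "not an Integer";
            Integer unused = (Integer) value; // fails at runtime
            return false;
        } catch (ClassCastException e) {
            // Enrich the message with the hint, but keep the original
            // exception as the cause instead of discarding it.
            ChainedOperatorException enriched = new ChainedOperatorException(
                e.getMessage() + ". This can occur when multiple OutputTags with "
                    + "different types but identical names are being used.",
                e);
            return enriched.getCause() == e;
        }
    }

    public static void main(String[] args) {
        System.out.println(causeIsPreserved()); // prints "true"
    }
}
```

With the cause attached, the full stack trace of the original failure survives 
in the log, which is exactly what a reader diagnosing the type mismatch needs.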






[jira] [Created] (FLINK-14172) Implement KubeClient with official Java client library for kubernetes

2019-09-23 Thread Yang Wang (Jira)
Yang Wang created FLINK-14172:
-

 Summary: Implement KubeClient with official Java client library 
for kubernetes
 Key: FLINK-14172
 URL: https://issues.apache.org/jira/browse/FLINK-14172
 Project: Flink
  Issue Type: New Feature
Reporter: Yang Wang


The official Java client library for Kubernetes is becoming more and more 
active. The new features (such as leader election) and some client 
implementations (informer, lister, cache) are better. So we should use the 
official Java client for Kubernetes in Flink.

https://github.com/kubernetes-client/java





[GitHub] [flink] flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state for RetractableTopNFunction

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9741: [FLINK-14119][table] Clean idle state 
for RetractableTopNFunction
URL: https://github.com/apache/flink/pull/9741#issuecomment-533791202
 
 
   
   ## CI report:
   
   * 511ae6ef82ec45f34e0270c4c534d64de2856107 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128604445)
   * c2189a7898832df5876f1fef1b7e58fb04b0257c : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128766418)
   * 4fbfe8d806605b3a31c7059b72e8ebe62b0f07e4 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure CheckpointException can always be deserialized on JobManager

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure 
CheckpointException can always be deserialized on JobManager
URL: https://github.com/apache/flink/pull/9742#issuecomment-533957111
 
 
   
   ## CI report:
   
   * b1c025876fff8d6c7807e55613aabefef891ed59 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687104)
   * 0b7c90578d0745548eba38593d4f320a7f33669e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687968)
   * 2def508db5b57c2a387e395ccf605a2af4c886d1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128688964)
   * 2844e47e44e5ab556c3848bf1156a0d02032ac09 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128695177)
   * 4e31d0ee9093b5e69934e6035d81392a878dbe2a : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128758376)
   




[GitHub] [flink] flinkbot edited a comment on issue #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#issuecomment-524181331
 
 
   
   ## CI report:
   
   * 512499aae1ab33c5ae96c8a2100016e40d83b654 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/124321949)
   * d6648c7071fc4c17faf8c7514f303e7b3081b5dc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128705829)
   * 231db4c305a95c20701c9ad1b04d422ce0eda939 : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor 
StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522602263
 
 
   
   ## CI report:
   
   * 5d079442de5815edf896b85fa7aa3ca599975bb0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123734022)
   * 725be7c2226608382c67d5b3d372886be737fff6 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/124870650)
   * eeb6f54f2316e5a6ef8f1cbed4f002c3ae37107e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/12569)
   * e8d22a8c88d9c4a3d10fe989761c54e2d56ba3bf : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/125557289)
   * 9b8109a8586bcf8095eba8d24c6cd0dbb21d7810 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125559250)
   * ac519615c6a2e28bb6544d2dce2dee43a03f3928 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125831290)
   * ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127164915)
   * b7261f4a827d037e6a0bf825b342688b98effb98 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128774017)
   




[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536515)
   * 1a81936bf11ca32e564f00e2deb7391126fce03e : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128774076)
   




[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327145413
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable 
cause, ExecutionAttemptID fai
 * @param t The exception that caused the failure.
 */
public void failGlobal(Throwable t) {
+   if (!isLegacyScheduling()) {
+   ExceptionUtils.rethrow(t);
 
 Review comment:
   > Maybe we need to implement a failGlobal mechanism that works for 
DefaultScheduler.
   
   I think that's reasonable and it should be straightforward to do this. 
   
   Moreover, I think we should differentiate calls to `failGlobal()`. The 
original contract for `failGlobal()` was that it is called when [_"the 
consistency of the execution graph cannot be guaranteed 
anymore"_](https://github.com/apache/flink/blob/8155d465520c4b616866d35395c3b10f7e809b78/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1124).
 Clearly this does not hold anymore. One example that you have already 
mentioned is the 
[`CheckpointFailureManager`](https://github.com/apache/flink/blob/8155d465520c4b616866d35395c3b10f7e809b78/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L582)
 – here calls to `failGlobal()` do not indicate consistency issues. For real 
consistency issues caused by bugs, illegal state transitions, etc. I think it 
is reasonable to terminate the JVM. One could propagate the exception to the 
JobMaster and in the RPC framework check for unwanted uncaught exceptions in 
specially annotated RPC methods.
   
   




[GitHub] [flink] lamber-ken commented on issue #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
lamber-ken commented on issue #9158: [FLINK-10052][coordination] Tolerate 
temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158#issuecomment-534131656
 
 
   > I'd prefer to split the version bump and the functional change into two 
PRs @lamber-ken.
   
   Hi, Till. If so, I'll create 
   
   
   




[GitHub] [flink] lamber-ken closed pull request #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
lamber-ken closed pull request #9158: [FLINK-10052][coordination] Tolerate 
temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158
 
 
   




[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327163946
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+   this.slotRequestTimeout = slotRequestTimeout;
+   this.slotProvider = slotProvider;
+   this.delayExecutor = delayExecutor;
+   this.userCodeLoader = userCodeLoader;
+   this.schedulingStrategyFactory = 
checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = 
checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = executionVertexVersioner;
+   this.conditionalFutureHandlerFactory = new 
ConditionalFutureHandlerFactory(executionVertexVersioner);
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateTaskExecutionStateInDefaultSchedulerListener(this, 
getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional&lt;ExecutionVertexID&gt; executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
 
 Review comment:
   @zhuzhurk
   
   I gave this some thought. 
   
   > when we invoke maybeHandleTaskFailure right after invoking 
schedulingStrategy.onExecutionStateChange, the task state may even have changed 
in the call stack chain so that we are doing failover handling in an unexpected 
state
   
   If we added `getMainThreadExecutor().execute()` to 
`allocateSlotsAndDeploy()`, I think it wouldn't solve this issue. We still 
might re-deploy tasks that were already re-deployed. 
   
   I think there is a deeper underlying problem as to why these lines look 
confusing:
   ```
   schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
   ```
   
   The `SchedulingStrategy` gets informed about task failures but is not 
supposed to re-deploy or reason about failed tasks because the failure handling 
is done centrally (e.g. inside `flip1.FailoverStrategy`). 
   
   At the moment, I wouldn't change anything. 




[jira] [Comment Edited] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-09-23 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935937#comment-16935937
 ] 

David Anderson edited comment on FLINK-12576 at 9/23/19 2:58 PM:
-

To reproduce,


git clone --branch backpressure-with-2-TMs 
https://github.com/alpinegizmo/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

You will find a job with these 5 operators

(1) kafka -> (2) timestamps / watermarks -> (3) keyBy + backpressure map -> (4) 
keyBy + window -> (5) kafka

where #3, the backpressure map, causes severe backpressure every other minute. 
The job is running with parallelism of 2 throughout; up until the first keyBy 
all the traffic is on the subtasks with 0 as their index. 

In this backpressure-with-2-TMs branch there are two TMs each with one slot. 
You will observe that all of the output metrics for the 0-index watermarking 
subtask rise to 1 during the even-numbered minutes, and fall to 0 during the 
odd numbered minutes, as expected. 

If I run this with one TM with 2 slots, all of the input metrics for the 
backpressure operator are always zero. 

In this backpressure-with-2-TMs branch there are 2 single-slot TMs, and here 
the input metrics for subtask 0 of the backpressure operator are always 0, but 
the input metrics for subtask 1 of that operator rise and fall every minute, as 
they should. Since subtask 0 is handling 2x as many records as subtask 1, I 
conclude that the local input metrics are still broken.




was (Author: alpinegizmo):
To reproduce,


git clone --branch backpressure-with-2-TMs 
https://github.com/alpinegizmo/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

You will find a job with these 5 operators

(1) kafka -> (2) timestamps / watermarks -> (3) keyBy + backpressure map -> (4) 
keyBy + window -> (5) kafka

where #3, the backpressure map, causes severe backpressure every other minute. 
The job is running with parallelism of 2 throughout; up until the first keyBy 
all the traffic is on the subtasks with 0 as their index. 

In this backpressure-with-2-TMs branch there are two TMs each with one slot. 
You will observe that all of the output metrics for the 0-index watermarking 
subtask rise to 1 during the even-numbered minutes, and fall to 0 during the 
odd numbered minutes, as expected. 

If I run this with one TM with 2 slots, all of the input metrics for the 
backpressure operator are always zero. 

In this backpressure-with-2-TMs branch there are 2 single-slot TMs, and here 
the input metrics for subtask 0 of the backpressure operator are always 0, but 
the input metrics for subtask 1 of that operator rise and fall every minute, as 
they should. Thus my conclusion that the local input metrics are still broken.



> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Piotr Nowojski
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
> when looking for causes of back pressure (If task is back pressuring whole 
> Flink job, but there is a data skew and only local input channels are being 
> used).





[GitHub] [flink] lamber-ken edited a comment on issue #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
lamber-ken edited a comment on issue #9158: [FLINK-10052][coordination] 
Tolerate temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158#issuecomment-534131656
 
 
   > I'd prefer to split the version bump and the functional change into two 
PRs @lamber-ken.
   
   Hi, Till. If so, I'll create two new PRs.
   
   




[GitHub] [flink] lamber-ken opened a new pull request #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
lamber-ken opened a new pull request #9158: [FLINK-10052][coordination] 
Tolerate temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158
 
 
   ## What is the purpose of the change
   ### Desc
   The `ZooKeeperLeaderElectionService` uses the LeaderLatch Curator recipe for 
leader election. The leader latch revokes leadership in case of a suspended 
ZooKeeper connection. This can be premature in case that the system can 
reconnect to ZooKeeper before its session expires. The effect of the lost 
leadership is that all jobs will be canceled and directly restarted after 
regaining the leadership. As more and more tasks are deployed in the cluster, the 
situation gets progressively worse.
   
   ### Aims
   - Fix the impact of temporary ZooKeeper network disconnects on long-running 
Flink jobs.
   - Improve the stability of long-running Flink jobs.
   
   ### Detail
   Click here for more jira detail 
[FLINK-10052](https://issues.apache.org/jira/browse/FLINK-10052).
   
   
   
   ## Brief change log
   
 - upgrade the curator-recipes dependency to 4.2.0
 - use `SessionConnectionStateErrorPolicy` as the connection-state error policy 
when building the `CuratorFramework`
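For context, the behavioral difference between Curator's standard policy and `SessionConnectionStateErrorPolicy` can be sketched roughly as follows (the enum and classes below are simplified stand-ins for illustration, not Curator's actual implementation):

```java
// Hypothetical stand-ins illustrating the difference between Curator's
// StandardConnectionStateErrorPolicy (treats SUSPENDED as an error, so the
// leader latch revokes leadership immediately) and
// SessionConnectionStateErrorPolicy (only session loss is an error, so a
// temporary suspension is tolerated until the session actually expires).
class ConnectionPolicies {
    enum ConnectionState { CONNECTED, SUSPENDED, LOST }

    interface ConnectionStateErrorPolicy {
        boolean isErrorState(ConnectionState state);
    }

    // Revokes leadership as soon as the connection is merely suspended.
    static class StandardPolicy implements ConnectionStateErrorPolicy {
        public boolean isErrorState(ConnectionState state) {
            return state == ConnectionState.SUSPENDED || state == ConnectionState.LOST;
        }
    }

    // Tolerates a temporary suspension; only session loss is an error.
    static class SessionPolicy implements ConnectionStateErrorPolicy {
        public boolean isErrorState(ConnectionState state) {
            return state == ConnectionState.LOST;
        }
    }

    public static void main(String[] args) {
        ConnectionStateErrorPolicy standard = new StandardPolicy();
        ConnectionStateErrorPolicy session = new SessionPolicy();
        System.out.println(standard.isErrorState(ConnectionState.SUSPENDED)); // prints "true"
        System.out.println(session.isErrorState(ConnectionState.SUSPENDED));  // prints "false"
    }
}
```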
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*ZooKeeperLeaderElectionITCase*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   




[GitHub] [flink] lamber-ken edited a comment on issue #9158: [FLINK-10052][coordination] Tolerate temporarily suspended ZooKeeper connections

2019-09-23 Thread GitBox
lamber-ken edited a comment on issue #9158: [FLINK-10052][coordination] 
Tolerate temporarily suspended ZooKeeper connections
URL: https://github.com/apache/flink/pull/9158#issuecomment-534131656
 
 
   > I'd prefer to split the version bump and the functional change into two 
PRs @lamber-ken.
   
   Hi, Till. If so, I'll create a new PR that bumps the Curator version.
   
   




[GitHub] [flink] flinkbot commented on issue #9749: [FLINK-14115][docs-zh] Translate DataStream Code Walkthrough to Chinese

2019-09-23 Thread GitBox
flinkbot commented on issue #9749: [FLINK-14115][docs-zh] Translate DataStream 
Code Walkthrough to Chinese
URL: https://github.com/apache/flink/pull/9749#issuecomment-534149758
 
 
   
   ## CI report:
   
   * 141eb6db531af4b12a9506489f280f180c02a408 : UNKNOWN
   




[jira] [Created] (FLINK-14166) Reuse cache from previous history server run

2019-09-23 Thread David Moravek (Jira)
David Moravek created FLINK-14166:
-

 Summary: Reuse cache from previous history server run
 Key: FLINK-14166
 URL: https://issues.apache.org/jira/browse/FLINK-14166
 Project: Flink
  Issue Type: Improvement
Reporter: David Moravek


Currently the history server is not able to reuse the cache from a previous run, 
even when `historyserver.web.tmpdir` is set. It could simply "warm up" the set of 
cached job ids from previously parsed jobs.

https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java#L129

This should be configurable, so it does not break backward compatibility.
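A rough sketch of what such a warm-up could look like (illustrative only; `CacheWarmup` and its method are hypothetical names, not Flink code): seed the cached job ids from the job directories left behind in `historyserver.web.tmpdir` by a previous run:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

// Sketch of the proposed "warm up" step (hypothetical, not Flink's code):
// seed the cached-job-ids set from job directories that a previous history
// server run left behind in the configured tmp directory.
class CacheWarmup {

    static Set<String> warmUpCachedJobIds(Path webJobDir) throws IOException {
        Set<String> cachedJobIds = new HashSet<>();
        if (Files.isDirectory(webJobDir)) {
            try (Stream<Path> entries = Files.list(webJobDir)) {
                // Each subdirectory name corresponds to a previously fetched job id.
                entries.filter(Files::isDirectory)
                       .forEach(p -> cachedJobIds.add(p.getFileName().toString()));
            }
        }
        return cachedJobIds;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("hs-jobs");
        Files.createDirectory(tmp.resolve("jobid-1"));
        Files.createDirectory(tmp.resolve("jobid-2"));
        System.out.println(warmUpCachedJobIds(tmp).size()); // prints "2"
    }
}
```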





[GitHub] [flink] sunhaibotb edited a comment on issue #9744: [FLINK-13515][test] Fix ClassLoaderITCase fails on Java 11

2019-09-23 Thread GitBox
sunhaibotb edited a comment on issue #9744: [FLINK-13515][test] Fix 
ClassLoaderITCase fails on Java 11
URL: https://github.com/apache/flink/pull/9744#issuecomment-533994120
 
 
   Can you review this PR for me @tillrohrmann @zentol ?  Thanks. 




[GitHub] [flink] zhuzhurk opened a new pull request #9745: [FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime

2019-09-23 Thread GitBox
zhuzhurk opened a new pull request #9745: [FLINK-14070] [runtime] Use TimeUtils 
to parse duration configs in flink-runtime
URL: https://github.com/apache/flink/pull/9745
 
 
   ## What is the purpose of the change
   
   *TimeUtils can now parse all duration configs supported by Scala's 
FiniteDuration,
   and we'd like to use it to replace Scala's Duration for duration config 
parsing.*
   *This is one step towards making flink-runtime Scala-free.*
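For illustration, a drastically simplified parser in the spirit of `TimeUtils#parseDuration` (a hypothetical sketch, not Flink's actual implementation, which supports many more units):

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of duration-string parsing in the style of
// TimeUtils#parseDuration (hypothetical, not Flink's actual code).
class DurationParsing {

    private static final Pattern FORMAT = Pattern.compile("^(\\d+)\\s*([a-z]*)$");

    static Duration parseDuration(String text) {
        Matcher m = FORMAT.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid duration: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "":    // no unit defaults to milliseconds
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            case "h":
                return Duration.ofHours(value);
            default:
                throw new IllegalArgumentException("Unknown unit: " + m.group(2));
        }
    }

    public static void main(String[] args) {
        System.out.println(parseDuration("10 s").toMillis());  // prints "10000"
        System.out.println(parseDuration("300ms").toMillis()); // prints "300"
    }
}
```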
   
   
   ## Brief change log
   
 - *The duration configs in flink-runtime are now parsed using 
TimeUtils#parseDuration*
   
   ## Verifying this change
   
   This change is already covered by existing tests of changed components.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[GitHub] [flink] flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces 
PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#issuecomment-532703153
 
 
   
   ## CI report:
   
   * 4709beb04e69d94956ce9a823553efd67fefb29a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128184048)
   * 44134abf042fa3c3b8616cf9247bcf4263d78eea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128606953)
   * 136d2a82e8c4de371a8786b17b9207b804427beb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128690948)
   * d5b73b9f994ed4ce9edb05a0cacecd2a64de1463 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128696978)
   * a81141edfc4ffaf0bf2b88515356de44902481fc : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128703229)
   




[jira] [Created] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-09-23 Thread Bhagavan (Jira)
Bhagavan created FLINK-14170:


 Summary: Support hadoop < 2.7 with 
StreamingFileSink.BulkFormatBuilder
 Key: FLINK-14170
 URL: https://issues.apache.org/jira/browse/FLINK-14170
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet
Affects Versions: 1.9.0, 1.8.2, 1.8.1, 1.8.0
Reporter: Bhagavan


Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective of 
the row/bulk format builder. This restriction exists because truncate is not 
supported in Hadoop < 2.7.

However, BulkFormatBuilder does not use the truncate method to restore the file, so 
restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is not necessary.

So the requested improvement is to remove the precondition in 
HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used with 
Hadoop 2.6 (most enterprises are still on CDH 5.x).




[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327055566
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+class DeploymentHandle {
+
+   private final ExecutionVertexVersion requiredVertexVersion;
+
+   private final ExecutionVertexDeploymentOption 
executionVertexDeploymentOption;
+
+   private final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment;
+
+   public DeploymentHandle(
+   final ExecutionVertexVersion requiredVertexVersion,
 
 Review comment:
   See https://github.com/apache/flink/pull/9663/files#r326598494




[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327057723
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 

[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327057591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+   public static ExecutionVertexSchedulingRequirements from(final 
ExecutionVertex executionVertex) {
+
+   final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(
 
 Review comment:
   Fixed here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
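The review above concerns `ExecutionVertexSchedulingRequirementsMapper#from`, a static factory that derives an `ExecutionVertexID` from an `ExecutionVertex` and bundles it into a requirements object. A stripped-down sketch of that shape — all type names below are simplified stand-ins for illustration, not Flink's actual classes:

```java
import java.util.Objects;

final class ExecutionVertexSchedulingRequirementsSketch {

    // Simplified stand-in for Flink's ExecutionVertexID: a job vertex id plus a subtask index.
    static final class VertexId {
        final String jobVertexId;
        final int subtaskIndex;

        VertexId(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = Objects.requireNonNull(jobVertexId);
            this.subtaskIndex = subtaskIndex;
        }
    }

    // Simplified stand-in for ExecutionVertexSchedulingRequirements.
    static final class Requirements {
        final VertexId vertexId;

        Requirements(VertexId vertexId) {
            this.vertexId = vertexId;
        }
    }

    // Mirrors the static `from(ExecutionVertex)` factory in the diff:
    // derive an identifier from the vertex and wrap it in a requirements object.
    static Requirements from(String jobVertexId, int subtaskIndex) {
        return new Requirements(new VertexId(jobVertexId, subtaskIndex));
    }

    public static void main(String[] args) {
        Requirements r = from("map-operator", 3);
        System.out.println(r.vertexId.jobVertexId + "/" + r.vertexId.subtaskIndex); // prints map-operator/3
    }
}
```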


[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062898
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -225,46 +178,56 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
numBuffersOut = metrics.getNumBuffersOutCounter();
}
 
-   /**
-* Marks the current {@link BufferBuilder} as finished and clears the 
state for the next one.
-*/
-*/
-   private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (!bufferBuilders[targetChannel].isPresent()) {
-   return;
-   }
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
-   bufferBuilders[targetChannel] = Optional.empty();
+   protected void finishBufferBuilder(BufferBuilder bufferBuilder) {
numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
}
 
+   /**
+* This is used to send regular records.
+*/
+   public abstract void emit(T record) throws IOException, 
InterruptedException;
+
+   /**
+* This is used to send LatencyMarks to a random target channel.
+*/
+   public abstract void randomEmit(T record) throws IOException, 
InterruptedException;
+
+   /**
+* This is used to broadcast streaming Watermarks in-band with records.
+*/
+   public abstract void broadcastEmit(T record) throws IOException, 
InterruptedException;
+
/**
 * The {@link BufferBuilder} may already exist if it was not filled up last 
time; otherwise we need to
 * request a new one for this target channel.
 */
 */
-   private BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   return bufferBuilders[targetChannel].get();
-   } else {
-   return requestNewBufferBuilder(targetChannel);
-   }
-   }
+   abstract BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException;
 
-   private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent() || 
bufferBuilders[targetChannel].get().isFinished());
+   /**
+* Requests a new {@link BufferBuilder} for the target channel and 
returns it.
+*/
+   abstract BufferBuilder requestNewBufferBuilder(int targetChannel) 
throws IOException, InterruptedException;
 
-   BufferBuilder bufferBuilder = 
targetPartition.getBufferBuilder();
-   bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
-   
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 
targetChannel);
-   return bufferBuilder;
-   }
+   /**
+* Marks the current {@link BufferBuilder} as finished if present and 
clears the state for next one.
+*/
+   abstract void tryFinishCurrentBufferBuilder(int targetChannel);
 
-   private void closeBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   bufferBuilders[targetChannel].get().finish();
-   bufferBuilders[targetChannel] = Optional.empty();
-   }
-   }
+   /**
+* Marks the current {@link BufferBuilder} as empty for the target 
channel.
+*/
+   abstract void emptyCurrentBufferBuilder(int targetChannel);
+
+   /**
+* Marks the current {@link BufferBuilder} as finished and empty for 
the target channel.
 
 Review comment:
   suggestion: Marks the current {@link BufferBuilder} as finished and releases 
the resources for the target channel.


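The diff above turns `RecordWriter` into an abstract base class: shared bookkeeping (metrics, channel count) stays in the base, while `emit`, `randomEmit`, `broadcastEmit`, and buffer handling are left to subclasses. A minimal sketch of that template-method split — the classes below use a list per channel instead of real network buffers and are illustrations only, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Abstract base: shared bookkeeping lives here, channel selection is left open.
abstract class SketchRecordWriter<T> {
    protected final int numberOfChannels;
    protected long numRecordsOut; // stands in for the numRecordsOut metric

    protected SketchRecordWriter(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /** Sends a regular record to one channel chosen by the subclass. */
    public abstract void emit(T record);

    /** Broadcasts a record to every channel. */
    public abstract void broadcastEmit(T record);

    protected void countRecord() {
        numRecordsOut++;
    }
}

// Concrete subclass: round-robin selection stands in for a ChannelSelector.
final class RoundRobinWriter<T> extends SketchRecordWriter<T> {
    final List<List<T>> channels = new ArrayList<>();
    private int next;

    RoundRobinWriter(int numberOfChannels) {
        super(numberOfChannels);
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    @Override
    public void emit(T record) {
        channels.get(next).add(record);
        next = (next + 1) % numberOfChannels;
        countRecord();
    }

    @Override
    public void broadcastEmit(T record) {
        // A real broadcast writer would serialize once and share the buffer;
        // here every channel simply receives the same record reference.
        for (List<T> channel : channels) {
            channel.add(record);
        }
        countRecord();
    }

    public static void main(String[] args) {
        RoundRobinWriter<String> writer = new RoundRobinWriter<>(2);
        writer.emit("r1");         // channel 0
        writer.emit("r2");         // channel 1
        writer.broadcastEmit("w"); // every channel
        System.out.println(writer.channels.get(0)); // prints [r1, w]
    }
}
```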


[GitHub] [flink] flinkbot commented on issue #9744: [FLINK-13515][test] Fix ClassLoaderITCase fails on Java 11

2019-09-23 Thread GitBox
flinkbot commented on issue #9744: [FLINK-13515][test] Fix ClassLoaderITCase 
fails on Java 11
URL: https://github.com/apache/flink/pull/9744#issuecomment-533988291
 
 
   
   ## CI report:
   
   * 8210403a33cf3a6908980378fae9ee8d567afdb1 : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9743: [hotfix][coordination] Wait for completion of MiniCluster#terminateExecutors

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9743: [hotfix][coordination] Wait for 
completion of MiniCluster#terminateExecutors
URL: https://github.com/apache/flink/pull/9743#issuecomment-533978420
 
 
   
   ## CI report:
   
   * 0ff3661a4946c47acc4ea362a869b9885e5712b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128695210)
   




[GitHub] [flink] flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure CheckpointException can always be deserialized on JobManager

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9742: [FLINK-14076] Ensure 
CheckpointException can always be deserialized on JobManager
URL: https://github.com/apache/flink/pull/9742#issuecomment-533957111
 
 
   
   ## CI report:
   
   * b1c025876fff8d6c7807e55613aabefef891ed59 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687104)
   * 0b7c90578d0745548eba38593d4f320a7f33669e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128687968)
   * 2def508db5b57c2a387e395ccf605a2af4c886d1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128688964)
   * 2844e47e44e5ab556c3848bf1156a0d02032ac09 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128695177)
   




[GitHub] [flink] gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-23 Thread GitBox
gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) 
returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-53399
 
 
   If no objections I will merge this later today :)




[GitHub] [flink] flinkbot edited a comment on issue #9744: [FLINK-13515][test] Fix ClassLoaderITCase fails on Java 11

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9744: [FLINK-13515][test] Fix 
ClassLoaderITCase fails on Java 11
URL: https://github.com/apache/flink/pull/9744#issuecomment-533988291
 
 
   
   ## CI report:
   
   * 8210403a33cf3a6908980378fae9ee8d567afdb1 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128698798)
   




[jira] [Commented] (FLINK-14167) Move python-related scripts in flink-dist to flink-python

2019-09-23 Thread Wei Zhong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935625#comment-16935625
 ] 

Wei Zhong commented on FLINK-14167:
---

I'd be glad to take this Jira and create PR for it. ;) [~hequn8128]

> Move python-related scripts in flink-dist to flink-python
> -
>
> Key: FLINK-14167
> URL: https://issues.apache.org/jira/browse/FLINK-14167
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Wei Zhong
>Priority: Minor
>
> Currently, the scripts "pyflink-gateway-server.sh", "pyflink-shell.sh" and 
> "pyflink-udf-runner.sh" are stored in the flink-dist module. Now the modules 
> flink-scala-shell and flink-sql-client store their scripts in their own 
> module directories instead of flink-dist. It would be better to move the 
> flink-python related scripts from flink-dist to flink-python.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14167) Move python-related scripts in flink-dist to flink-python

2019-09-23 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-14167:
---

Assignee: Wei Zhong

> Move python-related scripts in flink-dist to flink-python
> -
>
> Key: FLINK-14167
> URL: https://issues.apache.org/jira/browse/FLINK-14167
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Minor
>
> Currently, the scripts "pyflink-gateway-server.sh", "pyflink-shell.sh" and 
> "pyflink-udf-runner.sh" are stored in the flink-dist module. Now the modules 
> flink-scala-shell and flink-sql-client store their scripts in their own 
> module directories instead of flink-dist. It would be better to move the 
> flink-python related scripts from flink-dist to flink-python.





[GitHub] [flink] flinkbot commented on issue #9746: [FLINK-14167][python] Move python-related scripts in flink-dist to flink-python.

2019-09-23 Thread GitBox
flinkbot commented on issue #9746: [FLINK-14167][python] Move python-related 
scripts in flink-dist to flink-python.
URL: https://github.com/apache/flink/pull/9746#issuecomment-534027994
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit dd8bee73f70747407778136f85914b5d21001d1f (Mon Sep 23 
09:40:01 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-14167) Move python-related scripts in flink-dist to flink-python

2019-09-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14167:
---
Labels: pull-request-available  (was: )

> Move python-related scripts in flink-dist to flink-python
> -
>
> Key: FLINK-14167
> URL: https://issues.apache.org/jira/browse/FLINK-14167
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the scripts "pyflink-gateway-server.sh", "pyflink-shell.sh" and 
> "pyflink-udf-runner.sh" are stored in the flink-dist module. Now the modules 
> flink-scala-shell and flink-sql-client store their scripts in their own 
> module directories instead of flink-dist. It would be better to move the 
> flink-python related scripts from flink-dist to flink-python.





[jira] [Commented] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-09-23 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935691#comment-16935691
 ] 

Zhu Zhu commented on FLINK-14164:
-

[~wind_ljy] Thanks for offering to do this. I think this requires some 
knowledge of the scheduling and failover implementations. You can take it if 
you feel prepared.

numberOfFailures/numberOfRestarts are the names I've come up with, but I have 
not yet decided which one is better.
The metric is meant to show the count of failovers that have happened, which 
indicates underlying issues.
However, a count of failed tasks can be useful to show the impact of 
failovers, so maybe we can also add it as numberOfTasksRestarted.

[~trohrmann] [~gjy] what's your opinion?

> Add a metric to show failover count regarding fine grained recovery
> ---
>
> Key: FLINK-14164
> URL: https://issues.apache.org/jira/browse/FLINK-14164
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Previously Flink uses restart all strategy to recover jobs from failures. And 
> the metric "fullRestart" is used to show the count of failovers.
> However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
> metric only reveals how many times the entire graph has been restarted, not 
> including the number of fine grained failure recoveries.
> As many users want to build their job alerting based on failovers, I'd 
> propose to add such a new metric {{numberOfFailures}}/{{numberOfRestarts}} 
> which also respects fine grained recoveries.



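A hedged sketch of the metric proposed in FLINK-14164. The names `numberOfRestarts`/`numberOfFailures` come from the discussion above and are still undecided; the class below is an illustration of the counting semantics only, not Flink's metric API:

```java
import java.util.concurrent.atomic.AtomicLong;

final class RestartMetrics {
    // Unlike the old fullRestart metric, this counter is incremented for
    // every recovery, whether fine grained or a full restart of the graph.
    private final AtomicLong numberOfRestarts = new AtomicLong();

    void onRestart() {
        numberOfRestarts.incrementAndGet();
    }

    long getNumberOfRestarts() {
        return numberOfRestarts.get();
    }

    public static void main(String[] args) {
        RestartMetrics m = new RestartMetrics();
        m.onRestart(); // one fine grained recovery
        m.onRestart(); // one full restart; both count toward the metric
        System.out.println(m.getNumberOfRestarts()); // prints 2
    }
}
```

Users alerting on failovers would watch this counter's rate rather than the full-restart count alone.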


[jira] [Created] (FLINK-14168) Remove unused BootstrapTools#generateTaskManagerConfiguration

2019-09-23 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14168:
---

 Summary: Remove unused 
BootstrapTools#generateTaskManagerConfiguration
 Key: FLINK-14168
 URL: https://issues.apache.org/jira/browse/FLINK-14168
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


{{BootstrapTools#generateTaskManagerConfiguration}} is no longer used, yet it 
adds a {{scala.concurrent.duration.FiniteDuration}} dependency to 
{{BootstrapTools}}.
I think we can remove it.





[jira] [Commented] (FLINK-14098) Support multiple statements for TableEnvironment

2019-09-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935715#comment-16935715
 ] 

Jark Wu commented on FLINK-14098:
-

Yes. You are right. [~felixzheng]

> Support multiple statements for TableEnvironment
> 
>
> Key: FLINK-14098
> URL: https://issues.apache.org/jira/browse/FLINK-14098
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Canbin Zheng
>Priority: Major
>
> Currently TableEnvironment.sqlUpdate supports parsing a single SQL statement 
> by invoking SqlParser.parseStmt.
> After the work of 
> [CALCITE-2453|https://issues.apache.org/jira/browse/CALCITE-2453], Calcite's 
> SqlParser is able to parse multiple SQL statements separated by semicolons. 
> IMO, it would be useful to refactor TableEnvironment.sqlUpdate to support 
> multiple SQL statements as well, by invoking SqlParser.parseStmtList instead.
> I am not sure whether this is a duplicated ticket; if it is, let me know, 
> thanks.



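The ticket above points at Calcite's `SqlParser.parseStmtList` for splitting multiple statements. The sketch below shows why parser-level support beats string splitting: even a hand-rolled splitter must already special-case semicolons inside string literals. This class is an illustration only, not part of Flink or Calcite:

```java
import java.util.ArrayList;
import java.util.List;

final class NaiveStatementSplitter {

    // Splits on semicolons, skipping any semicolon inside a single-quoted
    // SQL literal. Real parsers also handle escapes, comments, and dialects,
    // which is exactly why parseStmtList is the preferable approach.
    static List<String> split(String sql) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inLiteral = false;
        for (char c : sql.toCharArray()) {
            if (c == '\'') {
                inLiteral = !inLiteral;
            }
            if (c == ';' && !inLiteral) {
                if (!current.toString().trim().isEmpty()) {
                    statements.add(current.toString().trim());
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (!current.toString().trim().isEmpty()) {
            statements.add(current.toString().trim());
        }
        return statements;
    }

    public static void main(String[] args) {
        List<String> stmts = split("INSERT INTO t VALUES ('a;b'); SELECT * FROM t");
        System.out.println(stmts.size()); // prints 2
    }
}
```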


[GitHub] [flink] flinkbot edited a comment on issue #9746: [FLINK-14167][python] Move python-related scripts in flink-dist to flink-python.

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9746: [FLINK-14167][python] Move 
python-related scripts in flink-dist to flink-python.
URL: https://github.com/apache/flink/pull/9746#issuecomment-534031921
 
 
   
   ## CI report:
   
   * dd8bee73f70747407778136f85914b5d21001d1f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128717547)
   




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062516
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A regular record-oriented runtime result writer.
+ *
+ * The ChannelSelectorRecordWriter extends the {@link RecordWriter} and 
maintains an array of
+ * {@link BufferBuilder}s for all the channels. The {@link 
#emit(IOReadableWritable)}
+ * operation is based on {@link ChannelSelector} to select the target channel.
+ *
+ * @param <T> the type of the record that can be emitted with this record writer
+ */
+public class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
 Review comment:
   final for JIT optimization?




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062444
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -18,30 +18,163 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
 import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * The BroadcastRecordWriter extends the {@link RecordWriter} and handles 
{@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more 
efficient way.
+ * The BroadcastRecordWriter extends the {@link RecordWriter} and maintains a
+ * single {@link BufferBuilder} for all the channels. The serialization result
+ * then needs to be copied only once into this buffer, which is shared across
+ * all the channels in a more efficient way.
  *
 * @param <T> the type of the record that can be emitted with this record writer
 */
public class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
 Review comment:
   final for JIT optimization?




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062031
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -54,23 +54,17 @@
  *
 * @param <T> the type of the record that can be emitted with this record writer
 */
-public class RecordWriter<T extends IOReadableWritable> {
+public abstract class RecordWriter<T extends IOReadableWritable> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(RecordWriter.class);
 
-   private final ResultPartitionWriter targetPartition;
+   protected final ResultPartitionWriter targetPartition;
 
-   private final ChannelSelector<T> channelSelector;
+   protected final int numberOfChannels;
 
 Review comment:
   any reason to switch the order of fields here?




[GitHub] [flink] flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces 
PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#issuecomment-532703153
 
 
   
   ## CI report:
   
   * 4709beb04e69d94956ce9a823553efd67fefb29a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128184048)
   * 44134abf042fa3c3b8616cf9247bcf4263d78eea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128606953)
   * 136d2a82e8c4de371a8786b17b9207b804427beb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128690948)
   * d5b73b9f994ed4ce9edb05a0cacecd2a64de1463 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128696978)
   * a81141edfc4ffaf0bf2b88515356de44902481fc : UNKNOWN
   




[GitHub] [flink] flinkbot commented on issue #9745: [FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime

2019-09-23 Thread GitBox
flinkbot commented on issue #9745: [FLINK-14070] [runtime] Use TimeUtils to 
parse duration configs in flink-runtime
URL: https://github.com/apache/flink/pull/9745#issuecomment-534004353
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit d4ab076561b7d4d88347d55a1764d2a6d57507c3 (Mon Sep 23 
08:27:05 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] flinkbot edited a comment on issue #9745: [FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9745: [FLINK-14070] [runtime] Use TimeUtils 
to parse duration configs in flink-runtime
URL: https://github.com/apache/flink/pull/9745#issuecomment-534005398
 
 
   
   ## CI report:
   
   * d4ab076561b7d4d88347d55a1764d2a6d57507c3 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128705798)
   




[GitHub] [flink] flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces 
PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#issuecomment-532703153
 
 
   
   ## CI report:
   
   * 4709beb04e69d94956ce9a823553efd67fefb29a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128184048)
   * 44134abf042fa3c3b8616cf9247bcf4263d78eea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128606953)
   * 136d2a82e8c4de371a8786b17b9207b804427beb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128690948)
   * d5b73b9f994ed4ce9edb05a0cacecd2a64de1463 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128696978)
   * a81141edfc4ffaf0bf2b88515356de44902481fc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128703229)
   




[jira] [Commented] (FLINK-13734) Support DDL in SQL CLI

2019-09-23 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935694#comment-16935694
 ] 

Canbin Zheng commented on FLINK-13734:
--

+1 for this feature, seems to be a subtask of 
[FLIP-69|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html]

> Support DDL in SQL CLI
> --
>
> Key: FLINK-13734
> URL: https://issues.apache.org/jira/browse/FLINK-13734
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Priority: Major
>
> We have supported DDL in TableEnvironment. We should also support executing
> DDL in the SQL client to make the feature easier to use. However, this
> might require modifying the current architecture of the SQL Client. A more
> detailed design should be attached and discussed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13864) StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly

2019-09-23 Thread Kostas Kloudas (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-13864.
--
Fix Version/s: 1.10.0
 Assignee: Ying Xu
   Resolution: Fixed

Merged on master with adfe011bc6fed36e30b3078bd3b6dbc0953f2ddf

> StreamingFileSink: Allow inherited classes to extend StreamingFileSink 
> correctly
> 
>
> Key: FLINK-13864
> URL: https://issues.apache.org/jira/browse/FLINK-13864
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Ying Xu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink can't be extended correctly as there are a
> few issues; see the [PR|https://github.com/apache/flink/pull/8469] merged for
> [FLINK-12539|https://issues.apache.org/jira/browse/FLINK-12539]
> Mailing list discussion: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCACGLQUAxXjr2mBOf-6hbXcwmWoH5ib_0YEy-Vyjj%3DEPyQ25Qiw%40mail.gmail.com%3E]
>  





[GitHub] [flink] kl0u closed pull request #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-23 Thread GitBox
kl0u closed pull request #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581
 
 
   




[GitHub] [flink] kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-23 Thread GitBox
kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-534029845
 
 
   Merged




[GitHub] [flink-playgrounds] asfgit closed pull request #4: [FLINK-14160] Add --backpressure option to the ops playground

2019-09-23 Thread GitBox
asfgit closed pull request #4: [FLINK-14160] Add --backpressure option to the 
ops playground
URL: https://github.com/apache/flink-playgrounds/pull/4
 
 
   




[jira] [Commented] (FLINK-14169) Cleanup expired jobs from history server

2019-09-23 Thread David Moravek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935722#comment-16935722
 ] 

David Moravek commented on FLINK-14169:
---

Can you please assign this to [~david.hrbacek]? We have this planned for the 
current sprint. Also, should this behavior be configurable, so it's consistent 
with prior versions?

Thanks,
D.

> Cleanup expired jobs from history server
> 
>
> Key: FLINK-14169
> URL: https://issues.apache.org/jira/browse/FLINK-14169
> Project: Flink
>  Issue Type: Improvement
>Reporter: David Moravek
>Priority: Minor
>
> Cleanup jobs, that are no longer in history refresh locations during 
> JobArchiveFetcher::run.
> https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java#L138
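The proposed cleanup amounts to a set difference between the job archives cached locally and the jobs still present in the refresh locations. A minimal sketch under that assumption (hypothetical names, not the actual HistoryServerArchiveFetcher code):

```java
import java.util.HashSet;
import java.util.Set;

public class ArchiveCleanupSketch {

    /**
     * Returns the IDs of cached jobs that no longer exist in any refresh
     * location and could therefore be removed from the history server cache.
     */
    public static Set<String> expiredJobs(Set<String> cachedJobIds, Set<String> fetchedJobIds) {
        // Start from everything we have locally, then keep only the jobs
        // that are gone upstream.
        Set<String> expired = new HashSet<>(cachedJobIds);
        expired.removeAll(fetchedJobIds);
        return expired;
    }
}
```

If the behavior is made configurable, a boolean option could simply guard whether the returned set is actually deleted.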





[GitHub] [flink] aljoscha commented on issue #9720: [FLINK-13025] Elasticsearch 7.x support

2019-09-23 Thread GitBox
aljoscha commented on issue #9720: [FLINK-13025] Elasticsearch 7.x support
URL: https://github.com/apache/flink/pull/9720#issuecomment-534041797
 
 
   I had a cursory glance at this. The code seems fine, because it's basically 
the ES 6 code. I have some general comments:
   
   - I don't think the ES connectors (both 6 and 7) need the Table Planner 
dependency anymore. Maybe @twalthr can confirm this, but you can also try 
removing them and seeing what happens.
   - I don't think we need Table-specific code in the connectors when there is 
the `flink-sql-connector-elasticsearch7` module. Maybe the Table-related code 
can be moved to that module, along with the tests.




[GitHub] [flink] pnowojski merged pull request #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext

2019-09-23 Thread GitBox
pnowojski merged pull request #9478: [FLINK-13766][task] Refactor the 
implementation of StreamInputProcessor based on StreamTaskInput#emitNext
URL: https://github.com/apache/flink/pull/9478
 
 
   




[GitHub] [flink] pnowojski commented on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext

2019-09-23 Thread GitBox
pnowojski commented on issue #9478: [FLINK-13766][task] Refactor the 
implementation of StreamInputProcessor based on StreamTaskInput#emitNext
URL: https://github.com/apache/flink/pull/9478#issuecomment-534046669
 
 
   The benchmark run showed some potential regressions in the pure network 
benchmarks, but only in some of them, so it might be a random fluke, especially 
since this PR barely touches anything used in those benchmarks.
   
   Let's merge it either way and observe the long-term trends. I would wait a 
couple of days before merging https://github.com/apache/flink/pull/9483.




[GitHub] [flink] flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9707: [FLINK-14015][python] Introduces 
PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#issuecomment-532703153
 
 
   
   ## CI report:
   
   * 4709beb04e69d94956ce9a823553efd67fefb29a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128184048)
   * 44134abf042fa3c3b8616cf9247bcf4263d78eea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128606953)
   * 136d2a82e8c4de371a8786b17b9207b804427beb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128690948)
   * d5b73b9f994ed4ce9edb05a0cacecd2a64de1463 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128696978)
   * a81141edfc4ffaf0bf2b88515356de44902481fc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128703229)
   * 82db453b92d4109358c778318e277b1c425a3d56 : UNKNOWN
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326977860
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+   public static ExecutionVertexSchedulingRequirements from(final 
ExecutionVertex executionVertex) {
+
+   final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(
+   executionVertex.getJobVertex().getJobVertexId(),
+   executionVertex.getParallelSubtaskIndex());
+
+   final AllocationID latestPriorAllocation = 
executionVertex.getLatestPriorAllocation();
+   final SlotSharingGroup slotSharingGroup = 
executionVertex.getJobVertex().getSlotSharingGroup();
+
+   return new ExecutionVertexSchedulingRequirements.Builder()
+   .withExecutionVertexId(executionVertexId)
+   .withPreviousAllocationId(latestPriorAllocation)
+   .withSlotSharingGroupId(slotSharingGroup == null ? null 
: slotSharingGroup.getSlotSharingGroupId())
+   
.withCoLocationConstraint(executionVertex.getLocationConstraint())
+   
.withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build();
 
 Review comment:
   I see. Thanks.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326982305
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+   this.slotRequestTimeout = slotRequestTimeout;
+   this.slotProvider = slotProvider;
+   this.delayExecutor = delayExecutor;
+   this.userCodeLoader = userCodeLoader;
+   this.schedulingStrategyFactory = 
checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = 
checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = executionVertexVersioner;
+   this.conditionalFutureHandlerFactory = new 
ConditionalFutureHandlerFactory(executionVertexVersioner);
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateTaskExecutionStateInDefaultSchedulerListener(this, 
getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
 
 Review comment:
   Exactly.




[GitHub] [flink] flinkbot commented on issue #9745: [FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime

2019-09-23 Thread GitBox
flinkbot commented on issue #9745: [FLINK-14070] [runtime] Use TimeUtils to 
parse duration configs in flink-runtime
URL: https://github.com/apache/flink/pull/9745#issuecomment-534005398
 
 
   
   ## CI report:
   
   * d4ab076561b7d4d88347d55a1764d2a6d57507c3 : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9744: [FLINK-13515][test] Fix ClassLoaderITCase fails on Java 11

2019-09-23 Thread GitBox
flinkbot edited a comment on issue #9744: [FLINK-13515][test] Fix 
ClassLoaderITCase fails on Java 11
URL: https://github.com/apache/flink/pull/9744#issuecomment-533988291
 
 
   
   ## CI report:
   
   * 8210403a33cf3a6908980378fae9ee8d567afdb1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128698798)
   




[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] 
Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326999686
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable 
cause, ExecutionAttemptID fai
 * @param t The exception that caused the failure.
 */
public void failGlobal(Throwable t) {
+   if (!isLegacyScheduling()) {
+   ExceptionUtils.rethrow(t);
 
 Review comment:
   You are right, this deserves more attention.  




[GitHub] [flink] WeiZhong94 opened a new pull request #9746: [FLINK-14167][python] Move python-related scripts in flink-dist to flink-python.

2019-09-23 Thread GitBox
WeiZhong94 opened a new pull request #9746: [FLINK-14167][python] Move 
python-related scripts in flink-dist to flink-python.
URL: https://github.com/apache/flink/pull/9746
 
 
   ## What is the purpose of the change
   
   *This pull request moves python-related scripts in flink-dist to 
flink-python.*
   
   
   ## Brief change log
   
 - *moves python-related scripts in flink-dist to flink-python*
 - *modify the configuration in `flink-dist/src/main/assemblies/bin.xml` to 
copy python scripts in `flink-python/bin` to `build-target/bin`.*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Updated] (FLINK-14098) Support multiple statements for TableEnvironment

2019-09-23 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-14098:
-
Summary: Support multiple statements for TableEnvironment  (was: Support 
multiple statements splitting for TableEnvironment)

> Support multiple statements for TableEnvironment
> 
>
> Key: FLINK-14098
> URL: https://issues.apache.org/jira/browse/FLINK-14098
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Canbin Zheng
>Priority: Major
>
> Currently TableEnvironment.sqlUpdate supports single sql statement parsing by 
> invoking SqlParser.parseStmt.
> Actually, after the work of 
> [CALCITE-2453|https://issues.apache.org/jira/browse/CALCITE-2453], Calcite’s 
> SqlParser is able to parse multiple sql statements split by semicolon, IMO, 
> it’s useful to refactor TableEnvironment.sqlUpdate to support multiple sql 
> statements too, by invoking SqlParser.parseStmtList instead.
> I am not sure whether this is a duplicated ticket, if it is, let me know, 
> thanks.
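To illustrate why statement splitting is better delegated to Calcite's SqlParser.parseStmtList than done by hand: a semicolon may legally occur inside a SQL string literal, which a naive String.split would break on. A toy, hypothetical splitter (not Flink or Calcite code) handling only that one case:

```java
import java.util.ArrayList;
import java.util.List;

public class StatementSplitSketch {

    /** Splits on semicolons that are outside single-quoted SQL string literals. */
    public static List<String> split(String sql) {
        List<String> stmts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inLiteral = false;
        for (char c : sql.toCharArray()) {
            if (c == '\'') {
                // Toggle literal state (escaped quotes ignored for brevity).
                inLiteral = !inLiteral;
            }
            if (c == ';' && !inLiteral) {
                if (!current.toString().trim().isEmpty()) {
                    stmts.add(current.toString().trim());
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (!current.toString().trim().isEmpty()) {
            stmts.add(current.toString().trim());
        }
        return stmts;
    }
}
```

A real implementation would also need comments, dollar-quoting, dialect rules, and so on, which is exactly what delegating to the parser avoids.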





[jira] [Updated] (FLINK-14098) Support multiple statements splitting for TableEnvironment

2019-09-23 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-14098:
-
Summary: Support multiple statements splitting for TableEnvironment  (was: 
Support multiple sql statements splitting by semicolon for TableEnvironment)

> Support multiple statements splitting for TableEnvironment
> --
>
> Key: FLINK-14098
> URL: https://issues.apache.org/jira/browse/FLINK-14098
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Canbin Zheng
>Priority: Major
>
> Currently TableEnvironment.sqlUpdate supports single sql statement parsing by 
> invoking SqlParser.parseStmt.
> Actually, after the work of 
> [CALCITE-2453|https://issues.apache.org/jira/browse/CALCITE-2453], Calcite’s 
> SqlParser is able to parse multiple sql statements split by semicolon, IMO, 
> it’s useful to refactor TableEnvironment.sqlUpdate to support multiple sql 
> statements too, by invoking SqlParser.parseStmtList instead.
> I am not sure whether this is a duplicated ticket, if it is, let me know, 
> thanks.





[jira] [Comment Edited] (FLINK-14118) Reduce the unnecessary flushing when there is no data available for flush

2019-09-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935713#comment-16935713
 ] 

Piotr Nowojski edited comment on FLINK-14118 at 9/23/19 10:06 AM:
--

Yes, I would expect that this scenario that you have reported could affect 
performance. Probably those steps would be enough to reproduce:
* 1000 output channels
* 1ms flushing interval
* 99.9% or 99.99% records go through only a couple (1? 5? 10?) of output 
channels

We might also think about a different solution to this problem. Instead of 
adding an extra synchronisation point (which even if doesn't affect performance 
it complicates threads interactions), we could move the output flushing logic 
to the mailbox thread. Since all records writing happens from that thread, we 
could maintain something like non synchronized {{private boolean 
hasBeenWrittenTo}} flag, set to true on every record write to a 
channel/subpartition and set to false on every flush.

I haven't thought this through; my main concern here is how to efficiently 
schedule a "flushAll" mailbox action. However, I did a very quick 
PoC (https://github.com/pnowojski/flink/commits/mailbox-output-flusher) and 
benchmarked it with some of the network benchmarks; even the simplest 
solution seems to be OK.

Alternative solution might be to revive the old idea of moving output flusher 
to netty threads, ( https://issues.apache.org/jira/browse/FLINK-8625 ) which 
has a great performance improvement potential, however it has it's own 
unresolved issues ( 
https://github.com/apache/flink/pull/6698#discussion_r223309406 ) and might not 
solve this particular case anyway.
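The dirty-flag idea above can be sketched as follows, assuming (as stated in the comment) that record writes and flushes are confined to the single mailbox thread, so no synchronization is needed; the class and member names are hypothetical, not actual Flink APIs:

```java
public class DirtyFlagFlushSketch {

    // Safe without synchronization only because writes and flushes are
    // confined to the single mailbox thread.
    private boolean hasBeenWrittenTo;

    // Counts how often the netty thread was woken up, for illustration.
    private int nettyWakeups;

    public void writeRecord() {
        // ... serialize the record into the current buffer ...
        hasBeenWrittenTo = true;
    }

    /** Called by the periodic output flusher; skips the notification when idle. */
    public void flushAll() {
        if (hasBeenWrittenTo) {
            nettyWakeups++; // stands in for notifying the netty writer thread
            hasBeenWrittenTo = false;
        }
    }

    public int getNettyWakeups() {
        return nettyWakeups;
    }
}
```

With this shape, an idle subpartition never wakes the netty thread, which is the CPU overhead the ticket describes.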


was (Author: pnowojski):
Yes, I would expect that this scenario that you have reported could affect 
performance. Probably those steps would be enough to reproduce:
* 1000 output channels
* 1ms flushing interval
* 99.9% or 99.99% records go through only a couple (1? 5? 10?) of output 
channels

We might also think about a different solution to this problem. Instead of 
adding an extra synchronisation point (which even if doesn't affect performance 
it complicates threads interactions), we could move the output flushing logic 
to the mailbox thread. Since all records writing happens from that thread, we 
could maintain something like non synchronized {{private boolean 
hasBeenWrittenTo}} flag, set to true on every record write to a 
channel/subpartition and set to false on every flush.

I haven't thought this through; my main concern here is how to efficiently 
schedule a "flushAll" mailbox action. However, I did a very quick 
PoC (https://github.com/pnowojski/flink/commits/mailbox-output-flusher) and 
benchmarked it with some of the network benchmarks; even the simplest 
solution seems to be OK.

> Reduce the unnecessary flushing when there is no data available for flush
> -
>
> Key: FLINK-14118
> URL: https://issues.apache.org/jira/browse/FLINK-14118
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.1, 1.8.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new flush implementation which works by triggering a netty user event may 
> cause performance regression compared to the old synchronization-based one. 
> More specifically, when there is exactly one BufferConsumer in the buffer 
> queue of subpartition and no new data will be added for a while in the future 
> (may because of just no input or the logic of the operator is to collect some 
> data for processing and will not emit records immediately), that is, there is 
> no data to send, the OutputFlusher will continuously notify data available 
> and wake up the netty thread, though no data will be returned by the 
> pollBuffer method.
> For some of our production jobs, this will incur 20% to 40% CPU overhead 
> compared to the old implementation. We tried to fix the problem by checking 
> if there is new data available when flushing, if there is no new data, the 
> netty thread will not be notified. This works for our jobs, and the CPU usage
> falls back to the previous level.





[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy 
intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327059906
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -18,30 +18,163 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
 import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * The BroadcastRecordWriter extends the {@link RecordWriter} and handles 
{@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more 
efficient way.
+ * The BroadcastRecordWriter extends the {@link RecordWriter} and maintains 
a single {@link BufferBuilder}
+ * for all the channels. The serialization results then need to be copied only 
once to this buffer, which is
+ * shared across all the channels in a more efficient way.
  *
  * @param  the type of the record that can be emitted with this record 
writer
  */
 public class BroadcastRecordWriter extends 
RecordWriter {
 
+   /** The current buffer builder shared for all the channels. */
+   private Optional bufferBuilder = Optional.empty();
 
 Review comment:
   Optional fields are 
[discouraged](http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCAA_61Xo9oif8RjJgvxFNo%2Bua7_DoFksCy_5c_NOnQLWfS4-8qA%40mail.gmail.com%3E).
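The alternative suggested in the linked mailing-list discussion is a plain nullable field that is wrapped into an Optional only at the access site. A hypothetical sketch (Object stands in for BufferBuilder; this is not the actual BroadcastRecordWriter code):

```java
import java.util.Optional;

public class NullableFieldSketch {

    // Plain nullable field instead of Optional<BufferBuilder>;
    // null means "no builder currently in use".
    private Object bufferBuilder;

    /** Wrap into an Optional only when exposing the field. */
    public Optional<Object> getBufferBuilder() {
        return Optional.ofNullable(bufferBuilder);
    }

    public void setBufferBuilder(Object builder) {
        this.bufferBuilder = builder;
    }
}
```

This keeps the field serializable and avoids per-access Optional allocation on hot paths, while callers still get the null-safe Optional view.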
   




[jira] [Created] (FLINK-14165) HadoopRecoverableWriter do not support viewfs scheme

2019-09-23 Thread haoyuwen (Jira)
haoyuwen created FLINK-14165:


 Summary: HadoopRecoverableWriter do not support viewfs scheme
 Key: FLINK-14165
 URL: https://issues.apache.org/jira/browse/FLINK-14165
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.9.0
Reporter: haoyuwen


HadoopRecoverableWriter limits the scheme to hdfs, so it cannot be used with viewfs.




