[GitHub] Guibo-Pan commented on issue #6773: Add reverse function in Table API and SQL

2018-09-27 Thread GitBox
Guibo-Pan commented on issue #6773: Add reverse function in Table API and SQL
URL: https://github.com/apache/flink/pull/6773#issuecomment-425318708
 
 
   cc @xccui @yanghua 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631324#comment-16631324
 ] 

ASF GitHub Bot commented on FLINK-10340:


xccui commented on a change in pull request #6700: [FLINK-10340][table] Add 
Cosh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#discussion_r221136490
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2)

+{% highlight text %}
+COSH(numeric)
+{% endhighlight %}
+Returns the hyperbolic cosine of numeric. Return value type is DOUBLE.

 Review comment:
   1. This page is not a pure markdown doc. We cannot use backquotes here.
   2. Conventionally, we use lowercase letters for SQL attributes and capital 
letters for Java/Scala attributes.
   3. I don't think the return type should be marked like that. In most cases, 
it should be identical to the input's type (e.g., if the input is a 
`bigdecimal`, we should also return a `bigdecimal` instead of a fixed `double`).
   
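For illustration, a minimal plain-Java sketch of point 3 (an editorial aside, not Flink code): the result type follows the input type rather than being fixed to DOUBLE.

{code:java}
import java.math.BigDecimal;
import java.math.MathContext;

// Editorial sketch, not Flink code: the result type follows the input type
// instead of being fixed to double.
class CoshSketch {

	static double cosh(double x) {
		return Math.cosh(x);
	}

	static BigDecimal cosh(BigDecimal x) {
		// Computed via double for brevity; a real implementation would preserve precision.
		return new BigDecimal(Math.cosh(x.doubleValue()), MathContext.DECIMAL64);
	}

	public static void main(String[] args) {
		System.out.println(cosh(1.0));            // double in, double out
		System.out.println(cosh(BigDecimal.ONE)); // BigDecimal in, BigDecimal out
	}
}
{code}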


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Cosh math function supported in Table API and SQL
> -
>
> Key: FLINK-10340
> URL: https://issues.apache.org/jira/browse/FLINK-10340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Sergey Tsvetkov
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Implement udf of cosh, just like in oracle
> [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631322#comment-16631322
 ] 

ASF GitHub Bot commented on FLINK-10340:


xccui commented on a change in pull request #6700: [FLINK-10340][table] Add 
Cosh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#discussion_r221136490
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2)

+{% highlight text %}
+COSH(numeric)
+{% endhighlight %}
+Returns the hyperbolic cosine of numeric. Return value type is DOUBLE.

 Review comment:
   1. This page is not a pure markdown doc. We cannot use backquotes here.
   2. Conventionally, we use lowercase letters for SQL attributes and capital 
letters for Java/Scala attributes.
   3. I don't think the return type should be marked like that. In most cases, 
it should be identical to the input's type.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Cosh math function supported in Table API and SQL
> -
>
> Key: FLINK-10340
> URL: https://issues.apache.org/jira/browse/FLINK-10340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Sergey Tsvetkov
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Implement udf of cosh, just like in oracle
> [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan opened a new pull request #6773: Add reverse function in Table API and SQL

2018-09-27 Thread GitBox
Guibo-Pan opened a new pull request #6773: Add reverse function in Table API 
and SQL
URL: https://github.com/apache/flink/pull/6773
 
 
   ## What is the purpose of the change
   
   *This pull request adds the reverse function in the Table API and SQL*
   
   
   ## Brief change log
 - *Add reverse function in the Table API and SQL*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*ScalarFunctionsTest#testReverse*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
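   
   For context, a minimal sketch of the semantics such a function provides (an editorial aside, not the PR's implementation):

```java
// Editorial sketch of string reversal semantics, not the PR's implementation.
class ReverseSketch {

    static String reverse(String s) {
        return new StringBuilder(s).reverse().toString();
    }

    public static void main(String[] args) {
        System.out.println(reverse("flink")); // prints "knilf"
    }
}
```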
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10456) Remove org.apache.flink.api.common.time.Deadline

2018-09-27 Thread tison (JIRA)
tison created FLINK-10456:
-

 Summary: Remove org.apache.flink.api.common.time.Deadline
 Key: FLINK-10456
 URL: https://issues.apache.org/jira/browse/FLINK-10456
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.7.0
Reporter: tison
Assignee: tison
 Fix For: 1.7.0


We already have {{scala.concurrent.duration.Deadline}}.

{{org.apache.flink.api.common.time.Deadline}} is not a richer extension of it. I 
wonder in which situations we need a customized Deadline. If there are none, 
introducing a weaker alternative seems unreasonable and raises confusion.

What do you think? cc [~StephanEwen] [~Zentol]
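
For reference, a minimal sketch of what such a deadline utility boils down to (an editorial aside; both real classes differ in API details):

{code:java}
import java.time.Duration;

// Editorial sketch of the functionality both classes cover; the real Scala
// and Flink classes differ in API details.
final class DeadlineSketch {
	private final long deadlineNanos;

	private DeadlineSketch(long deadlineNanos) {
		this.deadlineNanos = deadlineNanos;
	}

	static DeadlineSketch fromNow(Duration duration) {
		return new DeadlineSketch(System.nanoTime() + duration.toNanos());
	}

	boolean hasTimeLeft() {
		return System.nanoTime() < deadlineNanos;
	}

	Duration timeLeft() {
		return Duration.ofNanos(deadlineNanos - System.nanoTime());
	}
}
{code}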



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10420) Create and drop view in sql client should check the view created based on the configuration.

2018-09-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631287#comment-16631287
 ] 

vinoyang commented on FLINK-10420:
--

Hi [~twalthr], I think the *CREATE OR REPLACE VIEW* syntax is a good solution to 
this problem. The user decides and understands his choice. What do you think?

In addition, I have another question: why do we use Calcite to parse statements 
when the SQL client uses DDL statements?

> Create and drop view in sql client should check the view created based on the 
> configuration.
> 
>
> Key: FLINK-10420
> URL: https://issues.apache.org/jira/browse/FLINK-10420
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Currently, only the current session is checked:
> {code:java}
> private void callCreateView(SqlCommandCall cmdCall) {
>final String name = cmdCall.operands[0];
>final String query = cmdCall.operands[1];
>//here
>final String previousQuery = context.getViews().get(name);
>if (previousQuery != null) {
>   printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
>   return;
>}
> {code}
>  
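
For illustration, an editorial sketch (not Flink code) of the two behaviors under discussion: plain CREATE VIEW rejects an existing name, while CREATE OR REPLACE VIEW overwrites it.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Editorial sketch, not Flink code: plain CREATE VIEW fails on an existing
// name, CREATE OR REPLACE VIEW overwrites it.
final class ViewRegistrySketch {
	private final Map<String, String> views = new HashMap<>();

	void createView(String name, String query, boolean orReplace) {
		if (!orReplace && views.containsKey(name)) {
			throw new IllegalStateException("View '" + name + "' already exists.");
		}
		views.put(name, query); // create, or replace the previous definition
	}
}
{code}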



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10240) Pluggable scheduling strategy for batch jobs

2018-09-27 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10240:

Affects Version/s: 1.7.0

> Pluggable scheduling strategy for batch jobs
> 
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, consumable input data does not always mean the task can start working 
> at once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (we call it the build side) to set up a table, then consumes the other 
> side (we call it the probe side) to do the real join work. If the probe side is 
> started early, it just gets stuck on back pressure, as the join operator will 
> not consume data from it before the building stage is done, causing a waste of 
> resources.
> If we have the probe side task started after the build stage is done, both 
> the build and probe sides can have more computing resources, as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex schedule order and constraints. Better 
> resource utilization usually means better performance.
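
For illustration, an editorial sketch of what a pluggable strategy hook could look like; the interface and method names are assumptions, not Flink API:

{code:java}
// Editorial sketch of a pluggable scheduling strategy; all names are
// assumptions, not Flink API.
interface SchedulingStrategySketch {

	/** Called once when the job starts; schedules the initial vertices. */
	void startScheduling();

	/** Called when a vertex's input data becomes consumable. */
	void onInputConsumable(String vertexId);

	/**
	 * Called when a vertex finishes, e.g. so the probe side of a hash join
	 * can be scheduled only after the build side is done.
	 */
	void onVertexFinished(String vertexId);
}
{code}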



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4399) Add support for oversized messages

2018-09-27 Thread Leanken.Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631275#comment-16631275
 ] 

Leanken.Lin edited comment on FLINK-4399 at 9/28/18 2:26 AM:
-

Hi. [~till.rohrmann]

Do we have any update on this issue? Is anyone still working on it? 
I've seen another PR [https://github.com/apache/flink/pull/887] related to this 
JIRA, but it has not been merged yet.


was (Author: leanken):
Hi. [~till.rohrmann]

Do we have some update on this issue?

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: flip-6
> Fix For: 1.7.0
>
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions' closures are 
> large (for example because they contain large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest using the {{BlobManager}} to transfer large payloads.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.
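
For illustration, an editorial sketch of the proposed pointer-message protocol; the type and method names are assumptions, not Flink API:

{code:java}
import java.io.Serializable;

// Editorial sketch; names are assumptions, not Flink API. Oversized payloads
// travel via blob storage, and the RPC layer carries only a small pointer.
final class BlobPointerMessage implements Serializable {
	final String blobKey; // key under which the payload was stored

	BlobPointerMessage(String blobKey) {
		this.blobKey = blobKey;
	}
}

interface LargeMessageHandler {

	/** Stores an oversized payload and returns the pointer message to send instead. */
	BlobPointerMessage offload(byte[] payload);

	/** Resolves a pointer message back into the original payload on the receiver side. */
	byte[] resolve(BlobPointerMessage pointer);
}
{code}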



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631278#comment-16631278
 ] 

ASF GitHub Bot commented on FLINK-10398:


yanghua commented on issue #6736: [FLINK-10398][table] Add Tanh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#issuecomment-425300319
 
 
   @pnowojski I also changed this PR based on your suggestions on the `cosh` one. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Tanh math function supported in Table API and SQL
> -
>
> Key: FLINK-10398
> URL: https://issues.apache.org/jira/browse/FLINK-10398
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> refer to : https://www.techonthenet.com/oracle/functions/tanh.php



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4399) Add support for oversized messages

2018-09-27 Thread Leanken.Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631275#comment-16631275
 ] 

Leanken.Lin edited comment on FLINK-4399 at 9/28/18 2:21 AM:
-

Hi. [~till.rohrmann]

Do we have some update on this issue?


was (Author: leanken):
Hi. [~till.rohrmann]

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: flip-6
> Fix For: 1.7.0
>
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions' closures are 
> large (for example because they contain large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest using the {{BlobManager}} to transfer large payloads.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2018-09-27 Thread Leanken.Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631275#comment-16631275
 ] 

Leanken.Lin commented on FLINK-4399:


Hi. [~till.rohrmann]

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: flip-6
> Fix For: 1.7.0
>
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions' closures are 
> large (for example because they contain large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest using the {{BlobManager}} to transfer large payloads.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631252#comment-16631252
 ] 

ASF GitHub Bot commented on FLINK-10247:


Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run 
MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759#issuecomment-425296165
 
 
   cc @TisonKun @yanghua 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.
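
For illustration, an editorial sketch (not Flink code) of the idea: give the metric query service its own executor so slow metric queries cannot starve the main component threads.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Editorial sketch, not Flink code: a dedicated, daemonized single-thread
// pool isolates metric queries from the main component threads.
final class MetricQueryPoolSketch {

	static ExecutorService createDedicatedPool() {
		return Executors.newFixedThreadPool(1, runnable -> {
			Thread thread = new Thread(runnable, "metrics-query-service");
			thread.setDaemon(true);
			return thread;
		});
	}
}
{code}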



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-09-27 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631165#comment-16631165
 ] 

Bowen Li commented on FLINK-10168:
--

[~wind_ljy] are you planning to start working on this issue very soon? If not, 
I plan to take it over

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Jiayi Liao
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of files, we only want to read files 
> that are created or modified after a specific time.
> This API can expose a generic filter function on files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently the API is 
> {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path 
> filter. A more generic API that can take more filters could look like this: 1) 
> {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, and {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  
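
For illustration, an editorial sketch of the 2nd option; the interface shape is an assumption, not Flink API:

{code:java}
// Editorial sketch of the 2nd option; the interface shape is an assumption,
// not Flink API.
interface FileFilterSketch {
	boolean accept(String path, long modificationTime, long size);
}

// Example rule: only read files modified after a given timestamp.
final class ModifiedAfterFilter implements FileFilterSketch {
	private final long thresholdMillis;

	ModifiedAfterFilter(long thresholdMillis) {
		this.thresholdMillis = thresholdMillis;
	}

	@Override
	public boolean accept(String path, long modificationTime, long size) {
		return modificationTime > thresholdMillis;
	}
}
{code}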



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631143#comment-16631143
 ] 

ASF GitHub Bot commented on FLINK-10289:


isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify 
Exceptions to different category for apply different failover strategy
URL: https://github.com/apache/flink/pull/6739#issuecomment-425266762
 
 
   @StephanEwen, I have changed the implementation to use a Java annotation. Could 
you take a look?
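
For illustration, an editorial sketch of what annotation-based classification can look like; the annotation name and lookup are assumptions, not necessarily what the PR does:

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Editorial sketch of annotation-based classification; names are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ThrowableKind {
	String value(); // e.g. "NonRecoverable", "EnvironmentError"
}

final class ClassifierSketch {

	/** Reads the classification off the exception class, defaulting to "Recoverable". */
	static String classify(Throwable t) {
		ThrowableKind kind = t.getClass().getAnnotation(ThrowableKind.class);
		return kind != null ? kind.value() : "Recoverable";
	}
}
{code}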


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631036#comment-16631036
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221074188
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * given a exception do the classification.
+ */
+public class ThrowableClassifier {
 
 Review comment:
   I think we can explore an interface/annotation approach.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631031#comment-16631031
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073415
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+    * the issue indicate it will not success even retry, such as a DivideZeroException.
+    * for this kind of exception, we shouldn't consider failover, or we should fail the job
+    */
+   NonRecoverable,
+
+   /**
+* indicate data consumption error, that we should revoke the producer.
 
 Review comment:
   data consumption error, which indicates that ...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631032#comment-16631032
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073431
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+    * the issue indicate it will not success even retry, such as a DivideZeroException.
+    * for this kind of exception, we shouldn't consider failover, or we should fail the job
+    */
+   NonRecoverable,
+
+   /**
+* indicate data consumption error, that we should revoke the producer.
+* */
+   PartitionDataMissingError,
+
+   /**
+    * this indicate errors such us Hardware error, service issue, that we should consider blacklist the machine.
 
 Review comment:
   this indicates error related to running environment, such as ..., in which 
case we should consider...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631033#comment-16631033
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073448
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+    * the issue indicate it will not success even retry, such as a DivideZeroException.
+    * for this kind of exception, we shouldn't consider failover, or we should fail the job
+    */
+   NonRecoverable,
+
+   /**
+* indicate data consumption error, that we should revoke the producer.
+* */
+   PartitionDataMissingError,
+
+   /**
+    * this indicate errors such us Hardware error, service issue, that we should consider blacklist the machine.
+* */
+   EnvironmentError,
+
+   /**
+* this indicate other errors that recoverable.
 
 Review comment:
   this indicates other error that is recoverable


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631030#comment-16631030
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073399
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+    * the issue indicate it will not success even retry, such as a DivideZeroException.
+    * for this kind of exception, we shouldn't consider failover, or we should fail the job
+    */
+   NonRecoverable,
 
 Review comment:
   NonRecoverableError
   
   to be more consistent with other types


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues that are related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other tasks run in this bad environment 
> will very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread GitBox
yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073384
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+* the issue indicate it will not success even retry, such as a 
DivideZeroException.
 
 Review comment:
   this indicates an error that would not succeed even with a retry, such as 
DivideZeroException. No failover should be attempted for such an error. Instead, 
the job should fail immediately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631029#comment-16631029
 ] 

ASF GitHub Bot commented on FLINK-10289:


yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073384
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+* the issue indicate it will not success even retry, such as a 
DivideZeroException.
 
 Review comment:
   this indicates an error that would not succeed even with a retry, such as 
DivideZeroException. No failover should be attempted for such an error. Instead, 
the job should fail immediately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception was classified as NonRecoverable
>  ** For example, NoResouceAvailiableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> throw from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in persistent store. If the partition was missing, we need to 
> revoke/rerun the produce task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happened due to hardware, or software issues that were related to 
> specific environments. The assumption is that a task will succeed if we run 
> it in a different environment, and other task run in this bad environment 
> will very likely fail. If multiple task failures in the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to blacklist, 
> and avoiding schedule task on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kind of 
> issues.
>  * Recoverable
>  ** We assume other issues are recoverable.
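> 
> As an aside, a minimal sketch of how such a classification might be consumed 
> by a failover strategy. The enum constants come from the PR; the strategy 
> class and its action methods are hypothetical placeholders, not the actual 
> Flink failover API:
> {code}
> // Hypothetical consumer of the proposed ThrowableType enum; the action
> // methods below are placeholders used only to illustrate the decisions.
> public class FailoverSketch {
> 
>     void handleTaskFailure(ThrowableType type, Throwable cause) {
>         switch (type) {
>             case NonRecoverable:
>                 failJob(cause);              // retrying cannot help
>                 break;
>             case PartitionDataMissingError:
>                 rerunProducerTask(cause);    // regenerate the missing partition data
>                 break;
>             case EnvironmentError:
>                 recordMachineFailure(cause); // counts towards blacklisting the machine
>                 restartTask(cause);          // ...ideally on a different machine
>                 break;
>             case Recoverable:
>             default:
>                 restartTask(cause);          // plain failover
>         }
>     }
> 
>     private void failJob(Throwable t) { /* fail the whole job */ }
>     private void rerunProducerTask(Throwable t) { /* revoke/rerun the producer */ }
>     private void recordMachineFailure(Throwable t) { /* blacklist bookkeeping */ }
>     private void restartTask(Throwable t) { /* schedule a new attempt */ }
> }
> {code}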



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy

2018-09-27 Thread GitBox
yingdachen commented on a change in pull request #6739: [FLINK-10289] 
[JobManager] Classify Exceptions to different category for apply different 
failover strategy
URL: https://github.com/apache/flink/pull/6739#discussion_r221073431
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+   /**
+* the issue indicate it will not success even retry, such as a 
DivideZeroException.
+* for this kind of exception, we shouldn't consider failover, or we 
should fail the job
+*/
+   NonRecoverable,
+
+   /**
+* indicate data consumption error, that we should revoke the producer.
+* */
+   PartitionDataMissingError,
+
+   /**
+* this indicate errors such us Hardware error, service issue, that we 
should consider blacklist the machine.
 
 Review comment:
   this indicates an error related to the running environment, such as ..., in 
which case we should consider...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630888#comment-16630888
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221036186
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   Ok, I think there shouldn't be too much re-computation overhead, since it 
happens once per attempt. I am fine with either way. Thx.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask pulls splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221036186
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   Ok, I think there shouldn't be too much re-computation overhead, since it 
happens once per attempt. I am fine with either way. Thx.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630841#comment-16630841
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221024426
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   Putting null here avoids recomputing; otherwise, if another execution 
attempt pulls getNextInputSplit, we would need to recompute. 
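   
   To make those replay semantics concrete, a minimal sketch of the caching 
scheme (simplified from the PR's ExecutionVertex changes; treat it as an 
illustration, not the exact patch): every answer, including the terminal null, 
is remembered by index, so a retried attempt asking for split i gets exactly 
what the previous attempt got, and the assigner is only consulted for indexes 
that were never answered before.
   
   ```java
   public InputSplit getNextInputSplit(int index, String host) {
       synchronized (inputSplits) {
           if (index < inputSplits.size()) {
               // a previous attempt already reached this index: replay its answer,
               // which may be null and then consistently means "no more splits"
               return inputSplits.get(index);
           }
           final InputSplit next =
               jobVertex.getSplitAssigner().getNextInputSplit(host, getParallelSubtaskIndex());
           // cache the answer, even when null, so a later attempt never
           // re-invokes the assigner for an index that was already served
           inputSplits.add(next);
           return next;
       }
   }
   ```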


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask pulls splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221024426
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   Putting null here avoids recomputing; otherwise, if another execution 
attempt pulls getNextInputSplit, we would need to recompute. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10057) optimalize org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630825#comment-16630825
 ] 

ASF GitHub Bot commented on FLINK-10057:


lzqdename commented on issue #6491: [FLINK-10057] Update 
FlinkYarnSessionCli.java
URL: https://github.com/apache/flink/pull/6491#issuecomment-425186485
 
 
   @TisonKun @StefanRRichter will this PR be merged into the master branch?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> optimalize 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode
> -
>
> Key: FLINK-10057
> URL: https://issues.apache.org/jira/browse/FLINK-10057
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.2
>Reporter: liuzq
>Priority: Major
>  Labels: pull-request-available
>
> In the function 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode:
>  
> private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
>     boolean canApplyYarnProperties = 
>         !commandLine.hasOption(addressOption.getOpt());
>     // if canApplyYarnProperties is false, return quickly (proposed addition)
>     if (!canApplyYarnProperties) {
>         return false;
>     }
>     for (Option option : commandLine.getOptions()) {
>         if (allOptions.hasOption(option.getOpt())) {
>             if (!isDetachedOption(option)) {
> ...
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] lzqdename commented on issue #6491: [FLINK-10057] Update FlinkYarnSessionCli.java

2018-09-27 Thread GitBox
lzqdename commented on issue #6491: [FLINK-10057] Update 
FlinkYarnSessionCli.java
URL: https://github.com/apache/flink/pull/6491#issuecomment-425186485
 
 
   @TisonKun @StefanRRichter will this PR be merged into the master branch?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630816#comment-16630816
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221019265
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+   final LogicalSlot slot = this.getAssignedResource();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   this is actually a refactoring of the original code (see line 577 of 
JobMaster.java); the logic is kept the same.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask pulls splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221019265
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+   final LogicalSlot slot = this.getAssignedResource();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   this is actually a refactoring of the original code (see line 577 of 
JobMaster.java); the logic is kept the same.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630790#comment-16630790
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on issue #6711: [FLINK-9377] [core, state backends] 
Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#issuecomment-425180397
 
 
   @dawidwys and I had a long joint review / discussion session. A lot in this 
PR goes in a very good direction. We would suggest some not-too-large changes. 
Please let us know if you agree; we may also have overlooked or misinterpreted 
some implications.
   
   **Desired Goals**
   
 - Once this PR is merged, Flink should be in a releasable state, meaning 
that it should not matter if we manage to upgrade all serializers to the new 
model before the release. That is important for the time-based releasing and 
the uncertainty with respect to Scala code generation in serializers, etc. If 
we manage to upgrade half of Flink's serializers, it should be perfectly fine.
   
 - There are also users who have written custom serializers and, with that, 
custom config snapshots. We cannot expect them to update these strictly in 
sync with our release. It would be good for users to have an easy way to adjust 
their custom serializers' config snapshots such that they still work.
   
 - The current code follows the assumption that in order to be upgraded (1) 
serializers need to bump strictly one version and (2) on their backwards 
compatibility paths, read first the serializer (via Java serialization) and 
then the config snapshot data. Any serializer not following this strict 
contract will have errors during the reading of the config snapshot data. That 
seems a hard and fragile contract, especially considering that users also need 
to follow that path for upgrades.
   
   **Suggested Change**
   
 - We do not completely remove the Java Serialization for serializers, but 
make it optional. Config Snapshots can decide whether they want the prior 
serializer to be serialized into the meta info or not. This is similar to the 
current backwards compatibility path, but does not put the serialized 
serializer into the same byte stream and does not make the assumption that 
the specific value of the version tells you that.
   
 - We extend the `TypeSerializerConfigSnapshot` class as suggested below. 
This tells the config snapshot writer whether to write the previous serializer. 
It also removes the need for the "Backward Compatible Wrapper", because the 
TypeSerializerConfigSnapshot handles the bridging directly.
   
 - We introduce (possibly later PR) the 
`SerializerConfigSnapshotBackwardsAdapter` and the new 
`SerializerConfigSnapshot`. All updated config snapshots should extend the 
`SerializerConfigSnapshot` and the not updated ones should extend the 
`SerializerConfigSnapshotBackwardsAdapter`. We can then make the methods in 
TypeSerializerConfigSnapshot abstract.
   
 - Once we decide we have given enough of a grace period for users to 
update their serializers and config snapshots, we remove the optional 
serialization of the previous serializer, remove the old 
`TypeSerializerConfigSnapshot` class and only keep the new 
`SerializerConfigSnapshot` class.
   
 - This should follow the same spirit as this PR, but introduce fewer 
subtle contracts and decouple versioning from the removal of the Java 
serialization.
   
   ```java
   public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {
   
       private TypeSerializer<T> priorSerializer;
   
       public boolean needsPriorSerializerPersisted() { return true; }
   
       public void setPriorSerializer(TypeSerializer<T> prior) { this.priorSerializer = prior; }
   
       public TypeSerializer<T> getPriorSerializer() { return this.priorSerializer; }
   
       public TypeSerializer<T> restoreSerializer() {
           if (priorSerializer != null) {
               return priorSerializer;
           } else {
               throw new IllegalStateException(...);
           }
       }
   
       // ... the current methods
   }
   
   public class SerializerConfigSnapshotBackwardsAdapter<T> extends TypeSerializerConfigSnapshot<T> {
   
       // same methods as above
   }
   
   public abstract class SerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
   
       public final boolean needsPriorSerializerPersisted() { return false; }
   
       public final void setPriorSerializer(TypeSerializer<T> prior) {}
   
       public final TypeSerializer<T> getPriorSerializer() { throw new UnsupportedOperationException(); }
   
       // this strictly needs to be implemented
       public abstract TypeSerializer<T> restoreSerializer();
   }
   ```
   
   What do you think about that?
   


This is an automated message from the Apache Git Service.
To respond 

[GitHub] StephanEwen commented on issue #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-27 Thread GitBox
StephanEwen commented on issue #6711: [FLINK-9377] [core, state backends] 
Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#issuecomment-425180397
 
 
   @dawidwys and I had a long joint review / discussion session. A lot in this 
PR goes in a very good direction. We would suggest some not-too-large changes. 
Please let us know if you agree; we may also have overlooked or misinterpreted 
some implications.
   
   **Desired Goals**
   
 - Once this PR is merged, Flink should be in a releasable state, meaning 
that it should not matter if we manage to upgrade all serializers to the new 
model before the release. That is important for the time-based releasing and 
the uncertainty with respect to Scala code generation in serializers, etc. If 
we manage to upgrade half of Flink's serializers, it should be perfectly fine.
   
 - There are also users who have written custom serializers and, with that, 
custom config snapshots. We cannot expect them to update these strictly in 
sync with our release. It would be good for users to have an easy way to adjust 
their custom serializers' config snapshots such that they still work.
   
 - The current code follows the assumption that in order to be upgraded (1) 
serializers need to bump strictly one version and (2) on their backwards 
compatibility paths, read first the serializer (via Java serialization) and 
then the config snapshot data. Any serializer not following this strict 
contract will have errors during the reading of the config snapshot data. That 
seems a hard and fragile contract, especially considering that users also need 
to follow that path for upgrades.
   
   **Suggested Change**
   
 - We do not completely remove the Java Serialization for serializers, but 
make it optional. Config Snapshots can decide whether they want the prior 
serializer to be serialized into the meta info or not. This is similar to the 
current backwards compatibility path, but does not put the serialized 
serializer into the same byte stream and does not make the assumption that 
the specific value of the version tells you that.
   
 - We extend the `TypeSerializerConfigSnapshot` class as suggested below. 
This tells the config snapshot writer whether to write the previous serializer. 
It also removes the need for the "Backward Compatible Wrapper", because the 
TypeSerializerConfigSnapshot handles the bridging directly.
   
 - We introduce (possibly later PR) the 
`SerializerConfigSnapshotBackwardsAdapter` and the new 
`SerializerConfigSnapshot`. All updated config snapshots should extend the 
`SerializerConfigSnapshot` and the not updated ones should extend the 
`SerializerConfigSnapshotBackwardsAdapter`. We can then make the methods in 
TypeSerializerConfigSnapshot abstract.
   
 - Once we decide we have given enough of a grace period for users to 
update their serializers and config snapshots, we remove the optional 
serialization of the previous serializer, remove the old 
`TypeSerializerConfigSnapshot` class and only keep the new 
`SerializerConfigSnapshot` class.
   
 - This should follow the same spirit as this PR, but introduce fewer 
subtle contracts and decouple versioning from the removal of the Java 
serialization.
   
   ```java
   public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {
   
       private TypeSerializer<T> priorSerializer;
   
       public boolean needsPriorSerializerPersisted() { return true; }
   
       public void setPriorSerializer(TypeSerializer<T> prior) { this.priorSerializer = prior; }
   
       public TypeSerializer<T> getPriorSerializer() { return this.priorSerializer; }
   
       public TypeSerializer<T> restoreSerializer() {
           if (priorSerializer != null) {
               return priorSerializer;
           } else {
               throw new IllegalStateException(...);
           }
       }
   
       // ... the current methods
   }
   
   public class SerializerConfigSnapshotBackwardsAdapter<T> extends TypeSerializerConfigSnapshot<T> {
   
       // same methods as above
   }
   
   public abstract class SerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
   
       public final boolean needsPriorSerializerPersisted() { return false; }
   
       public final void setPriorSerializer(TypeSerializer<T> prior) {}
   
       public final TypeSerializer<T> getPriorSerializer() { throw new UnsupportedOperationException(); }
   
       // this strictly needs to be implemented
       public abstract TypeSerializer<T> restoreSerializer();
   }
   ```
   
   What do you think about that?
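   
   As an illustration from the user side, here is a hypothetical custom config 
snapshot written against the sketch above (`MyPojo` and `MyPojoSerializer` are 
invented names; the final method set would follow whatever is actually merged):
   
   ```java
   // Hypothetical user code against the sketched API above, not the final Flink API.
   public class MyPojoSerializerConfigSnapshot extends SerializerConfigSnapshot<MyPojo> {
   
       private static final int VERSION = 2;
   
       @Override
       public TypeSerializer<MyPojo> restoreSerializer() {
           // rebuild the serializer purely from the written config;
           // no Java-serialized prior serializer is needed anymore
           return new MyPojoSerializer();
       }
   
       @Override
       public int getVersion() {
           return VERSION;
       }
   }
   ```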
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630761#comment-16630761
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221002876
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   nit: When all input splits are exhausted for a given vertex (nextInputSplit is 
null), you can just return null without adding an extra null element at the end 
of the array list.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask pulls splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630760#comment-16630760
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221007252
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -704,6 +716,66 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   private JobGraph createDataSourceJobGraph() {
+   final TextInputFormat inputFormat = new TextInputFormat(new 
Path("."));
+   final InputFormatVertex producer = new 
InputFormatVertex("Producer");
+   new TaskConfig(producer.getConfiguration()).setStubWrapper(new 
UserCodeObjectWrapper<>(inputFormat));
+   producer.setInvokableClass(DataSourceTask.class);
+
+   final JobVertex consumer = new JobVertex("Consumer");
+   consumer.setInvokableClass(NoOpInvokable.class);
+   consumer.connectNewDataSetAsInput(producer, 
DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+   final JobGraph jobGraph = new JobGraph(producer, consumer);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   return jobGraph;
+   }
+
+   /**
+* Tests the {@link JobMaster#requestNextInputSplit(JobVertexID, 
ExecutionAttemptID)}
+* validates that it will get the same result for a different retry attempt.
+*/
+   @Test
+   public void testRequestNextInputSplitWithDataSourceFailover() throws 
Exception {
+
+   final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+   testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, 
jobMaster) ->{
+   try{
+   final JobMasterGateway gateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final TaskInformation taskInformation = 
tdd.getSerializedTaskInformation()
+   
.deserializeValue(getClass().getClassLoader());
+   JobVertexID vertexID = 
taskInformation.getJobVertexId();
+
+   //get the previous split
+   SerializedInputSplit split1 = 
gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+   //start a new version of this execution
+   ExecutionGraph executionGraph = 
jobMaster.getExecutionGraph();
+   Execution execution = 
executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+   ExecutionVertex executionVertex = 
execution.getVertex();
+
+   long version = execution.getGlobalModVersion();
+   gateway.updateTaskExecutionState(new 
TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), 
ExecutionState.FINISHED)).get();
+   Execution newExecution = 
executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+   //get the new split
+   SerializedInputSplit split2 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+   
Assert.assertArrayEquals(split1.getInputSplitData(), 
split2.getInputSplitData());
+
+   //get the new split3
 
 Review comment:
   Make sure you cover the case where input splits are exhausted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces inconsistent 
> 

[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630759#comment-16630759
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221001196
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+   final LogicalSlot slot = this.getAssignedResource();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   Under which condition will slot be null? If slot is null, what does 
vertex.getNextInputSplit() return?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance; however, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This introduces 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask pulls splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221007252
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -704,6 +716,66 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
 
+   private JobGraph createDataSourceJobGraph() {
+   final TextInputFormat inputFormat = new TextInputFormat(new 
Path("."));
+   final InputFormatVertex producer = new 
InputFormatVertex("Producer");
+   new TaskConfig(producer.getConfiguration()).setStubWrapper(new 
UserCodeObjectWrapper<>(inputFormat));
+   producer.setInvokableClass(DataSourceTask.class);
+
+   final JobVertex consumer = new JobVertex("Consumer");
+   consumer.setInvokableClass(NoOpInvokable.class);
+   consumer.connectNewDataSetAsInput(producer, 
DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+   final JobGraph jobGraph = new JobGraph(producer, consumer);
+   jobGraph.setAllowQueuedScheduling(true);
+
+   return jobGraph;
+   }
+
+   /**
+* Tests the {@link JobMaster#requestNextInputSplit(JobVertexID, 
ExecutionAttemptID)}
+* validates that it will get the same result for a different retry attempt.
+*/
+   @Test
+   public void testRequestNextInputSplitWithDataSourceFailover() throws 
Exception {
+
+   final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+   testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, 
jobMaster) ->{
+   try{
+   final JobMasterGateway gateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final TaskInformation taskInformation = 
tdd.getSerializedTaskInformation()
+   
.deserializeValue(getClass().getClassLoader());
+   JobVertexID vertexID = 
taskInformation.getJobVertexId();
+
+   //get the previous split
+   SerializedInputSplit split1 = 
gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+   //start a new version of this execution
+   ExecutionGraph executionGraph = 
jobMaster.getExecutionGraph();
+   Execution execution = 
executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+   ExecutionVertex executionVertex = 
execution.getVertex();
+
+   long version = execution.getGlobalModVersion();
+   gateway.updateTaskExecutionState(new 
TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), 
ExecutionState.FINISHED)).get();
+   Execution newExecution = 
executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+   //get the new split
+   SerializedInputSplit split2 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+   
Assert.assertArrayEquals(split1.getInputSplitData(), 
split2.getInputSplitData());
+
+   //get the new split3
 
 Review comment:
   Make sure you cover the case where input splits are exhausted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221002876
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   nit: When all input splits are exhausted for a given vertex (nextInputSplit is 
null), you can just return null without adding an extra null element at the end 
of the array list.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-27 Thread GitBox
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221001196
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+   final LogicalSlot slot = this.getAssignedResource();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   Under which condition will slot be null? If slot is null, what does 
vertex.getNextInputSplit() return?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630712#comment-16630712
 ] 

ASF GitHub Bot commented on FLINK-10397:


tillrohrmann commented on issue #6752: [FLINK-10397] Remove CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752#issuecomment-425159955
 
 
   I agree with @TisonKun. Removing the `EXECUTION_MODE_OPTION` and `isNewMode` 
can be done in another subtask of FLINK-10392.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove CoreOptions#MODE
> ---
>
> Key: FLINK-10397
> URL: https://issues.apache.org/jira/browse/FLINK-10397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Remove the {{CoreOptions#MODE}} since it is no longer needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6752: [FLINK-10397] Remove CoreOptions#MODE

2018-09-27 Thread GitBox
tillrohrmann commented on issue #6752: [FLINK-10397] Remove CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752#issuecomment-425159955
 
 
   I agree with @TisonKun. Removing the `EXECUTION_MODE_OPTION` and `isNewMode` 
can be done in another subtask of FLINK-10392.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630706#comment-16630706
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on issue #6737: [FLINK-10399] Refractor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-425159102
 
 
   ping @StephanEwen, what is the next step? I am confused about why it has stalled.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refractor ParameterTool#fromArgs
> 
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink 
> developers cannot parse quickly.
> The main problem is that, when parsing args, we always try to extract a 
> key-value pair, but the implementation iterates with a {{for}} loop, thus 
> introducing odd flags, mutable variables, and branches (see the sketch below).
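> 
> A minimal sketch of the pairwise parsing this hints at, assuming simplified 
> {{--key value}} / {{--flag}} arguments (hypothetical helper, not the actual 
> patch; negative numeric values like "-5" are not special-cased here):
> {code}
> // requires java.util.HashMap and java.util.Map
> public static Map<String, String> fromArgsSketch(String[] args) {
>     Map<String, String> map = new HashMap<>();
>     int i = 0;
>     while (i < args.length) {
>         String raw = args[i++];
>         if (!raw.startsWith("-")) {
>             throw new IllegalArgumentException("Expected a key, found: " + raw);
>         }
>         String key = raw.startsWith("--") ? raw.substring(2) : raw.substring(1);
>         if (i < args.length && !args[i].startsWith("-")) {
>             map.put(key, args[i++]);        // a key followed by its value
>         } else {
>             map.put(key, "__NO_VALUE_KEY"); // bare flag, mirrors ParameterTool.NO_VALUE_KEY
>         }
>     }
>     return map;
> }
> {code}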



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs

2018-09-27 Thread GitBox
TisonKun commented on issue #6737: [FLINK-10399] Refractor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-425159102
 
 
   ping @StephanEwen, what is the next step? I am confused about why it has stalled.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing

2018-09-27 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630660#comment-16630660
 ] 

Nico Kruber commented on FLINK-10419:
-

The initial error on the task managers that led to the error above was 
apparently the following, caused by the Kafka brokers' transaction timeout 
being too low for the transactions to commit:

{code}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:1258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
{code}
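
For context, a common mitigation (stated here as an assumption based on the 
Kafka exactly-once setup, not as part of this ticket) is to keep the producer's 
transaction timeout within the brokers' transaction.max.timeout.ms, e.g.:

{code}
// Sketch: the property names are standard Kafka settings; the topic name and
// schema are placeholders, and the constructor follows the FlinkKafkaProducer011
// API of the time.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092");
// must not exceed the brokers' transaction.max.timeout.ms (15 minutes by default)
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "my-topic", new SimpleStringSchema(), producerProps);
{code}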

> ClassNotFoundException while deserializing user exceptions from checkpointing
> -
>
> Key: FLINK-10419
> URL: https://issues.apache.org/jira/browse/FLINK-10419
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> It seems that somewhere in the operator's failure handling, we hand a 
> user-code exception to the checkpoint coordinator via Java serialization but 
> it will then fail during the de-serialization because the class is not 
> available. This will result in the following error shadowing the real one:
> {code}
> java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76)
> at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859)
> at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557)
> at java.lang.Throwable.readObject(Throwable.java:914)
> at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> at 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222)
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
> at 
> 

[jira] [Updated] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing

2018-09-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-10419:

Description: 
It seems that somewhere in the operator's failure handling, we hand a user-code 
exception to the checkpoint coordinator via Java serialization but it will then 
fail during the de-serialization because the class is not available. This will 
result in the following error shadowing the real one:
{code}
java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.deserializeMethodInvocation(RemoteRpcInvocation.java:118)
at 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.getMethodName(RemoteRpcInvocation.java:59)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:214)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
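
For illustration, a hedged sketch of one direction for avoiding this failure mode (not necessarily the fix applied for this ticket): wrapping user-code exceptions in Flink's SerializedThrowable before they cross an RPC boundary keeps the class name, message, and stack trace as plain data, so the receiving side does not need the user class on its classpath.

{code:java}
import org.apache.flink.util.SerializedThrowable;

final class RpcSafeExceptions {

	// Wraps a user-code exception so that it can be deserialized on a JVM
	// that does not have the user's classes (e.g. FlinkKafka011Exception)
	// on its classpath. SerializedThrowable is an existing Flink utility;
	// this helper around it is only an illustration.
	static Throwable makeRpcSafe(Throwable userException) {
		return new SerializedThrowable(userException);
	}
}
{code}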

[jira] [Created] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-09-27 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-10455:
---

 Summary: Potential Kafka producer leak in case of failures
 Key: FLINK-10455
 URL: https://issues.apache.org/jira/browse/FLINK-10455
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.5.2
Reporter: Nico Kruber


If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
may get a {{ProducerFencedException}}. The documentation around 
{{ProducerFencedException}} explicitly states that we should close the producer 
after encountering it.

Looking at the code, it doesn't seem like this is actually done in 
{{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
{{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
exception, we don't clean up (nor try to commit) any other transactions.
-> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
simply iterates over {{pendingCommitTransactions}}, which is not touched 
during {{close()}}.

Now if we restart the failing job on the same Flink cluster, any resources from 
the previous attempt will still linger around.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
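
A minimal sketch of the close-on-fence handling that the Kafka documentation asks for (illustration only, not the actual FlinkKafkaProducer011 code; the helper name and the byte[] key/value types are assumptions):

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

final class FencedCommitSketch {

	// Commits the current transaction; if the producer has been fenced,
	// closes it before propagating the error so that its threads and
	// sockets do not leak into the next job attempt on the same cluster.
	static void commitOrClose(KafkaProducer<byte[], byte[]> producer) {
		try {
			producer.commitTransaction();
		} catch (ProducerFencedException e) {
			producer.close();
			throw e;
		}
	}
}
{code}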



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630646#comment-16630646
 ] 

ASF GitHub Bot commented on FLINK-10397:


tillrohrmann commented on a change in pull request #6752: [FLINK-10397] Remove 
CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752#discussion_r220978579
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
 ##
 @@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy 
mode.
 
 Review comment:
   Good catch. Will update it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove CoreOptions#MODE
> ---
>
> Key: FLINK-10397
> URL: https://issues.apache.org/jira/browse/FLINK-10397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Remove the {{CoreOptions#MODE}} since it is no longer needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6752: [FLINK-10397] Remove CoreOptions#MODE

2018-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #6752: [FLINK-10397] Remove 
CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752#discussion_r220978579
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
 ##
 @@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy 
mode.
 
 Review comment:
   Good catch. Will update it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630623#comment-16630623
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua edited a comment on issue #6730: [FLINK-10384][table] Add Sinh math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027
 
 
   @pnowojski I have refactored this PR's documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630622#comment-16630622
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua commented on issue #6730: [FLINK-10384][table] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027
 
 
   @pnowojski also refactored this PR's documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL

2018-09-27 Thread GitBox
yanghua edited a comment on issue #6730: [FLINK-10384][table] Add Sinh math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027
 
 
   @pnowojski I have refactored this PR's documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL

2018-09-27 Thread GitBox
yanghua commented on issue #6730: [FLINK-10384][table] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027
 
 
   @pnowojski also refactored this PR's documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630616#comment-16630616
 ] 

ASF GitHub Bot commented on FLINK-9455:
---

tillrohrmann commented on issue #6734: [FLINK-9455][RM] Add support for multi 
task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#issuecomment-425136715
 
 
   Thanks for the review @azagrebin, @GJL and @shuai-xu. Merging this PR once 
Travis gives the green light.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.
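
Illustrative arithmetic only (not the SlotManager implementation): once the SlotManager knows how many slots each new TaskManager brings, the number of TaskManagers to request for N pending slot requests drops from N to ceil(N / slotsPerTaskManager):

{code:java}
final class TaskManagerDemandSketch {

	// Integer ceiling division: one TaskManager covers slotsPerTaskManager
	// pending slot requests, so only as many TaskManagers as needed are
	// requested (names are illustrative, not the SlotManager API).
	static int taskManagersToRequest(int pendingSlotRequests, int slotsPerTaskManager) {
		if (pendingSlotRequests <= 0) {
			return 0;
		}
		return (pendingSlotRequests + slotsPerTaskManager - 1) / slotsPerTaskManager;
	}
}
{code}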



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors

2018-09-27 Thread GitBox
tillrohrmann commented on issue #6734: [FLINK-9455][RM] Add support for multi 
task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#issuecomment-425136715
 
 
   Thanks for the review @azagrebin, @GJL and @shuai-xu. Merging this PR once 
Travis gives the green light.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630601#comment-16630601
 ] 

ASF GitHub Bot commented on FLINK-10415:


tillrohrmann commented on issue #6763: [FLINK-10415] Fail response future if 
connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#issuecomment-425132571
 
 
   Yes, I will increase the idleness timeout to 5 minutes. Merging this PR then.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to a lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.
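
A minimal sketch of the direction described above, assuming a Netty-based client pipeline like the RestClient's (the handler name and timeout are placeholders): an IdleStateHandler fires an event when the channel sees no traffic for the given time, which a later handler can translate into failing the pending response future instead of hanging on a dead connection.

{code:java}
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

final class IdleAwarePipelineSketch {

	// Registers an idleness detector at the head of the pipeline; it emits
	// IdleStateEvents after idleSeconds without reads and/or writes.
	static void configure(SocketChannel channel, int idleSeconds) {
		channel.pipeline().addFirst(
			"idleness-detector",
			new IdleStateHandler(idleSeconds, idleSeconds, idleSeconds));
	}
}
{code}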



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6763: [FLINK-10415] Fail response future if connection closes in RestClient

2018-09-27 Thread GitBox
tillrohrmann commented on issue #6763: [FLINK-10415] Fail response future if 
connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#issuecomment-425132571
 
 
   Yes, I will increase the idleness timeout to 5 minutes. Merging this PR then.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630600#comment-16630600
 ] 

ASF GitHub Bot commented on FLINK-10402:


StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220965813
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   Hmm, I would leave that call up to you. My feeling is: if somebody who 
receives a reference to this class is not allowed to call `shutDownAsync`, 
then I would rather not make it public at all, to prevent clients from calling 
it accidentally. If that is not the case, then yes, why not use the interface 
that we have for this concept.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread GitBox
StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220965813
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   Hmm, I would leave that call up to you. My feeling is: if somebody who 
receives a reference to this class is not allowed to call `shutDownAsync`, 
then I would rather not make it public at all, to prevent clients from calling 
it accidentally. If that is not the case, then yes, why not use the interface 
that we have for this concept.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630595#comment-16630595
 ] 

ASF GitHub Bot commented on FLINK-10402:


tillrohrmann commented on issue #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#issuecomment-425131771
 
 
   Thanks for the review @StefanRRichter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread GitBox
tillrohrmann commented on issue #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#issuecomment-425131771
 
 
   Thanks for the review @StefanRRichter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630592#comment-16630592
 ] 

ASF GitHub Bot commented on FLINK-10402:


tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220964730
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 ##
 @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws 
Exception {
taskManagerProcess2 = new 
ProcessBuilder(command).start();
new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-   // we wait for the JobManager to have the two 
TaskManagers available
-   // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-   waitUntilNumTaskManagersAreRegistered(jmActor, 2, 
120000);
 
 Review comment:
   Yes, we can skip this part, because we now support queued scheduling, where 
the TMs don't need to be present before you start the job.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220964730
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 ##
 @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws 
Exception {
taskManagerProcess2 = new 
ProcessBuilder(command).start();
new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-   // we wait for the JobManager to have the two 
TaskManagers available
-   // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-   waitUntilNumTaskManagersAreRegistered(jmActor, 2, 
120000);
 
 Review comment:
   Yes, we can skip this part, because we now support queued scheduling, where 
the TMs don't need to be present before you start the job.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10410) Search for broken links on travis

2018-09-27 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630591#comment-16630591
 ] 

Piotr Nowojski commented on FLINK-10410:


Ok, thanks [~Zentol] :)

> Search for broken links on travis
> -
>
> Key: FLINK-10410
> URL: https://issues.apache.org/jira/browse/FLINK-10410
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Travis
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> In {{/docs/check_links.sh}} we have a handy script for searching for dead 
> links. We could use it to automatically find dead links on travis.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630590#comment-16630590
 ] 

ASF GitHub Bot commented on FLINK-10402:


tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220964322
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   What about letting `ClusterEntrypoint` implement `AutoCloseableAsync` and 
making this public? 
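
For illustration, a hedged sketch of what that suggestion could look like (simplified stand-ins, not the actual ClusterEntrypoint or AutoCloseableAsync code):

{code:java}
import java.util.concurrent.CompletableFuture;

interface AsyncCloseableSketch {
	CompletableFuture<Void> closeAsync();
}

final class EntrypointSketch implements AsyncCloseableSketch {

	// Expose only the generic close hook; the internal shutdown routine
	// stays private so clients cannot call it accidentally.
	@Override
	public CompletableFuture<Void> closeAsync() {
		return shutDownInternally();
	}

	private CompletableFuture<Void> shutDownInternally() {
		return CompletableFuture.completedFuture(null);
	}
}
{code}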


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630589#comment-16630589
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on issue #6741: [FLINK-9712][table,docs] Document 
processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#issuecomment-425130543
 
 
   I have addressed almost all of the comments (or wrote a reply if not). Can 
you check the revised PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220964322
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   What about letting `ClusterEntrypoint` implement `AutoCloseableAsync` and 
making this public? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on issue #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins

2018-09-27 Thread GitBox
pnowojski commented on issue #6741: [FLINK-9712][table,docs] Document 
processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#issuecomment-425130543
 
 
   I have addressed almost all of the comments (or wrote a reply if not). Can 
you check the revised PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630583#comment-16630583
 ] 

ASF GitHub Bot commented on FLINK-10402:


tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220961425
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   Yes, will revert it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220961425
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   Yes, will revert it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630578#comment-16630578
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220960721
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required, which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we have defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table, we could query it in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
 
 Review comment:
   Added more comments, please check them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For 

[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630573#comment-16630573
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220960460
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required, which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we have defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table, we could query it in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US 

[GitHub] pnowojski commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins

2018-09-27 Thread GitBox
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220960721
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required, which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we have defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table, we could query it in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
 
 Review comment:
   Added more comments, please check them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins

2018-09-27 Thread GitBox
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220960460
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required, which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we have defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table, we could query it in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = 

[jira] [Commented] (FLINK-10410) Search for broken links on travis

2018-09-27 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630566#comment-16630566
 ] 

Chesnay Schepler commented on FLINK-10410:
--

You can also fork the branch {{cron-master-docs}} and configure another 
remote/branch in {{.travis.yml}}.

> Search for broken links on travis
> -
>
> Key: FLINK-10410
> URL: https://issues.apache.org/jira/browse/FLINK-10410
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Travis
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> In {{/docs/check_links.sh}} we have a handy script for searching for dead 
> links. We could use it to automatically find dead links on travis.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6873) Limit the number of open writers in file system connector

2018-09-27 Thread Oscar Westra van Holthe - Kind (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630562#comment-16630562
 ] 

Oscar Westra van Holthe - Kind commented on FLINK-6873:
---

What may also be an issue is that the BucketingSink and the newer 
StreamingFileSink seem to ignore event time.

Thus, if your output stream uses a buffer of some sort and your job catches up 
quickly (processing multiple days' worth of events in a few hours), the sink 
may end up having too many open files.

> Limit the number of open writers in file system connector
> -
>
> Key: FLINK-6873
> URL: https://issues.apache.org/jira/browse/FLINK-6873
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Local Runtime, Streaming Connectors
>Reporter: Mu Kong
>Priority: Major
>
> Mail list discuss:
> https://mail.google.com/mail/u/1/#label/MailList%2Fflink-dev/15c869b2a5b20d43
> Following exception will occur when Flink is writing to too many files:
> {code}
> java.lang.OutOfMemoryError: unable to create new native thread
> at java.lang.Thread.start0(Native Method)
> at java.lang.Thread.start(Thread.java:714)
> at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170)
> at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:120)
> at 
> org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:545)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:440)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:230)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:379)
> {code}
> Letting developers configure the maximum number of concurrently open files 
> would be great.
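
A minimal sketch of how such a cap could look, using an LRU map that closes the 
least recently used writer once a configurable limit is exceeded (the 
{{BoundedWriterCache}} class and {{maxOpenWriters}} parameter are illustrative, 
not existing Flink API):

{code}
import java.util.LinkedHashMap;
import java.util.Map;

/** Keeps at most maxOpenWriters writers open, closing the least recently used on overflow. */
public class BoundedWriterCache<K, W extends AutoCloseable> {

    private final int maxOpenWriters;
    private final LinkedHashMap<K, W> openWriters;

    public BoundedWriterCache(int maxOpenWriters) {
        this.maxOpenWriters = maxOpenWriters;
        // accessOrder = true makes the map iterate in least-recently-accessed order
        this.openWriters = new LinkedHashMap<K, W>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, W> eldest) {
                if (size() > BoundedWriterCache.this.maxOpenWriters) {
                    try {
                        // close the least recently used writer before evicting it
                        eldest.getValue().close();
                    } catch (Exception e) {
                        throw new RuntimeException("Could not close writer", e);
                    }
                    return true;
                }
                return false;
            }
        };
    }

    /** Returns the cached writer for the bucket (or null), marking it as recently used. */
    public W get(K bucket) {
        return openWriters.get(bucket);
    }

    public void put(K bucket, W writer) {
        openWriters.put(bucket, writer);
    }
}
{code}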



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630560#comment-16630560
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220957402
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` 
to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common 
currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for the `RatesHistory` 
table.
+Secondly, a [time attribute](time_attributes.html) is also required to 
determine which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in the Temporal Table, one must pass a time 
attribute that determines the version of the table that will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument 
`timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary 
keys with respect to the given `timeAttribute`.
+
+Assume that we defined a `Rates(timeAttribute)` Temporal Table Function based 
on the `RatesHistory` table.
+We could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for 
the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table 
Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was used to provide an intuition about what the function 
`Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
 
 Review comment:
   Isn't `define` more widely used in such contexts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630554#comment-16630554
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220956370
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` 
to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common 
currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for the `RatesHistory` 
table.
+Secondly, a [time attribute](time_attributes.html) is also required to 
determine which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in the Temporal Table, one must pass a time 
attribute that determines the version of the table that will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument 
`timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary 
keys with respect to the given `timeAttribute`.
+
+Assume that we defined a `Rates(timeAttribute)` Temporal Table Function based 
on the `RatesHistory` table.
+We could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for 
the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table 
Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was used to provide an intuition about what the function 
`Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
 Review comment:
   I have added the `TemporalTableFunction`-specific import. For the rest of 
the stuff we would need to provide a real code example.
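
   For reference, a minimal sketch of what such an example could look like with 
the 1.7 Table API (the `ratesStream` source and all field names here are 
assumptions for illustration, not part of the PR):

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// An illustrative stream of (currency, rate) updates.
DataStream<Tuple2<String, Long>> ratesStream =
    env.fromElements(Tuple2.of("Euro", 114L), Tuple2.of("Yen", 1L));

// Register the history as a table with an appended processing time attribute.
Table ratesHistory = tEnv.fromDataStream(ratesStream, "r_currency, r_rate, r_proctime.proctime");

// "r_proctime" is the time attribute, "r_currency" the primary key.
TemporalTableFunction rates =
    ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);
{code}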


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630530#comment-16630530
 ] 

ASF GitHub Bot commented on FLINK-10415:


zentol commented on issue #6763: [FLINK-10415] Fail response future if 
connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#issuecomment-425115999
 
 
   urgh, well that's unfortunate. We may want to increase the idle timeout to a 
larger value though (maybe 5 minutes?), as it effectively is an upper limit for 
the timeout that a user may specify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.
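
A minimal sketch of what wiring such handlers into a Netty channel pipeline 
could look like (a hypothetical initializer; the 300 second values are only 
illustrative, and Flink actually bundles a shaded Netty):

{code}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class TimeoutAwareInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
            // Fires a ReadTimeoutException if no data is read for 300 seconds.
            .addLast(new ReadTimeoutHandler(300))
            // Fires a WriteTimeoutException if a write does not complete within 300 seconds.
            .addLast(new WriteTimeoutHandler(300));
        // ... the RestClient's codec and response handlers would follow here.
    }
}
{code}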



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630510#comment-16630510
 ] 

ASF GitHub Bot commented on FLINK-10401:


tillrohrmann commented on issue #6749: [FLINK-10401] Port 
ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#issuecomment-425111422
 
 
   Thanks for the review @StefanRRichter. Addressing your comment and then 
merging this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port ProcessFailureCancelingITCase to new code base
> ---
>
> Key: FLINK-10401
> URL: https://issues.apache.org/jira/browse/FLINK-10401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{ProcessFailureCancelingITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630506#comment-16630506
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220942767
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` 
to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common 
currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for the `RatesHistory` 
table.
+Secondly, a [time attribute](time_attributes.html) is also required to 
determine which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in the Temporal Table, one must define a time 
attribute for which matching version of the table will be returned.
 
 Review comment:
   Changed to:
   > "one must pass a time attribute that determines the version of the table 
that will be returned."
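
   To make the simplification concrete, a sketch of how the aggregation from 
the motivation section could look once a `Rates` temporal table function is 
registered (a hedged example; `tEnv` and the table names follow the docs' 
examples, not code from this PR):

{code}
Table result = tEnv.sqlQuery(
    "SELECT SUM(o.amount * r.rate) AS amount " +
    "FROM Orders AS o, " +
    "  LATERAL TABLE (Rates(o.rowtime)) AS r " +
    "WHERE r.currency = o.currency");
{code}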


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630505#comment-16630505
 ] 

ASF GitHub Bot commented on FLINK-10401:


tillrohrmann commented on a change in pull request #6749: [FLINK-10401] Port 
ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#discussion_r220942743
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##
 @@ -130,6 +131,13 @@ public T getDispatcher() {
}
}
 
+   @VisibleForTesting
+   public WebMonitorEndpoint getWebMonitorEndpoint() {
 
 Review comment:
   Yes, you're right. I've incorporated your feedback from #6743 and with that 
it should no longer be necessary to have this getter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port ProcessFailureCancelingITCase to new code base
> ---
>
> Key: FLINK-10401
> URL: https://issues.apache.org/jira/browse/FLINK-10401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{ProcessFailureCancelingITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630502#comment-16630502
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220941093
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= =========  ====
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` 
to `10:45` was `114`; from `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common 
currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
 
 Review comment:
   Good point, it's also about efficiency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630491#comment-16630491
 ] 

ASF GitHub Bot commented on FLINK-10396:


tillrohrmann commented on a change in pull request #6748: [FLINK-10396] Remove 
CodebaseType
URL: https://github.com/apache/flink/pull/6748#discussion_r220939564
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-val isNew = TestBaseUtils.isNewCodebase()
-if (isNew) {
-  configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-  // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
-  configuration.setInteger(RestOptions.PORT, 8082)
-  val miniConfig = new MiniClusterConfiguration.Builder()
-.setConfiguration(configuration)
-.setNumSlotsPerTaskManager(parallelism)
-.build()
-
-  val miniCluster = new MiniCluster(miniConfig)
-  miniCluster.start()
-  port = miniCluster.getRestAddress.getPort
-  hostname = miniCluster.getRestAddress.getHost
-
-  cluster = Some(Left(miniCluster))
-} else {
-  configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-  configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-  val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-  hostname = standaloneCluster.getHostname
-  port = standaloneCluster.getPort
-
-  cluster = Some(Right(standaloneCluster))
-}
+configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+// set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
+configuration.setInteger(RestOptions.PORT, 8082)
+val miniConfig = new MiniClusterConfiguration.Builder()
+  .setConfiguration(configuration)
+  .setNumSlotsPerTaskManager(parallelism)
+  .build()
+
+val miniCluster = new MiniCluster(miniConfig)
+miniCluster.start()
+port = miniCluster.getRestAddress.getPort
+hostname = miniCluster.getRestAddress.getHost
+
+cluster = Some(Left(miniCluster))
 
 Review comment:
   I don't think so. I will remove it, and also remove the 
`StandaloneMiniCluster` at the same time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove codebase switch from MiniClusterResource
> ---
>
> Key: FLINK-10396
> URL: https://issues.apache.org/jira/browse/FLINK-10396
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 
> 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch
>
>
> Remove the legacy codebase switch from {{MiniClusterResource}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630494#comment-16630494
 ] 

ASF GitHub Bot commented on FLINK-10396:


tillrohrmann commented on a change in pull request #6748: [FLINK-10396] Remove 
CodebaseType
URL: https://github.com/apache/flink/pull/6748#discussion_r220939564
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-val isNew = TestBaseUtils.isNewCodebase()
-if (isNew) {
-  configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-  // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
-  configuration.setInteger(RestOptions.PORT, 8082)
-  val miniConfig = new MiniClusterConfiguration.Builder()
-.setConfiguration(configuration)
-.setNumSlotsPerTaskManager(parallelism)
-.build()
-
-  val miniCluster = new MiniCluster(miniConfig)
-  miniCluster.start()
-  port = miniCluster.getRestAddress.getPort
-  hostname = miniCluster.getRestAddress.getHost
-
-  cluster = Some(Left(miniCluster))
-} else {
-  configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-  configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-  val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-  hostname = standaloneCluster.getHostname
-  port = standaloneCluster.getPort
-
-  cluster = Some(Right(standaloneCluster))
-}
+configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+// set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
+configuration.setInteger(RestOptions.PORT, 8082)
+val miniConfig = new MiniClusterConfiguration.Builder()
+  .setConfiguration(configuration)
+  .setNumSlotsPerTaskManager(parallelism)
+  .build()
+
+val miniCluster = new MiniCluster(miniConfig)
+miniCluster.start()
+port = miniCluster.getRestAddress.getPort
+hostname = miniCluster.getRestAddress.getHost
+
+cluster = Some(Left(miniCluster))
 
 Review comment:
   I don't think so. I will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove codebase switch from MiniClusterResource
> ---
>
> Key: FLINK-10396
> URL: https://issues.apache.org/jira/browse/FLINK-10396
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 
> 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch
>
>
> Remove the legacy codebase switch from {{MiniClusterResource}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

