[jira] [Updated] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11087:

Fix Version/s: 1.7.1
   1.6.3

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> --------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: Migration, State, broadcast
> Fix For: 1.6.3, 1.7.1
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of the 
> version. 
>  
> With some debugging, I see that when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer as the 
> value serializer, when it should be the other way around. 
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here: 
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
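> For illustration, the faulty read order presumably looks like the following 
> (a minimal, self-contained sketch with hypothetical names, not the actual 
> Flink code):
> {code:java}
> import java.util.AbstractMap.SimpleEntry;
> import java.util.Map;
> 
> public class SerializerSwapSketch {
>     // Stand-ins for the serializers written into the 1.5.x savepoint.
>     static final String KEY_SERIALIZER = "StringSerializer";
>     static final String VALUE_SERIALIZER = "JsonSerializer";
> 
>     // The legacy reader reads the two serializers back in the wrong order,
>     // so the restored (key, value) pair ends up swapped.
>     static Map.Entry<String, String> readLegacyMetaInfo() {
>         String first = VALUE_SERIALIZER;  // slot actually holding the value serializer
>         String second = KEY_SERIALIZER;   // slot actually holding the key serializer
>         return new SimpleEntry<>(first, second); // treated as (key, value) downstream
>     }
> 
>     public static void main(String[] args) {
>         Map.Entry<String, String> restored = readLegacyMetaInfo();
>         // The compatibility check then compares the new key serializer
>         // (StringSerializer) against the restored "key" serializer
>         // (JsonSerializer), and the restore fails with a StateMigrationException.
>         System.out.println("key serializer matches? "
>                 + "StringSerializer".equals(restored.getKey())); // false
>     }
> }
> {code}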



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711637#comment-16711637
 ] 

Chesnay Schepler commented on FLINK-11087:
----------------------------------------------

[~tzulitai] Can you amend the migration matrix in the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table]?

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> --------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of the 
> version. 
>  
> With some debugging, I see that when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer as the 
> value serializer, when it should be the other way around. 
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here: 
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-11087 at 12/6/18 4:06 PM:
----------------------------------------------

Confirmed, this is indeed a bug, and the cause is what we've discussed above.
This bug prevents Flink 1.6.x, 1.7.0 from restoring broadcast state from a 
1.5.x savepoint.

Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
 Thanks for the investigation [~edRojas].


was (Author: tzulitai):
Confirmed, this is indeed a bug, and the cause is what we've discussed above.

Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
Thanks for the investigation [~edRojas].

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> --------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of the 
> version. 
>  
> With some debugging, I see that when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer as the 
> value serializer, when it should be the other way around. 
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here: 
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11087:
----------------------------------------------

Confirmed, this is indeed a bug, and the cause is what we've discussed above.

Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
Thanks for the investigation [~edRojas].

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> --------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of the 
> version. 
>  
> With some debugging, I see that when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer as the 
> value serializer, when it should be the other way around. 
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here: 
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711634#comment-16711634
 ] 

ASF GitHub Bot commented on FLINK-11048:


zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239509248
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   This cannot be moved to the ClusterClient, since it relies on Streaming API 
classes (StreamExecutionEnvironment, or with your suggestion the StreamGraph) 
that aren't available in `flink-clients`. That is why we only work with the 
`JobGraph` there. And we can't just add a dependency, since 
`flink-streaming-java` already depends on `flink-clients`, so the reverse 
dependency would create a cycle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> --------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   
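> As a rough usage sketch of what downstream projects are after (the static 
> method and its parameter list are assumed from the PR discussion, not a 
> final API; SavepointRestoreSettings.forPath is the existing runtime class):
> {code:java}
> import java.net.URL;
> import java.util.Collections;
> import java.util.List;
> 
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
> import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class SavepointRestoreSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.fromElements(1, 2, 3).print();
> 
>         // Restore from an existing savepoint while submitting to a remote cluster.
>         List<URL> jarFiles = Collections.emptyList(); // user-code jars, if any
>         JobExecutionResult result = RemoteStreamEnvironment.executeRemotely(
>             env, jarFiles, "jobmanager-host", 6123,
>             SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-abc"));
>         System.out.println(result.getNetRuntime() + " ms");
>     }
> }
> {code}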



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239509248
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   This cannot be moved to the ClusterClient, since it relies on Streaming API 
classes (StreamExecutionEnvironment, or with your suggestion the StreamGraph) 
that aren't available in `flink-clients`. That is why we only work with the 
`JobGraph` there. And we can't just add a dependency, since 
`flink-streaming-java` already depends on `flink-clients`, so the reverse 
dependency would create a cycle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-11087:
----------------------------------------------

Assignee: Tzu-Li (Gordon) Tai

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> --------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of the 
> version. 
>  
> With some debugging, I see that when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer as the 
> value serializer, when it should be the other way around. 
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here: 
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711618#comment-16711618
 ] 

Timo Walther commented on FLINK-9740:
----------------------------------------------

I cannot see it in the JIRA yet, but in any case: in general we can do it 
similarly to your proposal. Maybe we can also use the more efficient Java 8 
time classes. This is a feature the DataStream API could also profit from. 
[~aljoscha] what is your opinion here? I guess monthly windows should be added 
to the DataStream API first?
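
For example, month-based window boundaries could lean on the java.time 
classes, which already handle leap years and variable month lengths (a quick 
sketch, not a concrete design for Flink):
{code:java}
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class MonthWindowSketch {

    // Start of the month-aligned tumbling window containing the given timestamp.
    static long windowStart(long epochMillis) {
        ZonedDateTime t = ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        return t.toLocalDate().withDayOfMonth(1).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    // The end is "start plus one month"; java.time picks the correct month length.
    static long windowEnd(long windowStartMillis) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(windowStartMillis), ZoneOffset.UTC)
                .plusMonths(1).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long inFeb2020 = LocalDate.of(2020, 2, 15).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        long start = windowStart(inFeb2020);
        System.out.println((windowEnd(start) - start) / 86_400_000L + " days"); // 29: leap year
    }
}
{code}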

> Support group windows over intervals of months 
> --------------------------------------------------------------
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (esp. in 
> event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711607#comment-16711607
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239501341
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   StreamGraph is exposed anyway because there is 
`StreamExecutionEnvironment.getStreamGraph()`, though I see it is marked 
`@Internal`. I'm also fine with passing the environment.
   
   >IMO any use-case where usage of the StreamGraph is appropriate (for which 
tbh there are none, as the JobGraph would be a better choice anyway) is already 
a pretty low-level thing, in which case you can work directly against a 
ClusterClient.
   
   So you are suggesting to move the code here to `ClusterClient`? I think that 
would make sense.
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> --------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711599#comment-16711599
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk closed pull request #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
/** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;
 
+   /** The input dependency constraint to schedule tasks. */
+   private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
+
// --- User code values 

 
private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}
 
+   /**
+* Sets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @param inputDependencyConstraint The input dependency constraint.
+*/
+   public void setInputDependencyConstraint(InputDependencyConstraint 
inputDependencyConstraint) {
+   this.inputDependencyConstraint = inputDependencyConstraint;
+   }
+
+   /**
+* Gets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @return The input dependency constraint of this job.
+*/
+   public InputDependencyConstraint getInputDependencyConstraint() {
+   return inputDependencyConstraint;
+   }
+
/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO.
 * In some cases this might be preferable. For example, when using 
interfaces
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its 
inputs status.
+ */
+public enum InputDependencyConstraint {
+
+   /**
+* Schedule the task if any input is consumable.
+*/
+   ANY,
+
+   /**
+* Schedule the task if all the inputs are consumable.
+*/
+   ALL
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String 

[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239501341
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   StreamGraph is exposed anyway because there is 
`StreamExecutionEnvironment.getStreamGraph()`, though I see it is marked 
`@Internal`. I'm also fine with passing the environment.
   
   >IMO any use-case where usage of the StreamGraph is appropriate (for which 
tbh there are none, as the JobGraph would be a better choice anyway) is already 
a pretty low-level thing, in which case you can work directly against a 
ClusterClient.
   
   So you are suggesting to move the code here to `ClusterClient`? I think that 
would make sense.
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711598#comment-16711598
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk commented on issue #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252#issuecomment-444913214
 
 
   > Please open the PR against the master branch.
   
   Thanks Zentol for pointing this out. The new PR is 
https://github.com/apache/flink/pull/7255


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, *resource deadlocks* can happen in finite stream jobs (or batch 
> jobs) when resources are limited, in the following 2 cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> will block X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) 
> and Y cannot start, X cannot finish because some of its inputs are not 
> finished.
>  
> We can avoid case 1 by setting all edges to be BLOCKING, which avoids 
> pipeline back pressure. However, case 2 cannot be avoided, as X (the 
> downstream task) will be launched as soon as any of its input results is 
> ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z. X can be 
> launched when Z finishes, even though task Y is not launched yet. This 
> pre-launch behaviour can be beneficial when there are plenty of resources, 
> as X can process data from Z before Y finishes its own data processing. 
> However, resource deadlocks may happen when resources are limited, e.g. in 
> small sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here: 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
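>  
> Based on the accompanying PR, configuring the constraint would look like 
> this (a sketch; the setter matches the ExecutionConfig change shown in the 
> PR diff elsewhere in this thread):
> {code:java}
> import org.apache.flink.api.common.InputDependencyConstraint;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class ConstraintConfigSketch {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Only schedule a vertex once ALL of its inputs are consumable, so a
>         // downstream task is never pre-launched into slots its upstreams still need.
>         env.getConfig().setInputDependencyConstraint(InputDependencyConstraint.ALL);
>     }
> }
> {code}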



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk closed pull request #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
/** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;
 
+   /** The input dependency constraint to schedule tasks. */
+   private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
+
// --- User code values 

 
private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}
 
+   /**
+* Sets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @param inputDependencyConstraint The input dependency constraint.
+*/
+   public void setInputDependencyConstraint(InputDependencyConstraint 
inputDependencyConstraint) {
+   this.inputDependencyConstraint = inputDependencyConstraint;
+   }
+
+   /**
+* Gets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @return The input dependency constraint of this job.
+*/
+   public InputDependencyConstraint getInputDependencyConstraint() {
+   return inputDependencyConstraint;
+   }
+
/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO.
 * In some cases this might be preferable. For example, when using 
interfaces
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its 
inputs status.
+ */
+public enum InputDependencyConstraint {
+
+   /**
+* Schedule the task if any input is consumable.
+*/
+   ANY,
+
+   /**
+* Schedule the task if all the inputs are consumable.
+*/
+   ALL
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
final ResultPartitionLocation partitionLocation;
 
// The producing task needs to be RUNNING or already 
FINISHED
-   if (consumedPartition.isConsumable() && 

[GitHub] zhuzhurk commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk commented on issue #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252#issuecomment-444913214
 
 
   > Please open the PR against the master branch.
   
   Thanks Zentol for pointing this out. The new PR is 
https://github.com/apache/flink/pull/7255


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711596#comment-16711596
 ] 

ASF GitHub Bot commented on FLINK-11048:


zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239497531
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Using the environment is the simplest option and hides several internal 
classes (like the StreamGraph) that we shouldn't expose to users. Effectively 
this serves as an extension to the `RemoteStreamEnvironment` without baking it 
into a `RemoteStreamEnvironment` instance itself, which is fair imo.
   
   IMO any use-case where usage of the StreamGraph is appropriate (for which 
tbh there are none, as the JobGraph would be a better choice anyway) is already 
a pretty low-level thing, in which case you can work directly against a 
ClusterClient.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> --------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711595#comment-16711595
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk opened a new pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255
 
 
   …locks in LAZY_FROM_SOURCES scheduling when resources are limited
   
   ## What is the purpose of the change
   
   This PR adds a job config InputDependencyConstraint, which helps to avoid 
resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited, 
as described in 
[FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).
   
   ## Brief change log
   
- *Add InputDependencyConstraint to ExecutionConfig*
- *Adjust the isConsumable interface in IntermediateResultPartition to fit 
the actual definition of consumable data*
- Change the current lazy execution scheduling logic (in 
Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the 
InputDependencyConstraint is satisfied (an interface 
ExecutionVertex.checkInputDependencyConstraints is added to serve this 
purpose). 
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added IntermediateResultPartitionTest to validate 
IntermediateResultPartition changes*
 - *Added ExecutionVertexInputConstraintTest to validate the constraint 
check logic in ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, *resource deadlocks* can happen in finite stream jobs (or batch 
> jobs) when resources are limited, in the following 2 cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> will block X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) 
> and Y cannot start, X cannot finish because some of its inputs are not 
> finished.
>  
> We can avoid case 1 by setting all edges to be BLOCKING, which avoids 
> pipeline back pressure. However, case 2 cannot be avoided, as X (the 
> downstream task) will be launched as soon as any of its input results is 
> ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z. X can be 
> launched when Z finishes, even though task Y is not launched yet. This 
> pre-launch behaviour can be beneficial when there are plenty of resources, 
> as X can process data from Z before Y finishes its own data processing. 
> However, resource deadlocks may happen when resources are limited, e.g. in 
> small sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here: 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239497531
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Using the environment is the simplest option and hides several internal 
classes (like the StreamGraph) that we shouldn't expose to users. Effectively 
this serves as an extension to the `RemoteStreamEnvironment` without baking it 
into a `RemoteStreamEnvironment` instance itself, which is fair imo.
   
   IMO any use-case where usage of the StreamGraph is appropriate (for which 
tbh there are none, as the JobGraph would be a better choice anyway) is already 
a pretty low-level thing, in which case you can work directly against a 
ClusterClient.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhuzhurk opened a new pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk opened a new pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255
 
 
   …locks in LAZY_FROM_SOURCES scheduling when resources are limited
   
   ## What is the purpose of the change
   
   This PR adds a job config InputDependencyConstraint, which helps to avoid 
resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited, 
as described in 
[FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).
   
   ## Brief change log
   
- *Add InputDependencyConstraint to ExecutionConfig*
- *Adjust the isConsumable interface in IntermediateResultPartition to fit 
the actual definition of consumable data*
- Change the current lazy execution scheduling logic (in 
Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the 
InputDependencyConstraint is satisfied (an interface 
ExecutionVertex.checkInputDependencyConstraints is added to serve this 
purpose). 
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added IntermediateResultPartitionTest to validate 
IntermediateResultPartition changes*
 - *Added ExecutionVertexInputConstraintTest to validate the constraint 
check logic in ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711581#comment-16711581
 ] 

ASF GitHub Bot commented on FLINK-10887:


jgrier edited a comment on issue #7099: [FLINK-10887] [jobmaster] Add source 
watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282
 
 
   @aljoscha @tweise Can you guys comment on the above generic aggregator 
proposal?  I'd like to keep this moving forward.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --------------------------------------------------------------
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved, so that each source can adjust its partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  
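> A minimal sketch of the tracking side (hypothetical names; the actual RPC 
> design is what this PR is iterating on):
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> 
> // Keeps the latest watermark reported by each source sub-task and exposes
> // the global minimum, which sources could poll to throttle their read rates.
> class WatermarkTracker {
>     private final Map<String, Long> currentWatermarks = new ConcurrentHashMap<>();
> 
>     void reportWatermark(String subtaskId, long watermark) {
>         currentWatermarks.merge(subtaskId, watermark, Math::max);
>     }
> 
>     long globalMinWatermark() {
>         return currentWatermarks.values().stream()
>                 .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
>     }
> }
> {code}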



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jgrier edited a comment on issue #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-12-06 Thread GitBox
jgrier edited a comment on issue #7099: [FLINK-10887] [jobmaster] Add source 
watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282
 
 
   @aljoscha @tweise Can you guys comment on the above generic aggregator 
proposal?  I'd like to keep this moving forward.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711578#comment-16711578
 ] 

ASF GitHub Bot commented on FLINK-10887:


jgrier commented on issue #7099: [FLINK-10887] [jobmaster] Add source watermark 
tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282
 
 
   @aljoscha @tweise Can you guys comment on the above proposal?  I'd like to 
keep this moving forward.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --------------------------------------------------------------
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved, so that each source can adjust its partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jgrier commented on issue #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-12-06 Thread GitBox
jgrier commented on issue #7099: [FLINK-10887] [jobmaster] Add source watermark 
tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282
 
 
   @aljoscha @tweise Can you guys comment on the above proposal?  I'd like to 
keep this moving forward.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin updated FLINK-9740:
-
Attachment: (was: About [FLINK-9740] Support group windows over 
intervals of months.pdf)

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (especially in 
> event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}
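For illustration, a hedged sketch of what the requested interval could look like 
in the Java Table API once implemented. The {{wc}} table, its {{rowtime}} 
attribute and the wrapper class are assumptions; the {{1.month}} interval is 
exactly what this ticket asks for and does not work today:

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MonthWindowSketch {
    // Hypothetical: tumbling one-month event-time windows over a registered table.
    public static Table monthlyCounts(StreamTableEnvironment tableEnv) {
        return tableEnv.scan("wc")
            .window(Tumble.over("1.month").on("rowtime").as("w")) // the requested interval
            .groupBy("w, word")
            .select("word, number.sum as total");
    }
}
{code}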



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin updated FLINK-9740:
-
Comment: was deleted

(was: hi [~twalthr] The attachment has been submitted)

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (especially in 
> event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to 
verify this.
Currently that ITCase has no way to capture this bug, because the key / value 
types of broadcast state tested there are all identical.

If it turns out to be true, then we'll need to lift this to a blocker bug.

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer 
> and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only the version upgrade. 
>  
> With some debugging I see that, at the moment of validating the compatibility of 
> states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" 
> map containing the data about the 'old' state holds a wrong association of the 
> key and value serializers. That is, JsonSerializer appears as the key serializer 
> and StringSerializer appears as the value serializer (when it should be the 
> other way around).
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-11087 at 12/6/18 2:35 PM:
--

I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to 
verify this.
Currently that ITCase has no way to capture this potential bug, because the 
key / value types of broadcast state tested there are all identical.

If it turns out to be true, then we'll need to lift this to a blocker bug.


was (Author: tzulitai):
I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to 
verify this.
Currently that ITCase had now way to capture this bug, because the key / value 
types of broadcast state tested there are all identical ..

If it turns out to be true, then we'll need to lift this as a blocker bug.

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer 
> and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only the version upgrade. 
>  
> With some debugging I see that, at the moment of validating the compatibility of 
> states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" 
> map containing the data about the 'old' state holds a wrong association of the 
> key and value serializers. That is, JsonSerializer appears as the key serializer 
> and StringSerializer appears as the value serializer (when it should be the 
> other way around).
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11049) Unable to execute partial DAG

2018-12-06 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711508#comment-16711508
 ] 

Till Rohrmann commented on FLINK-11049:
---

Hi [~zjffdu], the way Flink works is that all branches of the DAG with a sink at 
their end will be executed when one calls {{ExecutionEnvironment#execute}} or 
a method which triggers eager execution ({{print}}, {{collect}} or 
{{count}}). In that sense, the behaviour of the code snippet is expected. 

How do you think Flink should behave in this situation?
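
For illustration, a minimal self-contained sketch of this behaviour (data and 
path are made up): both sinks below belong to the same DAG, so the eager 
{{print}} also runs the {{writeAsText}} branch.

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class EagerExecutionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("hello world", "hello flink");

        data.writeAsText("/tmp/out.txt", FileSystem.WriteMode.OVERWRITE); // lazy sink

        // print() triggers eager execution of the whole DAG, so the writeAsText
        // sink above runs as well; there is no way to submit only this branch.
        data.map(String::toUpperCase).print();
    }
}
{code}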

> Unable to execute partial DAG
> -
>
> Key: FLINK-11049
> URL: https://issues.apache.org/jira/browse/FLINK-11049
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.7.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.core.fs.FileSystem
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
>  
> val benv = ExecutionEnvironment.getExecutionEnvironment
> val btEnv = TableEnvironment.getTableEnvironment(benv)
> val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
> data.writeAsText("/Users/jzhang/a.txt", FileSystem.WriteMode.OVERWRITE);
> val table = data.flatMap(line=>line.split("\\s")).
>   map(w => (w, 1)).
>   toTable(btEnv, 'word, 'number)
> btEnv.registerTable("wc", table)
> btEnv.sqlQuery("select word, count(1) from wc group by word").
>   toDataSet[Row].print()
> {code}
> In the above example, the last statement will trigger 2 job executions 
> (writeAsText and print), but what the user expects is only the print job. The 
> root cause is that Flink is currently unable to submit a partial DAG. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711495#comment-16711495
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

I checked the key / value serializer association in the 
{{DefaultOperatorStateBackend}}; it seems to be correct.

The problem, as [~edRojas] has indicated, is that when restoring from 1.5 
(regardless of whether the restore happens in 1.6 or 1.7), the serializer keys 
({{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}}) in the state meta info have 
swapped positions.
Because of this line, 
[https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165],
for broadcast state meta info the first read serializer (which is the key 
serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second read 
serializer (which is the value serializer) is assigned the {{KEY_SERIALIZER}} 
key.

This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0.

[~stefanrichte...@gmail.com] could you confirm this?
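
To make the swap concrete, a schematic sketch of the faulty assignment 
(simplified; {{readSerializer}} is a placeholder for the real deserialization 
call, and the string keys stand in for the {{KEY_SERIALIZER}} / 
{{VALUE_SERIALIZER}} enum keys):

{code:java}
import java.io.DataInputStream;
import java.util.HashMap;
import java.util.Map;

class BroadcastMetaInfoSwapSketch {
    static Map<String, Object> readBroadcastSerializers(DataInputStream in) {
        Map<String, Object> serializerMap = new HashMap<>();
        Object first = readSerializer(in);   // written first: the key serializer
        Object second = readSerializer(in);  // written second: the value serializer

        // faulty association (what the linked line effectively does for broadcast state):
        serializerMap.put("VALUE_SERIALIZER", first);
        serializerMap.put("KEY_SERIALIZER", second);

        // the fix is to swap the two puts so KEY_SERIALIZER receives `first`
        return serializerMap;
    }

    private static Object readSerializer(DataInputStream in) {
        throw new UnsupportedOperationException("placeholder for the real reader");
    }
}
{code}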

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer 
> and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only the version upgrade. 
>  
> With some debugging I see that, at the moment of validating the compatibility of 
> states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" 
> map containing the data about the 'old' state holds a wrong association of the 
> key and value serializers. That is, JsonSerializer appears as the key serializer 
> and StringSerializer appears as the value serializer (when it should be the 
> other way around).
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis

2018-12-06 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711492#comment-16711492
 ] 

Dawid Wysakowicz commented on FLINK-9920:
-

Another instance: https://api.travis-ci.org/v3/job/464266452/log.txt

> BucketingSinkFaultToleranceITCase fails on travis
> -
>
> Key: FLINK-9920
> URL: https://issues.apache.org/jira/browse/FLINK-9920
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> https://travis-ci.org/zentol/flink/jobs/407021898
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase
> runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase)
>   Time elapsed: 5.696 sec  <<< FAILURE!
> java.lang.AssertionError: Read line does not match expected pattern.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions

2018-12-06 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711431#comment-16711431
 ] 

Chesnay Schepler commented on FLINK-11086:
--

We haven't looked into Hadoop 3 support yet.

> flink-hadoop-compatibility tests fail for 3.x hadoop versions
> -
>
> Key: FLINK-11086
> URL: https://issues.apache.org/jira/browse/FLINK-11086
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Sebastian Klemke
>Priority: Major
>
> All builds used Maven 3.2.5 on commit hash 
> ed8ff14ed39d08cd319efe75b40b9742a2ae7558.
> Attempted builds:
>  - mvn clean install -Dhadoop.version=3.0.3
>  - mvn clean install -Dhadoop.version=3.1.1
> Integration tests with a Hadoop input format data source fail. Example stack 
> trace, taken from the hadoop.version 3.1.1 build:
> {code:java}
> testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase)
>   Time elapsed: 0.275 sec  <<< ERROR!
> java.lang.NoClassDefFoundError: 
> org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210)
> at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085)
> at 
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269)
> at 
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
> at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
> at 
> org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
> at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
> at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
> at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
> at 
> org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121)
> at 
> org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71)
> {code}
> Maybe Hadoop 3.x versions could be added to the test matrix as well?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10593) Add support for ALL ROWS PER MATCH

2018-12-06 Thread xueyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xueyu reassigned FLINK-10593:
-

Assignee: xueyu

> Add support for ALL ROWS PER MATCH
> --
>
> Key: FLINK-10593
> URL: https://issues.apache.org/jira/browse/FLINK-10593
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Assignee: xueyu
>Priority: Major
>
> We should properly support it in both {{FINAL}} and {{RUNNING}} modes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711471#comment-16711471
 ] 

ASF GitHub Bot commented on FLINK-10945:


zentol commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint 
to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252#issuecomment-444876798
 
 
   Please open the PR against the master branch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs (or batch 
> jobs) when resources are limited, in the following 2 cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so back pressure 
> will block X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) and 
> Y cannot start, X cannot finish as some of its inputs are not finished.
>  
> We can avoid case 1 by setting all edges to BLOCKING, which avoids pipelined 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) 
> will be launched when any of its input results is ready.
> In detail, say task X has BLOCKING upstream tasks Y and Z; X can be 
> launched when Z finishes, though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, as X can 
> process data from Z earlier, before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here: 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
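
For illustration, a minimal sketch of the proposed semantics. The enum values 
follow the ticket; the scheduling check around them is illustrative:

{code:java}
import java.util.List;

enum InputDependencyConstraint { ANY, ALL }

class SchedulingSketch {
    static boolean isSchedulable(InputDependencyConstraint constraint,
                                 List<Boolean> inputsConsumable) {
        switch (constraint) {
            case ANY: // schedule as soon as any input result is consumable
                return inputsConsumable.stream().anyMatch(Boolean::booleanValue);
            case ALL: // avoids the case-2 deadlock: wait for all inputs
                return inputsConsumable.stream().allMatch(Boolean::booleanValue);
            default:
                throw new IllegalStateException("unknown constraint: " + constraint);
        }
    }
}
{code}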



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Edward Rojas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711469#comment-16711469
 ] 

Edward Rojas commented on FLINK-11087:
--

I just tried it out, 1.5.3 to 1.6.2.

I get a different error, but with debugging I see that the reason is the 
same: the key serializer and the value serializer are swapped in the 
OperatorBackendStateMetaInfoReaderV2V3 class. 

The error is the following:
{noformat}
org.apache.flink.util.StateMigrationException: State migration is currently not 
supported.
at 
org.apache.flink.util.StateMigrationException.notSupported(StateMigrationException.java:42)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:263)
at 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745){noformat}

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer 
> and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only the version upgrade. 
>  
> With some debugging I see that, at the moment of validating the compatibility of 
> states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" 
> map containing the data about the 'old' state holds a wrong association of the 
> key and value serializers. That is, JsonSerializer appears as the key serializer 
> and StringSerializer appears as the value serializer (when it should be the 
> other way around).
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zentol commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint 
to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252#issuecomment-444876798
 
 
   Please open the PR against the master branch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711468#comment-16711468
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-444876607
 
 
   Will merge this today or tomorrow depending on how much time I need for 
cleaning up various smaller things.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. Similarly to 
> FLINK-10251, we should check whether the payload exceeds the maximum 
> framesize and fail fast if it does.
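
For illustration, a hedged sketch of such a fail-fast check; the class and 
method names are illustrative, not the actual {{MetricQueryService}} code:

{code:java}
import java.io.IOException;

class MetricPayloadCheckSketch {
    static byte[] checkedPayload(byte[] serializedMetrics, long maxFrameSizeBytes)
            throws IOException {
        if (serializedMetrics.length > maxFrameSizeBytes) {
            // fail fast instead of letting Akka drop the oversized message later
            throw new IOException("Metric dump of " + serializedMetrics.length
                + " bytes exceeds the configured akka.framesize of "
                + maxFrameSizeBytes + " bytes.");
        }
        return serializedMetrics;
    }
}
{code}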



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6850: [FLINK-10252] Handle oversized metric messges

2018-12-06 Thread GitBox
zentol commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-444876607
 
 
   Will merge this today or tomorrow depending on how much time I need for 
cleaning up various smaller things.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711450#comment-16711450
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451409
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   If we make this `static` it will also break the `@Public` contract. We can 
introduce a new static method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   
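
For illustration, a hedged usage sketch of the requested capability. The 
parameter list of the static {{executeRemotely}} follows the partial diff in the 
PR discussion and is illustrative, not the merged API; host, port and paths are 
assumptions:

{code:java}
import java.net.URL;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointRestoreSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();

        List<URL> jarFiles = Collections.emptyList(); // job jars to ship to the cluster
        SavepointRestoreSettings restore =
            SavepointRestoreSettings.forPath("/savepoints/sp-1", /* allowNonRestoredState */ true);

        // Illustrative parameter order: environment, jars, host, port, client config,
        // global classpaths, job name, savepoint restore settings.
        JobExecutionResult result = RemoteStreamEnvironment.executeRemotely(
            env, jarFiles, "jobmanager-host", 6123, new Configuration(),
            Collections.<URL>emptyList(), "my-job", restore);
        System.out.println("net runtime: " + result.getNetRuntime() + " ms");
    }
}
{code}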



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] maqingxiang opened a new pull request #7254: [hotfix][tableApi] fix tEnv in tableApi docs

2018-12-06 Thread GitBox
maqingxiang opened a new pull request #7254: [hotfix][tableApi] fix tEnv in 
tableApi docs
URL: https://github.com/apache/flink/pull/7254
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711449#comment-16711449
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239452431
 
 

 ##
 File path: 
flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ##
 @@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
this.flinkILoop = flinkILoop;
}
 
-   /**
-* Executes the remote job.
-*
-* @param streamGraph
-*Stream Graph to execute
-* @param jarFiles
-*List of jar file URLs to ship to the cluster
-* @return The result of the job execution, containing elapsed time and 
accumulators.
-*/
-   @Override
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
URL jarUrl;
try {
jarUrl = 
flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e) {
+   // TODO: we don't have the actual jobID at this point
throw new ProgramInvocationException("Could not write 
the user code classes to disk.",
-   streamGraph.getJobGraph().getJobID(), e);
+   new JobID(), e);
 
 Review comment:
   That will generate a random `JobID`, not sure about this. Let's use the 
constructor without the JobID.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711451#comment-16711451
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451677
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+   
List<URL> jarFiles,
 
 Review comment:
   The "most readable option" depends a lot on preference :) But let's stick 
with what is commonly used nowadays.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451409
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   If we make this `static` it will also break the `@Public` contract. We can 
introduce a new static method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711448#comment-16711448
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239453780
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Question: Why don't we pass the required dependencies directly here? i.e. 
`StreamGraph` and `ExecutionConfig`
   
   IMHO there is no need to pass the entire environment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711447#comment-16711447
 ] 

ASF GitHub Bot commented on FLINK-11048:


mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451179
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   I agree that we can't do this. The method is `protected` but it would still 
break subclasses relying on the `@Public` contract.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239452431
 
 

 ##
 File path: 
flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ##
 @@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
this.flinkILoop = flinkILoop;
}
 
-   /**
-* Executes the remote job.
-*
-* @param streamGraph
-*Stream Graph to execute
-* @param jarFiles
-*List of jar file URLs to ship to the cluster
-* @return The result of the job execution, containing elapsed time and 
accumulators.
-*/
-   @Override
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
URL jarUrl;
try {
jarUrl = 
flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e) {
+   // TODO: we don't have the actual jobID at this point
throw new ProgramInvocationException("Could not write 
the user code classes to disk.",
-   streamGraph.getJobGraph().getJobID(), e);
+   new JobID(), e);
 
 Review comment:
   That will generate a random `JobID`, not sure about this. Let's use the 
constructor without the JobID.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451179
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   I agree that we can't do this. The method is `protected` but it would still 
break subclasses relying on the `@Public` contract.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239453780
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Question: Why don't we pass the required dependencies directly here? i.e. 
`StreamGraph` and `ExecutionConfig`
   
   IMHO there is no need to pass the entire environment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451677
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+   
List<URL> jarFiles,
 
 Review comment:
   The "most readable option" depends a lot on preference :) But let's stick 
with what is commonly used nowadays.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711442#comment-16711442
 ] 

xuqianjin commented on FLINK-9740:
--

Hi [~twalthr], the attachment has been submitted.

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: About [FLINK-9740] Support group windows over intervals 
> of months.pdf
>
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (especially in 
> event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711440#comment-16711440
 ] 

xuqianjin commented on FLINK-9740:
--

@

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: About [FLINK-9740] Support group windows over intervals 
> of months.pdf
>
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (especially in 
> event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin updated FLINK-9740:
-
Comment: was deleted

(was: @)

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: About [FLINK-9740] Support group windows over intervals 
> of months.pdf
>
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (esp. in 
> event-time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9740) Support group windows over intervals of months

2018-12-06 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin updated FLINK-9740:
-
Attachment: About [FLINK-9740] Support group windows over intervals of 
months.pdf

> Support group windows over intervals of months 
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Attachments: About [FLINK-9740] Support group windows over intervals 
> of months.pdf
>
>
> Currently, time-based group windows can be defined using intervals of 
> milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. 
> For some use cases it might be useful to define windows of months (esp. in 
> event-time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711437#comment-16711437
 ] 

Chesnay Schepler commented on FLINK-11087:
--

Have you tried migration from 1.5 to 1.6 to 1.7?

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Priority: Major
>  Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast is using a MapState with StringSerializer as key serializer and 
> a custom JsonSerializer as value serializer. 
> There were no changes in the TypeSerializers used, only an upgrade of version. 
>  
> With some debugging I see that, at the moment the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds a wrong association of the key and value serializers: JsonSerializer 
> appears as the key serializer and StringSerializer as the value serializer, 
> when it should be the other way around.
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem

2018-12-06 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711433#comment-16711433
 ] 

Chesnay Schepler commented on FLINK-11085:
--

[~StephanEwen] What's your take on this?

> NoClassDefFoundError in presto-s3 filesystem
> 
>
> Key: FLINK-11085
> URL: https://issues.apache.org/jira/browse/FLINK-11085
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> A user has reported an issue on the ML where using the presto-s3 filesystem 
> fails with an exception due to a missing class. The missing class is indeed 
> filtered out in the shade-plugin configuration.
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
>   at 
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
>   at 
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Edward Rojas (JIRA)
Edward Rojas created FLINK-11087:


 Summary: Broadcast state migration Incompatibility from 1.5.3 to 
1.7.0
 Key: FLINK-11087
 URL: https://issues.apache.org/jira/browse/FLINK-11087
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0
 Environment: Migration from Flink 1.5.3 to Flink 1.7.0
Reporter: Edward Rojas


When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
state throws the following error:
{noformat}
org.apache.flink.util.StateMigrationException: The new key serializer for 
broadcast state must not be incompatible.
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
at 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745){noformat}
The broadcast is using a MapState with StringSerializer as key serializer and a 
custom JsonSerializer as value serializer. 

There were no changes in the TypeSerializers used, only an upgrade of version. 

 

With some debugging I see that, at the moment the compatibility of states is 
validated in the DefaultOperatorStateBackend class, the 
"*registeredBroadcastStates*" map containing the data about the 'old' state 
holds a wrong association of the key and value serializers: JsonSerializer 
appears as the key serializer and StringSerializer as the value serializer, 
when it should be the other way around.

 

After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
class is responsible for this swap here:
https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
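
For reference, a minimal sketch of the kind of broadcast state declaration 
described above; the state name and the JsonNode value type are hypothetical 
stand-ins, not code from the affected job:
{code:java}
// Minimal sketch of the reported setup: StringSerializer for keys, a custom
// JsonSerializer for values ("rules", JsonNode and JsonSerializer are
// hypothetical stand-ins).
MapStateDescriptor<String, JsonNode> broadcastStateDescriptor =
    new MapStateDescriptor<>(
        "rules",                    // state name (hypothetical)
        StringSerializer.INSTANCE,  // key serializer
        new JsonSerializer());      // custom value serializer (hypothetical)
{code}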



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions

2018-12-06 Thread Sebastian Klemke (JIRA)
Sebastian Klemke created FLINK-11086:


 Summary: flink-hadoop-compatibility tests fail for 3.x hadoop 
versions
 Key: FLINK-11086
 URL: https://issues.apache.org/jira/browse/FLINK-11086
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: Sebastian Klemke


All builds were done with maven 3.2.5 on commit hash 
ed8ff14ed39d08cd319efe75b40b9742a2ae7558.

Attempted builds:
 - mvn clean install -Dhadoop.version=3.0.3
 - mvn clean install -Dhadoop.version=3.1.1

Integration tests with a Hadoop input format data source fail. Example stack 
trace, taken from the hadoop.version 3.1.1 build:
{code:java}
testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase)
  Time elapsed: 0.275 sec  <<< ERROR!
java.lang.NoClassDefFoundError: 
org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085)
at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
at 
org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
at 
org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
at 
org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
at 
org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121)
at 
org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71)
{code}
Maybe hadoop 3.x versions could be added to the test matrix as well?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem

2018-12-06 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11085:
-
Summary: NoClassDefFoundError in presto-s3 filesystem  (was: 
flink-s3-fs-presto)

> NoClassDefFoundError in presto-s3 filesystem
> 
>
> Key: FLINK-11085
> URL: https://issues.apache.org/jira/browse/FLINK-11085
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> A user has reported an issue on the ML where using the presto-s3 filesystem 
> fails with an exception due to a missing class. The missing class is indeed 
> filtered out in the shade-plugin configuration.
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
>   at 
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
>   at 
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11085) flink-s3-fs-presto

2018-12-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11085:


 Summary: flink-s3-fs-presto
 Key: FLINK-11085
 URL: https://issues.apache.org/jira/browse/FLINK-11085
 Project: Flink
  Issue Type: Bug
  Components: FileSystem
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


A user has reported an issue on the ML where using the presto-s3 filesystem 
fails with an exception due to a missing class. The missing class is indeed 
filtered out in the shade-plugin configuration.
{code:java}
java.lang.NoClassDefFoundError: 
org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
at 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
at 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-12-06 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711387#comment-16711387
 ] 

Chesnay Schepler commented on FLINK-10832:
--

The project you attached works for me locally in the IDE, and also when 
submitting the built project to a 1.6.2, 1.7.0 (taken from 
[https://dist.apache.org/repos/dist/release/flink]) and 1.8-SNAPSHOT (locally 
built) cluster.

I did not make _any_ modifications to the project.

What we can see in the logs you provided is that the JobMaster is not shutting 
down.

The following line is present in my logs, but missing in the attached logs:
{code:java}
13:25:56.709 [flink-akka.actor.default-dispatcher-3] INFO  
o.a.f.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Simple 
Test(e8a889dc0394b0c664dcce46f36ed0cc).{code}
Beyond that the main difference between the logs is this:
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 
6ac59614b23655af1e13f08e16b96ea4.
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null{code}
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 
966422750006990ed1f304766e210161.
org.apache.flink.util.FlinkException: JobManager is shutting down.{code}
[~till.rohrmann] Do you have any idea?

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to properly stop streaming applications from their own code, so it 
> prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> 

[jira] [Commented] (FLINK-7208) Refactor built-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711394#comment-16711394
 ] 

ASF GitHub Bot commented on FLINK-7208:
---

dianfu commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239438568
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
   .groupBy('b)
   .select('a.count as 'cnt, 'b)
   .groupBy('cnt)
-  .select('cnt, 'b.count as 'freq)
+  .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max)
 
 val results = t.toRetractStream[Row](queryConfig)
 
 results.addSink(new RetractingSink)
 env.execute()
-val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", 
"6,1,6,6")
 
 Review comment:
   @walterddr @twalthr Thanks for the suggestion and I have created a PR for 
FLINK-11074: https://github.com/apache/flink/pull/7253.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor built-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView
> -
>
> Key: FLINK-7208
> URL: https://issues.apache.org/jira/browse/FLINK-7208
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: kaibo.zhou
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Refactor built-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView.
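
To make the idea concrete, a rough sketch (field and class names assumed, not 
taken from the PR) of an accumulator whose retraction bookkeeping lives in a 
MapView, so the data resides in the state backend rather than on the heap:
{code:java}
// Rough sketch only; the actual accumulator layout in the PR may differ.
public static class MinWithRetractAcc {
    public Long min; // current minimum, null when nothing is buffered
    // value -> number of times it was accumulated; backed by state when a
    // state backend is configured, instead of a heap HashMap
    public MapView<Long, Long> valueCounts = new MapView<>(Types.LONG, Types.LONG);
}
{code}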



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-06 Thread GitBox
dianfu commented on a change in pull request #7201: [FLINK-7208] [table] 
Optimize Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#discussion_r239438568
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
   .groupBy('b)
   .select('a.count as 'cnt, 'b)
   .groupBy('cnt)
-  .select('cnt, 'b.count as 'freq)
+  .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max)
 
 val results = t.toRetractStream[Row](queryConfig)
 
 results.addSink(new RetractingSink)
 env.execute()
-val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", 
"6,1,6,6")
 
 Review comment:
   @walterddr @twalthr Thanks for the suggestion and I have created a PR for 
FLINK-11074: https://github.com/apache/flink/pull/7253.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711388#comment-16711388
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu opened a new pull request #7253: [FLINK-11074] [table][tests] Enable 
harness tests with RocksdbStateBackend and add harness tests for 
CollectAggFunction
URL: https://github.com/apache/flink/pull/7253
 
 
   ## What is the purpose of the change
   
   *This pull request enables the harness tests to test with 
RocksdbStateBackend and adds harness test for CollectAggFunction*
   
   ## Brief change log
   
 - *Adds AggFunctionHarnessTest*
 - *Fix an issue in CollectAggFunction*
   
   ## Verifying this change
   
 - *Added tests in AggFunctionHarnessTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use 
> a DataView in the accumulator of an aggregate function, the DataView is a 
> plain Java object held on the heap, not replaced with a 
> StateMapView/StateListView whose values are actually held in the state 
> backend. We should improve the harness test to make it possible to test with 
> a state backend. Otherwise, issues such as FLINK-10674 could never have been 
> found. With this harness test available, we could test the built-in aggregate 
> functions that use the DataView in a more fine-grained way.
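
As a loose illustration of what "testing with a state backend" amounts to, a 
sketch that points a standard keyed operator test harness at RocksDB; the 
helper name and checkpoint URI are assumptions, not code from the PR:
{code:java}
// Loose sketch; openWithRocksDB and the checkpoint URI are assumptions.
static <K> KeyedOneInputStreamOperatorTestHarness<K, Row, Row> openWithRocksDB(
        OneInputStreamOperator<Row, Row> operator,
        KeySelector<Row, K> keySelector,
        TypeInformation<K> keyType) throws Exception {
    KeyedOneInputStreamOperatorTestHarness<K, Row, Row> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
    // With a real state backend, DataView fields in accumulators are backed
    // by state instead of plain heap objects.
    harness.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));
    harness.open();
    return harness;
}
{code}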



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11074:
---
Labels: pull-request-available  (was: )

> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use 
> a DataView in the accumulator of an aggregate function, the DataView is a 
> plain Java object held on the heap, not replaced with a 
> StateMapView/StateListView whose values are actually held in the state 
> backend. We should improve the harness test to make it possible to test with 
> a state backend. Otherwise, issues such as FLINK-10674 could never have been 
> found. With this harness test available, we could test the built-in aggregate 
> functions that use the DataView in a more fine-grained way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu opened a new pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-06 Thread GitBox
dianfu opened a new pull request #7253: [FLINK-11074] [table][tests] Enable 
harness tests with RocksdbStateBackend and add harness tests for 
CollectAggFunction
URL: https://github.com/apache/flink/pull/7253
 
 
   ## What is the purpose of the change
   
   *This pull request enables the harness tests to test with 
RocksdbStateBackend and adds harness test for CollectAggFunction*
   
   ## Brief change log
   
 - *Adds AggFunctionHarnessTest*
 - *Fix an issue in CollectAggFunction*
   
   ## Verifying this change
   
 - *Added tests in AggFunctionHarnessTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711363#comment-16711363
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk opened a new pull request #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252
 
 
   …locks for finite stream jobs when resources are limited
   
   ## What is the purpose of the change
   
   This PR adds a job config, InputDependencyConstraint, which helps avoid 
resource deadlocks in scheduling when resources are limited, as described in 
[FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).
   
   ## Brief change log
   
- *Add InputDependencyConstraint to ExecutionConfig*
- *Adjust the isConsumable interface in IntermediateResultPartition to match 
the actual definition of consumable data*
- Change the current lazy scheduling logic (in 
Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the 
InputDependencyConstraint is satisfied (an interface, 
ExecutionVertex.checkInputDependencyConstraints, is added to serve this 
purpose). 
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added IntermediateResultPartitionTest to validate 
IntermediateResultPartition changes*
 - *Added ExecutionVertexInputConstraintTest to validate the constraint 
check logic in ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs (or batch 
> jobs) when resources are limited, in the following two cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> will prevent X from finishing
>  # Task Y is an upstream task of task X. When X takes all resources (slots) 
> and Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) 
> will be launched when any of its input results is ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z; X can be 
> launched when Z finishes, even though task Y is not launched yet. This 
> pre-launch behaviour can be beneficial when there are plenty of resources, as 
> X can process data from Z before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has two values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
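
For a sense of the intended usage, a short hypothetical sketch; the setter name 
on ExecutionConfig is an assumption, since the API surface was still under 
review at this point:
{code:java}
// Hypothetical usage sketch. Only schedule a vertex once ALL of its inputs
// are consumable, which avoids the case-2 deadlock described above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
{code}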



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11026) Rework creation of sql-client connector/format jars

2018-12-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11026:
---
Labels: pull-request-available  (was: )

> Rework creation of sql-client connector/format jars
> ---
>
> Key: FLINK-11026
> URL: https://issues.apache.org/jira/browse/FLINK-11026
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, SQL Client
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> For the SQL client we currently have a separate {{sql-jars}} profile in 
> various connectors/formats that create an additional fat jar with a separate 
> classifier.
> One of the core maven mantras is "One artifact per module.", and we see the 
> importance of this mantra as our current packaging strategy makes it 
> impossible to provide different NOTICE files for the created jars (regular 
> and sql-jar).
> Currently we would be forced to provide the same file for both jars, which 
> will cause problems for any downstream user that wants to handle NOTICE 
> files properly. We would cause the same issue we had with netty, which 
> categorically claims to be bundling dependencies although it doesn't, forcing 
> us to manually cut out the valid parts.
> My proposal is to move custom packaging strategies into their own module that 
> depend on the original module.
>  I will use {{flink-connector-elasticsearch6}} as an example, which packages 
> both a regular jar without any included dependencies, and a sql jar bundling 
> everything.
>  * create a separate 
> {{flink-sql-connector-elasticsearch6}}/{{flink-connector-elasticsearch6-uber}}
>  module
>  * this module depends on {{flink-connector-elasticsearch6}}, and bundles all 
> dependencies
>  * move the current shading logic for the sql jar out of the {{sql-jars}} 
> profile
>  * add a {{sql-jars}} profile to {{flink-connectors}} for skipping the 
> creation of these jars



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhuzhurk opened a new pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk opened a new pull request #7252: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252
 
 
   …locks for finite stream jobs when resources are limited
   
   ## What is the purpose of the change
   
   This PR adds a job config, InputDependencyConstraint, which helps avoid 
resource deadlocks in scheduling when resources are limited, as described in 
[FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).
   
   ## Brief change log
   
- *Add InputDependencyConstraint to ExecutionConfig*
- *Adjust the isConsumable interface in IntermediateResultPartition to match 
the actual definition of consumable data*
- Change the current lazy scheduling logic (in 
Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the 
InputDependencyConstraint is satisfied (an interface, 
ExecutionVertex.checkInputDependencyConstraints, is added to serve this 
purpose). 
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added IntermediateResultPartitionTest to validate 
IntermediateResultPartition changes*
 - *Added ExecutionVertexInputConstraintTest to validate the constraint 
check logic in ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11026) Rework creation of sql-client connector/format jars

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711361#comment-16711361
 ] 

ASF GitHub Bot commented on FLINK-11026:


zentol opened a new pull request #7251: [FLINK-11026][ES6] Rework creation of 
fat sql-client jars 
URL: https://github.com/apache/flink/pull/7251
 
 
   Based on #7247.
   
   ## What is the purpose of the change
   
   This PR is a PoC for reworking the packaging of jars specific to the 
sql-client (which basically are just fat-jars). Only the 
`flink-connector-elasticsearch6` module is covered here; if accepted the same 
principle should be applied to the kafka connectors (0.10, 0.11, 2) and all 
formats.
   
   Instead of defining a separate shade-plugin execution with a custom 
artifactSuffix, this PR adds a dedicated `flink-sql-connector-elasticsearch6` 
module which only contains the packaging logic. This is similar to the approach 
we've already been using for `flink-shaded-hadoop2-uber`.
   
   The main motivation for this is licensing; for accurate notice files it is 
necessary to be able to supply each artifact with distinct NOTICE files.
   
   This cannot be done within a single module in a reasonable way. We would 
have to un-package each created jar, add the appropriate license files, and 
re-pack them again. We'd end up with tightly-coupled plugin definitions (since 
the names have to match!) and an overall more complicated (and slower!) build.
   
   ## Brief change log
   
   * add new `flink-sql-connector-elasticsearch6` module containing the 
sql-client-specific shade-plugin configuration and apply the following 
modifications
 *  set `executionId` to `shade-flink`
 * disable `shadedArtifactAttached` so only a single jar is deployed
 * remove sql-jar suffix as it is no longer necessary
   * remove sqlJars profile from `flink-connector-elasticsearch6`
   * add `sqlJars` profile to `flink-connectors` to support skipping the 
creation of sql jars
   
   ## Verifying this change
   
   Covered by sql-client E2E test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
   
   ## Documentation
   
   I have not checked the documentation yet for references that would have to 
be changed.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Rework creation of sql-client connector/format jars
> ---
>
> Key: FLINK-11026
> URL: https://issues.apache.org/jira/browse/FLINK-11026
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, SQL Client
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> For the SQL client we currently have a separate {{sql-jars}} profile in 
> various connectors/formats that create an additional fat jar with a separate 
> classifier.
> One of the core maven mantras is "One artifact per module.", and we see the 
> importance of this mantra as our current packaging strategy makes it 
> impossible to provide different NOTICE files for the created jars (regular 
> and sql-jar).
> Currently we would be forced to provide the same file for both jars, which 
> will cause problems for any downstream user that wants to handle NOTICE 
> files properly. We would cause the same issue we had with netty, which 
> categorically claims to be bundling dependencies although it doesn't, forcing 
> us to manually cut out the valid parts.
> My proposal is to move custom packaging strategies into their own module that 
> depend on the original module.
>  I will use {{flink-connector-elasticsearch6}} as an example, which packages 
> both a regular jar without any included dependencies, and a sql jar bundling 
> everything.
>  * create a separate 
> {{flink-sql-connector-elasticsearch6}}/{{flink-connector-elasticsearch6-uber}}
>  module
>  * this module depends on {{flink-connector-elasticsearch6}}, and bundles all 
> dependencies
>  * move the current shading logic for the sql jar out of the {{sql-jars}} 
> profile
>  * add a {{sql-jars}} profile to {{flink-connectors}} for skipping the 
> creation of these jars



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol opened a new pull request #7251: [FLINK-11026][ES6] Rework creation of fat sql-client jars

2018-12-06 Thread GitBox
zentol opened a new pull request #7251: [FLINK-11026][ES6] Rework creation of 
fat sql-client jars 
URL: https://github.com/apache/flink/pull/7251
 
 
   Based on #7247.
   
   ## What is the purpose of the change
   
   This PR is a PoC for reworking the packaging of jars specific to the 
sql-client (which basically are just fat-jars). Only the 
`flink-connector-elasticsearch6` module is covered here; if accepted the same 
principle should be applied to the kafka connectors (0.10, 0.11, 2) and all 
formats.
   
   Instead of defining a separate shade-plugin execution with a custom 
artifactSuffix, this PR adds a dedicated `flink-sql-connector-elasticsearch6` 
module which only contains the packaging logic. This is similar to the approach 
we've already been using for `flink-shaded-hadoop2-uber`.
   
   The main motivation for this is licensing; for accurate notice files it is 
necessary to be able to supply each artifact with distinct NOTICE files.
   
   This cannot be done within a single module in a reasonable way. We would 
have to un-package each created jar, add the appropriate license files, and 
re-pack them again. We'd end up with tightly-coupled plugin definitions (since 
the names have to match!) and an overall more complicated (and slower!) build.
   
   ## Brief change log
   
   * add new `flink-sql-connector-elasticsearch6` module containing the 
sql-client-specific shade-plugin configuration and apply the following 
modifications
 *  set `executionId` to `shade-flink`
 * disable `shadedArtifactAttached` so only a single jar is deployed
 * remove sql-jar suffix as it is no longer necessary
   * remove sqlJars profile from `flink-connector-elasticsearch6`
   * add `sqlJars` profile to `flink-connectors` to support skipping the 
creation of sql jars
   
   ## Verifying this change
   
   Covered by sql-client E2E test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
   
   ## Documentation
   
   I have not checked the documentation yet for references that would have to 
be changed.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-12-06 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710963#comment-16710963
 ] 

TisonKun edited comment on FLINK-10333 at 12/6/18 11:49 AM:


Hi [~StephanEwen] and [~till.rohrmann]

After an offline discussion with [~xiaogang.shi], we see that ZK has a 
transactional mechanism, so we can ensure that only the leader writes to ZK.

Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before 
going into a reimplementation, I did a survey of the usage of ZK based stores 
in Flink. Ideally there is exactly one role that writes a given znode.

There are four types of znodes that flink writes. Besides 
{{SubmittedJobGraphStore}} written by {{Dispatcher}}, 
{{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} 
written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}} that 
also has a ZK based implementation.

All of the first three write to ZK under a heavy “store lock”, but as 
[~xiaogang.shi] points out, this still lacks atomicity. With a solution based 
on ZK transactions (for example, the current {{Dispatcher}} leader issuing 
{{setData}} with a {{check}} on the {{election-node-path}}) we can ensure 
atomicity while getting rid of the lock.

For the last one, {{RunningJobsRegistry}}, the situation becomes a bit more 
complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and 
{{setJobFinished}}, and {{Dispatcher}} is responsible for {{clearJob}}. This 
goes against the ideal of one role per znode. I also notice that the gap 
between the semantics of {{DONE}} and those of clearing is ambiguous. 

We might try to prevent the same job from being processed by an approach other 
than checking an ephemeral {{DONE}} status. What if we replace 
{{setJobFinished}} with clearing {{RunningJobsRegistry}}?


was (Author: tison):
Hi [~StephanEwen] and [~till.rohrmann]

After an offline discussion with [~xiaogang.shi], we see that ZK has a 
transactional mechanism, so we can ensure that only the leader writes to ZK.

Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before 
going into a reimplementation, I did a survey of the usage of ZK based stores 
in Flink. Ideally there is exactly one role that writes a given znode.

There are four types of znodes that flink writes. Besides 
{{SubmittedJobGraphStore}} written by {{Dispatcher}}, 
{{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} 
written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}} that 
also has a ZK based implementation.

All of the first three write to ZK under a heavy “store lock”, but as 
[~xiaogang.shi] points out, this still lacks atomicity. With a solution based 
on ZK transactions (for example, the current {{Dispatcher}} leader issuing 
{{setData}} with a {{check}} on the {{election-node-path}}) we can ensure 
atomicity while getting rid of the lock.

For the last one, {{RunningJobsRegistry}}, the situation becomes a bit more 
complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and 
{{setJobFinished}}, and {{Dispatcher}} is responsible for {{clearJob}}. This 
goes against the ideal of one role per znode. I also notice that the gap 
between the semantics of {{DONE}} and those of clearing is ambiguous. 

{{JobSchedulingStatus}} becomes {{DONE}} only if an {{ArchivedExecutionGraph}} 
is generated, so we can prevent the same job from being processed by an 
approach other than checking an ephemeral {{DONE}} status. What if we replace 
{{setJobFinished}} with clearing {{RunningJobsRegistry}}?
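
For illustration, a minimal sketch of the ZK-transaction idea mentioned above, 
using the raw ZooKeeper multi-op API; the paths, version handling, and helper 
name are assumptions for the example, not code from Flink:
{code:java}
// Illustrative sketch only. The setData is applied atomically only if the
// leader's election znode still has the expected version, i.e. the write
// goes through only for the current leader.
static List<OpResult> writeAsLeader(
        ZooKeeper zk, int expectedLeaderVersion, byte[] serializedJobGraph)
        throws KeeperException, InterruptedException {
    return zk.multi(Arrays.asList(
        Op.check("/flink/leader/election-node-path", expectedLeaderVersion),
        Op.setData("/flink/jobgraphs/job-id", serializedJobGraph, -1)));
}
{code}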

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation 

[jira] [Updated] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10945:
---
Labels: pull-request-available  (was: )

> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs (or batch 
> jobs) when resources are limited, in the following two cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> will prevent X from finishing
>  # Task Y is an upstream task of task X. When X takes all resources (slots) 
> and Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) 
> will be launched when any of its input results is ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z; X can be 
> launched when Z finishes, even though task Y is not launched yet. This 
> pre-launch behaviour can be beneficial when there are plenty of resources, as 
> X can process data from Z before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has two values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711338#comment-16711338
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk opened a new pull request #7250: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
 
 
   …locks for finite stream jobs when resources are limited
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen for finite stream jobs (or batch 
> jobs) when resources are limited, in the following two cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> blocks X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) and 
> Y cannot start, X cannot finish because some of its inputs never finish.
>  
> We can avoid case 1 by setting all edges to BLOCKING, which avoids pipelined 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) will 
> be launched when any of its input results is ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z. X can be 
> launched when Z finishes, even though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, since X can 
> process data from Z before Y finishes its own processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here: 
> 

[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711340#comment-16711340
 ] 

ASF GitHub Bot commented on FLINK-10945:


zhuzhurk closed pull request #7250: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
/** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;
 
+   /** The input dependency constraint to schedule tasks. */
+   private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
+
// --- User code values 

 
private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}
 
+   /**
+* Sets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @param inputDependencyConstraint The input dependency constraint.
+*/
+   public void setInputDependencyConstraint(InputDependencyConstraint 
inputDependencyConstraint) {
+   this.inputDependencyConstraint = inputDependencyConstraint;
+   }
+
+   /**
+* Gets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @return The input dependency constraint of this job.
+*/
+   public InputDependencyConstraint getInputDependencyConstraint() {
+   return inputDependencyConstraint;
+   }
+
/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO.
 * In some cases this might be preferable. For example, when using 
interfaces
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its 
inputs status.
+ */
+public enum InputDependencyConstraint {
+
+   /**
+* Schedule the task if any input is consumable.
+*/
+   ANY,
+
+   /**
+* Schedule the task if all the inputs are consumable.
+*/
+   ALL
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String 

[GitHub] zhuzhurk closed pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk closed pull request #7250: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
/** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;
 
+   /** The input dependency constraint to schedule tasks. */
+   private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
+
// --- User code values 

 
private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}
 
+   /**
+* Sets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @param inputDependencyConstraint The input dependency constraint.
+*/
+   public void setInputDependencyConstraint(InputDependencyConstraint 
inputDependencyConstraint) {
+   this.inputDependencyConstraint = inputDependencyConstraint;
+   }
+
+   /**
+* Gets the input dependency constraint for vertex scheduling. It 
indicates when a task
+* should be scheduled considering its inputs status.
+*
+* The default constraint is {@link InputDependencyConstraint#ANY}.
+*
+* @return The input dependency constraint of this job.
+*/
+   public InputDependencyConstraint getInputDependencyConstraint() {
+   return inputDependencyConstraint;
+   }
+
/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO.
 * In some cases this might be preferable. For example, when using 
interfaces
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its 
inputs status.
+ */
+public enum InputDependencyConstraint {
+
+   /**
+* Schedule the task if any input is consumable.
+*/
+   ANY,
+
+   /**
+* Schedule the task if all the inputs are consumable.
+*/
+   ALL
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
final ResultPartitionLocation partitionLocation;
 
// The producing task needs to be RUNNING or already 
FINISHED
-   if (consumedPartition.isConsumable() && 

[GitHub] zhuzhurk opened a new pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

2018-12-06 Thread GitBox
zhuzhurk opened a new pull request #7250: [FLINK-10945] Add an 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
 
 
   …locks for finite stream jobs when resources are limited
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query

2018-12-06 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711334#comment-16711334
 ] 

vinoyang commented on FLINK-10964:
--

[~twalthr]  Thank you for providing more information. I am personally not 
inclined to introduce a timer; there are already a lot of configuration items. 
We can delete the result of the job directly when cancelQuery is called.

> sql-client throws exception when paging through finished batch query 
> -
>
> Key: FLINK-10964
> URL: https://issues.apache.org/jira/browse/FLINK-10964
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: Seth Wiesman
>Assignee: vinoyang
>Priority: Major
>
> When paging through a batch query in state 'Finished', the SQL client throws 
> the following exception: 
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a 
> result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711289#comment-16711289
 ] 

ASF GitHub Bot commented on FLINK-11011:


tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7216
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 40a675aca31..0b245e74ff4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -683,8 +683,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -702,8 +706,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Elasticsearch 6 sink end-to-end test unstable
> -
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.8.0, 1.7.1
>Reporter: Timo Walther
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - 
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb 
> with no CheckpointCoordinator
> {code}
>  
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints

2018-12-06 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-10482:
-

Assignee: Andrey Zagrebin

> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
> --
>
> Key: FLINK-10482
> URL: https://issues.apache.org/jira/browse/FLINK-10482
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.1
>Reporter: Julio Biason
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> Recently I found the following in my JobManager log:
> {noformat}
> 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  - Implementation 
> error: Unhandled exception.
>  java.lang.IllegalArgumentException: Negative number of in progress 
> checkpoints
>      at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>      at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>      at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>      at 
> org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>      at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) 
>   
>      at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>      at java.lang.reflect.Method.invoke(Method.java:498)  
>    
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>   
>     
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   
>   
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   
>    
>      at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   
>    
>      at akka.actor.Actor$class.aroundReceive(Actor.scala:502) 
>   
>    
>      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)  
>   
>    
>      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)  
>      
>      at akka.actor.ActorCell.invoke(ActorCell.scala:495)     
>      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)   
>   
>    
>      at akka.dispatch.Mailbox.run(Mailbox.scala:224)    
>      at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   
>      
>      at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)  
>  
>      at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> Related: The job details don't appear, 

[jira] [Updated] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints

2018-12-06 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10482:
--
Fix Version/s: 1.7.1
   1.6.3

> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
> --
>
> Key: FLINK-10482
> URL: https://issues.apache.org/jira/browse/FLINK-10482
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.1
>Reporter: Julio Biason
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> Recently I found the following in my JobManager log:
> {noformat}
> 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  - Implementation 
> error: Unhandled exception.
>  java.lang.IllegalArgumentException: Negative number of in progress 
> checkpoints
>      at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>      at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>      at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>      at 
> org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>      at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) 
>   
>      at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>      at java.lang.reflect.Method.invoke(Method.java:498)  
>    
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>   
>     
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   
>   
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   
>    
>      at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   
>    
>      at akka.actor.Actor$class.aroundReceive(Actor.scala:502) 
>   
>    
>      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)  
>   
>    
>      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)  
>      
>      at akka.actor.ActorCell.invoke(ActorCell.scala:495)     
>      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)   
>   
>    
>      at akka.dispatch.Mailbox.run(Mailbox.scala:224)    
>      at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   
>      
>      at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)  
>  
>      at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> Related: The job details don't appear, the screen shows only the 

[jira] [Resolved] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable

2018-12-06 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11011.
---
Resolution: Fixed

Fixed via
1.8.0: 
https://github.com/apache/flink/commit/64f6d0b06f974649c21f10cd4bf97a6a9d742aa6
1.7.1: 
https://github.com/apache/flink/commit/1c1de7415002fc889510e8c84f807efbce149e5a
1.6.3: 
https://github.com/apache/flink/commit/d9c1bf7a2cda85a0540f6f6c69607d0fd8c2598d
1.5.6: 
https://github.com/apache/flink/commit/47b2351159b09396d8bf288f25bc3ebfab99f50f

> Elasticsearch 6 sink end-to-end test unstable
> -
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.8.0, 1.7.1
>Reporter: Timo Walther
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - 
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb 
> with no CheckpointCoordinator
> {code}
>  
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711287#comment-16711287
 ] 

ASF GitHub Bot commented on FLINK-11011:


tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7223
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c8a8e882913..a89079dea25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -681,8 +681,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -700,8 +704,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Elasticsearch 6 sink end-to-end test unstable
> -
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.8.0, 1.7.1
>Reporter: Timo Walther
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - 
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb 
> with no CheckpointCoordinator
> {code}
>  
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711288#comment-16711288
 ] 

ASF GitHub Bot commented on FLINK-11011:


tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7224
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 40a675aca31..0b245e74ff4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -683,8 +683,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -702,8 +706,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Elasticsearch 6 sink end-to-end test unstable
> -
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.8.0, 1.7.1
>Reporter: Timo Walther
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - 
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb 
> with no CheckpointCoordinator
> {code}
>  
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running

2018-12-06 Thread GitBox
tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7216
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 40a675aca31..0b245e74ff4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -683,8 +683,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -702,8 +706,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running

2018-12-06 Thread GitBox
tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7224
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 40a675aca31..0b245e74ff4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -683,8 +683,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -702,8 +706,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running

2018-12-06 Thread GitBox
tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7223
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c8a8e882913..a89079dea25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -681,8 +681,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -700,8 +704,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711285#comment-16711285
 ] 

ASF GitHub Bot commented on FLINK-11011:


tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7222
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 269f23e6781..0b0b82baa95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -680,8 +680,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -699,8 +703,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Elasticsearch 6 sink end-to-end test unstable
> -
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.8.0, 1.7.1
>Reporter: Timo Walther
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - 
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb 
> with no CheckpointCoordinator
> {code}
>  
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running

2018-12-06 Thread GitBox
tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error 
messages about null CheckpointCoordinator only if job is running
URL: https://github.com/apache/flink/pull/7222
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 269f23e6781..0b0b82baa95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -680,8 +680,12 @@ public void acknowledgeCheckpoint(
}
});
} else {
-   log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 
@@ -699,8 +703,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
});
} else {
-   log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
-   jobGraph.getJobID());
+   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
+   if (executionGraph.getState() == JobStatus.RUNNING) {
+   log.error(errorMessage, jobGraph.getJobID());
+   } else {
+   log.debug(errorMessage, jobGraph.getJobID());
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10997) Avro-confluent-registry does not bundle any dependency

2018-12-06 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10997.

   Resolution: Fixed
Fix Version/s: 1.7.1
   1.8.0
   1.6.3

master: 339ec175f558d23aaabffa3e856809518f238f2e

1.7: 5b945f73ea47364badb12ca0b9c45604b8028e73

1.6: 4b9b367ef1bb1ddff4bf015de2a15c561b134135

> Avro-confluent-registry does not bundle any dependency
> --
>
> Key: FLINK-10997
> URL: https://issues.apache.org/jira/browse/FLINK-10997
> Project: Flink
>  Issue Type: Bug
>  Components: Formats, Table API  SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> The {{flink-avro-confluent-registry}} module is not bundling any dependencies, 
> yet it defines a relocation for the transitive jackson dependency pulled in by 
> {{kafka-schema-registry-client}}.
> It is likely that the registry-client should be included in the jar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711264#comment-16711264
 ] 

ASF GitHub Bot commented on FLINK-11048:


zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239406192
 
 

 ##
 File path: 
flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ##
 @@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
this.flinkILoop = flinkILoop;
}
 
-   /**
-* Executes the remote job.
-*
-* @param streamGraph
-*Stream Graph to execute
-* @param jarFiles
-*List of jar file URLs to ship to the cluster
-* @return The result of the job execution, containing elapsed time and 
accumulators.
-*/
-   @Override
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List jarFiles) throws ProgramInvocationException {
+   protected List getJarFiles() throws ProgramInvocationException {
 
 Review comment:
   missing `@Override`, also applies to other classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restoring from a savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   
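For illustration, a hypothetical sketch of the capability downstream projects are after. {{SavepointRestoreSettings.forPath}} is existing Flink API; the {{RemoteStreamEnvironment}} constructor overload carrying the restore settings is an assumption made for this sketch, not the merged signature:

{code:java}
import java.net.URL;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;

public class RemoteSavepointRestoreSketch {

	public static void main(String[] args) throws Exception {
		// Existing API: restore from a savepoint path, allowing non-restored state.
		SavepointRestoreSettings restore =
			SavepointRestoreSettings.forPath("hdfs:///flink/savepoints/sp-1", true);

		Configuration clientConfig = new Configuration();
		String[] jarFiles = new String[] {"job.jar"};
		URL[] globalClasspaths = new URL[0];

		// Assumed constructor overload (restore settings as last parameter),
		// shown only to illustrate the requested programmatic entry point.
		RemoteStreamEnvironment env = new RemoteStreamEnvironment(
			"jobmanager-host", 8081, clientConfig, jarFiles, globalClasspaths, restore);

		// ... define the pipeline on env, then env.execute("restored-job");
	}
}
{code}

Passing the restore settings at environment construction keeps {{execute()}} unchanged for callers that do not restore.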



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10997) Avro-confluent-registry does not bundle any dependency

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711269#comment-16711269
 ] 

ASF GitHub Bot commented on FLINK-10997:


zentol closed pull request #7170: [FLINK-10997][formats] Bundle 
kafka-scheme-registry-client
URL: https://github.com/apache/flink/pull/7170
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml 
b/flink-formats/flink-avro-confluent-registry/pom.xml
index 72a74ac11ba..4c99f71e544 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -78,10 +78,31 @@ under the License.
shade


+   
false
+   
+   
+   
io.confluent:*
+   
com.fasterxml.jackson.core:*
+   
org.apache.zookeeper:zookeeper
+   
com.101tec:zkclient
+   
+   


-   
com.fasterxml.jackson.core
-   
org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core
+   
com.fasterxml.jackson
+   
org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson
+   
+   
+   
org.apache.zookeeper
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper
+   
+   
+   
org.apache.jute
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.jute
+   
+   
+   
org.I0Itec.zkclient
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.101tec



diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
 
b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000..24f2ff84657
--- /dev/null
+++ 
b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,15 @@
+flink-avro-confluent-registry
+Copyright 2014-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) 
+
+- com.101tec:zkclient:0.10
+- com.fasterxml.jackson.core:jackson-databind:2.8.4
+- com.fasterxml.jackson.core:jackson-annotations:2.8.0
+- com.fasterxml.jackson.core:jackson-core:2.8.4
+- io.confluent:common-utils:3.3.1
+- io.confluent:kafka-schema-registry-client:3.3.1
+- org.apache.zookeeper:zookeeper:3.4.10


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact 

[GitHub] zentol closed pull request #7170: [FLINK-10997][formats] Bundle kafka-scheme-registry-client

2018-12-06 Thread GitBox
zentol closed pull request #7170: [FLINK-10997][formats] Bundle 
kafka-scheme-registry-client
URL: https://github.com/apache/flink/pull/7170
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml 
b/flink-formats/flink-avro-confluent-registry/pom.xml
index 72a74ac11ba..4c99f71e544 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -78,10 +78,31 @@ under the License.
shade


+   
false
+   
+   
+   
io.confluent:*
+   
com.fasterxml.jackson.core:*
+   
org.apache.zookeeper:zookeeper
+   
com.101tec:zkclient
+   
+   


-   
com.fasterxml.jackson.core
-   
org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core
+   
com.fasterxml.jackson
+   
org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson
+   
+   
+   
org.apache.zookeeper
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper
+   
+   
+   
org.apache.jute
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.jute
+   
+   
+   
org.I0Itec.zkclient
+   
org.apache.flink.formats.avro.registry.confluent.shaded.org.101tec



diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
 
b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000..24f2ff84657
--- /dev/null
+++ 
b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,15 @@
+flink-avro-confluent-registry
+Copyright 2014-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) 
+
+- com.101tec:zkclient:0.10
+- com.fasterxml.jackson.core:jackson-databind:2.8.4
+- com.fasterxml.jackson.core:jackson-annotations:2.8.0
+- com.fasterxml.jackson.core:jackson-core:2.8.4
+- io.confluent:common-utils:3.3.1
+- io.confluent:kafka-schema-registry-client:3.3.1
+- org.apache.zookeeper:zookeeper:3.4.10
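
With these relocations in place, the bundled Jackson, ZooKeeper, jute and zkclient classes end up under the org.apache.flink.formats.avro.registry.confluent.shaded prefix. As a minimal sketch (not part of this PR), one could smoke-test the shaded jar like this, assuming the shaded flink-avro-confluent-registry jar is on the classpath:

{code:java}
// Hypothetical check, not from the PR: the relocated Jackson ObjectMapper
// should resolve under the shaded package prefix after `mvn package`.
public class ShadedRelocationCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        String shaded = "org.apache.flink.formats.avro.registry.confluent.shaded."
                + "com.fasterxml.jackson.databind.ObjectMapper";
        // Throws ClassNotFoundException if the shade relocation did not run.
        System.out.println("Found relocated class: " + Class.forName(shaded).getName());
    }
}
{code}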


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711265#comment-16711265
 ] 

ASF GitHub Bot commented on FLINK-11048:


zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239406325
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -262,4 +281,5 @@ public int getPort() {
public Configuration getClientConfiguration() {
return clientConfiguration;
}
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239406325
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -262,4 +281,5 @@ public int getPort() {
public Configuration getClientConfiguration() {
return clientConfiguration;
}
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10973) Add Map operator to Table API

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711263#comment-16711263
 ] 

ASF GitHub Bot commented on FLINK-10973:


twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to 
table API
URL: https://github.com/apache/flink/pull/7167#issuecomment-444829802
 
 
   Thanks for the reminder @dianfu. I will try to add my feedback by tomorrow.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Map operator to Table API
> -
>
> Key: FLINK-10973
> URL: https://issues.apache.org/jira/browse/FLINK-10973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Map operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]
> The usage:
> {code:java}
> val res = tab
>.map(fun: ScalarFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}
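
For illustration, a hypothetical function matching the usage above — a sketch against the design-doc proposal (the class name, eval signature, and Row result type are assumptions, not merged API):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Hypothetical ScalarFunction producing the columns 'a, 'b, 'c of the example.
public class MyMapFunction extends ScalarFunction {

    public Row eval(String value) {
        // 'a: original value, 'b: upper-cased value, 'c: length
        return Row.of(value, value.toUpperCase(), value.length());
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING, Types.STRING, Types.INT);
    }
}
{code}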



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239406192
 
 

 ##
 File path: 
flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ##
 @@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
this.flinkILoop = flinkILoop;
}
 
-   /**
-* Executes the remote job.
-*
-* @param streamGraph
-*Stream Graph to execute
-* @param jarFiles
-*List of jar file URLs to ship to the cluster
-* @return The result of the job execution, containing elapsed time and 
accumulators.
-*/
-   @Override
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
 
 Review comment:
   missing `@Override`, also applies to other classes.
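   For instance (assuming the `List<URL>` element type from the javadoc above), the declaration would become:

{code:java}
@Override
protected List<URL> getJarFiles() throws ProgramInvocationException {
    return jarFiles;
}
{code}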


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to table API

2018-12-06 Thread GitBox
twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to 
table API
URL: https://github.com/apache/flink/pull/7167#issuecomment-444829802
 
 
   Thanks for the reminder @dianfu. I will try to add my feedback by tomorrow.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711262#comment-16711262
 ] 

ASF GitHub Bot commented on FLINK-11048:


zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239405800
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   This goes back a bit to the discussion on the ML. Remote execution shouldn't 
be an intrinsic part of the environment, as regardless of which environment 
you're using it is still possible to submit it remotely provided you have the 
host/port arguments.
   
   With how this PR sets things up you don't have to change your 
`getExecutionEnvironment` call every time you want to change the submission 
method; instead you only replace the `execute()` call which imo makes a lot 
more sense.
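
   To make that concrete: a sketch of the call-site difference under this PR's proposal. The parameter list of the static `executeRemotely` is an assumption pieced together from the diff above, not the final API:

{code:java}
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // The environment is obtained the same way regardless of submission method.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        List<URL> jarFiles = Collections.emptyList(); // jars to ship, if any

        // Instead of env.execute("my-job"), only the execute call changes
        // (parameter order here is an assumption), optionally restoring state:
        RemoteStreamEnvironment.executeRemotely(
                env, "jobmanager-host", 6123, new Configuration(), jarFiles,
                "my-job", SavepointRestoreSettings.forPath("/savepoints/sp-1"));
    }
}
{code}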


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-06 Thread GitBox
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239405800
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List<URL> jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   This goes back a bit to the discussion on the ML. Remote execution shouldn't 
be an intrinsic part of the environment, as regardless of which environment 
you're using it is still possible to submit it remotely provided you have the 
host/port arguments.
   
   With how this PR sets things up you don't have to change your 
`getExecutionEnvironment` call every time you want to change the submission 
method; instead you only replace the `execute()` call which imo makes a lot 
more sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys closed pull request #7212: [hotfix][docs] Fix invalid link in schema_evolution doc

2018-12-06 Thread GitBox
dawidwys closed pull request #7212: [hotfix][docs] Fix invalid link in 
schema_evolution doc
URL: https://github.com/apache/flink/pull/7212
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/schema_evolution.md 
b/docs/dev/stream/state/schema_evolution.md
index 2fb10a74ff1..e4c2d4aa441 100644
--- a/docs/dev/stream/state/schema_evolution.md
+++ b/docs/dev/stream/state/schema_evolution.md
@@ -74,7 +74,7 @@ serialization schema than the previous serializer; if so, the 
previous serialize
 and written back to bytes again with the new serializer.
 
 Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization).
+[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
 
 ## Supported data types for schema evolution
 

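As background for the passage being fixed: the compatibility check it describes corresponds roughly to Flink 1.7's TypeSerializerSnapshot API. A minimal illustrative sketch, assuming that API (this is not the documentation's own code):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Sketch of the check described above: ask the restored snapshot whether the
// new serializer can read the old bytes, and migrate if necessary.
public class CompatibilitySketch {
    public static <T> void restore(
            TypeSerializerSnapshot<T> restoredSnapshot, TypeSerializer<T> newSerializer) {
        TypeSerializerSchemaCompatibility<T> compat =
                restoredSnapshot.resolveSchemaCompatibility(newSerializer);
        if (compat.isCompatibleAsIs()) {
            // use newSerializer directly on the previously written bytes
        } else if (compat.isCompatibleAfterMigration()) {
            // read state with restoredSnapshot.restoreSerializer(),
            // then write it back to bytes with newSerializer
        } else {
            // incompatible: the restore fails
        }
    }
}
{code}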

 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11036) Streaming classloader end-to-end test does not work on source releases

2018-12-06 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-11036:
--
Labels: test-stability  (was: )

> Streaming classloader end-to-end test does not work on source releases
> --
>
> Key: FLINK-11036
> URL: https://issues.apache.org/jira/browse/FLINK-11036
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> A Flink distribution that has been built from a source release has no git 
> commit id associated. The web UI shows {{unknown}} as the version commit. 
> Therefore, the {{test_streaming_classloader.sh}} test cannot be executed on a 
> source release. Either we change the test setup or we skip the test if the 
> commit id is not present.
> {code}
> ==
> Running 'class loading end-to-end test'
> ==
> TEST_DATA_DIR: 
> /Users/twalthr/flink/tests/flink1.7/flink-1.7.0/flink-end-to-end-tests/test-scripts/temp-test-directory-30N
> Flink dist directory: 
> /Users/twalthr/flink/tests/flink1.7/flink-1.7.0/flink-dist/target/flink-1.7.0-bin/flink-1.7.0
> Testing parent-first class loading
> Starting cluster.
> Starting standalonesession daemon on host TTMACBOOK.
> Starting taskexecutor daemon on host TTMACBOOK.
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Waiting for dispatcher REST endpoint to come up...
> Dispatcher REST endpoint is up.
> Starting execution of program
> Program execution finished
> Job with JobID d69f02237884c6119699302497586dbf has finished.
> Job Runtime: 334 ms
> Stopping taskexecutor daemon (pid: 1598) on host TTMACBOOK.
> Stopping standalonesession daemon (pid: 1180) on host TTMACBOOK.
> Output from Flink program does not match expected output.
> EXPECTED: NoSuchMethodError:[0-9a-f]{6,}:[0-9a-f]{6,}hello-there-42
> ACTUAL: NoSuchMethodError:hello-there-42:hello-there-42
> [FAIL] Test script contains errors.
> Checking for errors...
> No errors in log files.
> Checking for exceptions...
> No exceptions in log files.
> Checking for non-empty .out files...
> No non-empty .out files.
> [FAIL] 'class loading end-to-end test' failed after 0 minutes and 9 seconds! 
> Test exited with exit code 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query

2018-12-06 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711232#comment-16711232
 ] 

Timo Walther commented on FLINK-10964:
--

I can give more information here. The issue is rather a conceptual problem: 
currently, we only maintain results until a job has finished. If the job is 
bounded (batch, or streaming with bounded sources), the result store is 
cleaned up even though the CLI still wants to page through the result. A 
solution would be to:
- keep results in {{ResultStore}} even after the query has stopped
- delete a result when {{Executor#cancelQuery}} is called
- or delete it after a timeout specified via the "execution" properties 
(meaning we need a timer service in the result store); see the sketch below.
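
A minimal sketch of that retention idea, with hypothetical class and method names (not the actual SQL Client code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical retention store: results survive job completion and are
// removed either explicitly (cancelQuery) or by a configurable timeout.
public class RetainingResultStore<R> {

    private final Map<String, R> results = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final long retentionSeconds;

    public RetainingResultStore(long retentionSeconds) {
        this.retentionSeconds = retentionSeconds;
    }

    /** Called when a job finishes: keep the result but schedule its expiry. */
    public void storeResult(String resultId, R result) {
        results.put(resultId, result);
        timer.schedule(() -> { results.remove(resultId); },
                retentionSeconds, TimeUnit.SECONDS);
    }

    /** Eager cleanup, e.g. from Executor#cancelQuery. */
    public void removeResult(String resultId) {
        results.remove(resultId);
    }

    /** Paging requests look results up here even after the job finished. */
    public R getResult(String resultId) {
        return results.get(resultId);
    }
}
{code}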

> sql-client throws exception when paging through finished batch query 
> -
>
> Key: FLINK-10964
> URL: https://issues.apache.org/jira/browse/FLINK-10964
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: Seth Wiesman
>Assignee: vinoyang
>Priority: Major
>
> When paging through a batch query in state 'Finished' the sql client throws 
> the following exception: 
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a 
> result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

