[jira] [Assigned] (FLINK-10562) Relax (or document) table name constraints

2018-10-17 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10562:


Assignee: winifredtang  (was: vinoyang)

> Relax (or document) table name constraints
> --
>
> Key: FLINK-10562
> URL: https://issues.apache.org/jira/browse/FLINK-10562
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Flavio Pompermaier
>Assignee: winifredtang
>Priority: Minor
>
> At the moment it's not possible to register a table whose name starts with a 
> number (e.g. 1_test). Moreover, this constraint is not documented.
> I propose to support table name escaping in some form, in order to enable 
> more general names such as those containing spaces (e.g. select * from 'my 
> table').
> Best,
> Flavio
>  
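As a sketch only: the first registration below fails today, and the quoted identifier in the query illustrates the kind of escaping the proposal asks for. The backtick syntax and the acceptance of spaces are assumptions of this sketch, not behavior the issue already guarantees.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EscapedTableNames {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        DataStream<Tuple2<Integer, String>> stream = env.fromElements(Tuple2.of(1, "a"));

        // Rejected today: table names may not start with a digit.
        // tEnv.registerDataStream("1_test", stream, "id, name");

        // Hypothetical, once escaping is supported: names with spaces become usable.
        tEnv.registerDataStream("my table", stream, "id, name");
        Table result = tEnv.sqlQuery("SELECT * FROM `my table`");

        env.execute();
    }
}
{code}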



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654708#comment-16654708
 ] 

ASF GitHub Bot commented on FLINK-10511:


asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the 
port selection and RPC se…
URL: https://github.com/apache/flink/pull/6845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4bcc4..cc12ff5ee5c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -83,10 +82,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception
 		final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
 		final String portRange = getRPCPortRange(configuration);
 
-		commonRpcService = createRpcService(configuration, bindAddress, portRange);
+		commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
 
 		// update the configuration used to create the high availability services
 		configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
@@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) {
 		}
 	}
 
-	protected RpcService createRpcService(
-			Configuration configuration,
-			String bindAddress,
-			String portRange) throws Exception {
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR);
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
-	}
-
 	protected HighAvailabilityServices createHaServices(
 			Configuration configuration,
 			Executor executor) throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..28f04f7677d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -30,8 +31,6 @@
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelExcep

[GitHub] asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se…

2018-10-17 Thread GitBox
asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the 
port selection and RPC se…
URL: https://github.com/apache/flink/pull/6845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4bcc4..cc12ff5ee5c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -83,10 +82,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception
 		final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
 		final String portRange = getRPCPortRange(configuration);
 
-		commonRpcService = createRpcService(configuration, bindAddress, portRange);
+		commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
 
 		// update the configuration used to create the high availability services
 		configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
@@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) {
 		}
 	}
 
-	protected RpcService createRpcService(
-			Configuration configuration,
-			String bindAddress,
-			String portRange) throws Exception {
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR);
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
-	}
-
 	protected HighAvailabilityServices createHaServices(
 			Configuration configuration,
 			Executor executor) throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..28f04f7677d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -30,8 +31,6 @@
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,25 @@
 	//  RPC instantiation
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Utility method to create RPC servi

[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages

2018-10-17 Thread GitBox
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850#issuecomment-430892698
 
 
   @zentol I have reverted the `MetricRegistryConfiguration` change, please 
review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils

2018-10-17 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10511.
---
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed via d5aa97f8a0e653125d97846d67f3fdf6a10dbedf

> Code duplication of creating RPC service in ClusterEntrypoint and 
> AkkaRpcServiceUtils
> -
>
> Key: FLINK-10511
> URL: https://issues.apache.org/jira/browse/FLINK-10511
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The TaskManagerRunner uses AkkaRpcServiceUtils to create its RPC service, 
> but the ClusterEntrypoint uses a protected method to do the same job. I 
> think it's better to use the same method in AkkaRpcServiceUtils so the code 
> is reused.





[jira] [Commented] (FLINK-10252) Handle oversized metric messages

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654704#comment-16654704
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850#issuecomment-430892698
 
 
   @zentol I have reverted the `MetricRegistryConfiguration` change, please 
review.




> Handle oversized metric messages
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can 
> only send messages smaller than the current {{akka.framesize}}. Similarly to 
> FLINK-10251, we should check whether the payload exceeds the maximum 
> framesize and fail fast if it does.
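A minimal sketch of the fail-fast check described above; the class and method names are hypothetical, not Flink's actual code, and the limit would be derived from {{akka.framesize}}.

{code:java}
public final class FramesizeGuard {

    private FramesizeGuard() {}

    /**
     * Fails fast if a serialized metric dump would exceed the maximum
     * Akka frame size, instead of sending an undeliverable message.
     */
    public static void checkPayloadSize(byte[] payload, long maxFramesizeBytes) {
        if (payload.length > maxFramesizeBytes) {
            throw new IllegalStateException(String.format(
                "Metric dump of %d bytes exceeds akka.framesize of %d bytes.",
                payload.length, maxFramesizeBytes));
        }
    }
}
{code}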





[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654700#comment-16654700
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

twalthr commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225604789
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -58,7 +62,10 @@ class PatternSelectFunctionRunner(
     }
 
     outCRow.row = function.select(pattern)
-    outCRow
+    out match {
 
 Review comment:
   IMHO I would use an instanceof check here instead of a match. We should 
avoid calling Scala code in runtime code.




> Support the basic functionality of MATCH_RECOGNIZE
> --
>
> Key: FLINK-7062
> URL: https://issues.apache.org/jira/browse/FLINK-7062
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in the Flink SQL API, including support for the {{MEASURES}}, {{PATTERN}} 
> and {{DEFINE}} clauses. This would allow users to write basic CEP use cases 
> in SQL, like the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
> A.id AS aid,
> B.id AS bid,
> C.id AS cid
>   PATTERN (A B C)
>   DEFINE
> A AS A.name = 'a',
> B AS B.name = 'b',
> C AS C.name = 'c'
> ) AS T
> {code}





[GitHub] twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-17 Thread GitBox
twalthr commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225604789
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -58,7 +62,10 @@ class PatternSelectFunctionRunner(
     }
 
     outCRow.row = function.select(pattern)
-    outCRow
+    out match {
 
 Review comment:
   IMHO I would use an instanceof check here instead of a match. We should 
avoid calling Scala code in runtime code.




[jira] [Commented] (FLINK-10555) Port AkkaSslITCase to new code base

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654698#comment-16654698
 ] 

ASF GitHub Bot commented on FLINK-10555:


TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to 
new code base
URL: https://github.com/apache/flink/pull/6849#issuecomment-430890279
 
 
   @tillrohrmann yes, I agree that the `BlobServer` SSL-specific failures 
could move into a `BlobServerSSLTest`.
   
   For the second part of your comment, could you show me what "interact with 
it" means, or which interactions will be filtered by the SSL configuration?




> Port AkkaSslITCase to new code base
> ---
>
> Key: FLINK-10555
> URL: https://issues.apache.org/jira/browse/FLINK-10555
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}.





[GitHub] TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base

2018-10-17 Thread GitBox
TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to 
new code base
URL: https://github.com/apache/flink/pull/6849#issuecomment-430890279
 
 
   @tillrohrmann yes, I agree that the `BlobServer` SSL-specific failures 
could move into a `BlobServerSSLTest`.
   
   For the second part of your comment, could you show me what "interact with 
it" means, or which interactions will be filtered by the SSL configuration?




[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654688#comment-16654688
 ] 

ASF GitHub Bot commented on FLINK-10319:


TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-430886993
 
 
   As "deploying tasks in topological order", I agree that it could help. It is 
a orthonormal improvement though.
   
   For your hesitancy, I'd like to learn in which situation that a downstream 
operator would not be failed by a upstream failing. To keep the state clean 
either the upstream fails downstream and both restore from the least 
checkpoint, or we need to implement a failover strategy that take the 
responsibility for reconcile the state. The latter sounds quite costly.




> Too many requestPartitionState would crash JM
> -
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Do not requestPartitionState from the JM when a partition request fails; 
> this may generate too many RPC requests and block the JM.
> We gain little benefit from checking what state the producer is in, while on 
> the other hand the flood of RPC requests can crash the JM. A task can always 
> retriggerPartitionRequest from its InputGate; the retry fails if the 
> producer is gone and succeeds if the producer is alive. Either way, there is 
> no need to ask the JM for help.
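A hypothetical sketch of the behavior described above; the types and method names are illustrative only, not Flink's actual runtime classes.

{code:java}
/** Illustrative stand-in for the consumer side of a partition request. */
interface InputChannelSketch {
    boolean hasRetriesLeft();
    void retriggerPartitionRequest(); // succeeds iff the producer is still alive
    void failConsumption();           // give up; regular failover handles the rest
}

final class PartitionRequestRetrySketch {
    /** On a failed partition request, retry locally instead of asking the JM. */
    static void onRequestFailure(InputChannelSketch channel) {
        if (channel.hasRetriesLeft()) {
            channel.retriggerPartitionRequest();
        } else {
            channel.failConsumption();
        }
    }
}
{code}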





[GitHub] TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM

2018-10-17 Thread GitBox
TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-430886993
 
 
   As "deploying tasks in topological order", I agree that it could help. It is 
a orthonormal improvement though.
   
   For your hesitancy, I'd like to learn in which situation that a downstream 
operator would not be failed by a upstream failing. To keep the state clean 
either the upstream fails downstream and both restore from the least 
checkpoint, or we need to implement a failover strategy that take the 
responsibility for reconcile the state. The latter sounds quite costly.




[jira] [Commented] (FLINK-10577) CEP's greedy() doesn't work

2018-10-17 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654571#comment-16654571
 ] 

Dian Fu commented on FLINK-10577:
-

Hi [~simahoa], this is currently by design. For the pattern (start middle+), 
when an event matching `middle` arrives, it will output `start middle`, as we 
do not know whether any more events matching `middle` will arrive in the 
stream. When another event matching `middle` arrives, it will output `start 
middle middle`, which can be seen as an update of the previously sent result 
`start middle`. For the input events r p p, for example, the emitted matches 
are `r p` followed by `r p p`.

> CEP's greedy() doesn't work
> ---
>
> Key: FLINK-10577
> URL: https://issues.apache.org/jira/browse/FLINK-10577
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: zhanghao
>Priority: Major
>
> I think the greedy operator has a problem.
> Given the Java code below:
>  
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
>     Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
>     Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
> ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public long extractTimestamp(Tuple3<Integer, Long, String> element) {
>         return element.f1;
>     }
> });
> AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
> Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
>     .where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public boolean filter(Tuple3<Integer, Long, String> e) {
>             return e.f2.equals("r");
>         }
>     }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
>             return !e.f2.equals("r");
>         }
>     }).oneOrMore().greedy()
>     .within(Time.seconds(10));
> CEP.pattern(input.keyBy(0), pattern)
>     .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
>             StringBuilder builder = new StringBuilder();
>             List<Tuple3<Integer, Long, String>> start = pattern.get("start");
>             List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
>             for (Tuple3<Integer, Long, String> t : start) {
>                 builder.append(t.f0).append(",");
>             }
>             for (Tuple3<Integer, Long, String> t : middle) {
>                 builder.append(t.f0).append(",");
>             }
>             return builder.toString();
>         }
>     })
>     .print();
> env.execute();
> {code}
> I would like to see: 100,100,100,100,100,100
> However, it matches only: 100,100
> I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() to skip some 
> partial matches, but it also matches 100,100.
> Is there something important about the greedy operator that I misunderstand?





[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654559#comment-16654559
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r226155097
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName, primitiveDefaultValue}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Collector
+import org.apache.flink.util.MathUtils.checkedDownCast
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A code generator for generating CEP related functions.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input type information about the first input of the Function
+  * @param currentPattern if generating a condition, the name of the pattern
+  *   to which the condition will be applied
+  */
+class MatchCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input: TypeInformation[_ <: Any],
+    currentPattern: Option[String] = None)
+  extends CodeGenerator(config, nullableInput, input) {
+
+  def generateMatchFunction[F <: Function, T <: Any](
+      name: String,
+      clazz: Class[F],
+      bodyCode: String,
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
+    // This is a separate method from FunctionCodeGenerator#generateFunction because as of now
+    // functions in CEP library do not support rich interfaces
+    val funcName = newName(name)
+    val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
+    val (functionClass, signature, inputStatements, isInterface) =
+      if (clazz == classOf[IterativeCondition[_]]) {
+        val baseClass = classOf[IterativeCondition[_]]
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input)
+        val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName
+
+        (baseClass,
+          s"boolean filter(Object _in1, $contextType $contextTerm)",
+          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
+          false)
+      } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"Object select($inputTypeTerm $input1Term)",
+          List(),
+          true)
+      } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)",
+          List(),
+          true)
+      } else {
+        throw new CodeGenException("Unsupported Function.")
+      }
+
+    val extendsKeyword = if (isInterface) "implements" else "extends"
+    val funcCode = j"""
+      |public class $funcName $extendsKeyword ${functionClass.getCanonicalName} {
+      |
+      |  ${reuseMemberCode()}
+      |
+      |  public $funcName() throws Exception {
+      |    ${reuseInitCode()}
+      | 

[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-17 Thread GitBox
dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r226155097
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName, primitiveDefaultValue}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Collector
+import org.apache.flink.util.MathUtils.checkedDownCast
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A code generator for generating CEP related functions.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input type information about the first input of the Function
+  * @param currentPattern if generating a condition, the name of the pattern
+  *   to which the condition will be applied
+  */
+class MatchCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input: TypeInformation[_ <: Any],
+    currentPattern: Option[String] = None)
+  extends CodeGenerator(config, nullableInput, input) {
+
+  def generateMatchFunction[F <: Function, T <: Any](
+      name: String,
+      clazz: Class[F],
+      bodyCode: String,
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
+    // This is a separate method from FunctionCodeGenerator#generateFunction because as of now
+    // functions in CEP library do not support rich interfaces
+    val funcName = newName(name)
+    val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
+    val (functionClass, signature, inputStatements, isInterface) =
+      if (clazz == classOf[IterativeCondition[_]]) {
+        val baseClass = classOf[IterativeCondition[_]]
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input)
+        val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName
+
+        (baseClass,
+          s"boolean filter(Object _in1, $contextType $contextTerm)",
+          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
+          false)
+      } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"Object select($inputTypeTerm $input1Term)",
+          List(),
+          true)
+      } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)",
+          List(),
+          true)
+      } else {
+        throw new CodeGenException("Unsupported Function.")
+      }
+
+    val extendsKeyword = if (isInterface) "implements" else "extends"
+    val funcCode = j"""
+      |public class $funcName $extendsKeyword ${functionClass.getCanonicalName} {
+      |
+      |  ${reuseMemberCode()}
+      |
+      |  public $funcName() throws Exception {
+      |    ${reuseInitCode()}
+      |  }
+      |
+      |  @Override
+      |  public $signature throws Exception {
+      |    ${inputStatements.mkString("\n")}
+      |    ${reusePerRecordCode()}
+      |    ${reuseInputUnboxingCode()}
+      |    $bodyCode
+      |  }
+      |}
+    """.stripMargin

[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654396#comment-16654396
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun opened a new pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872
 
 
   ## What is the purpose of the change
   
   Add `ConfigOption#withFallbackKeys` to implement a function similar to 
`ConfigOption#withDeprecatedKeys`. That is, the option falls back to a 
`fallbackKey` if its current key has not been specified, but using a fallback 
key does not print a deprecation message.
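   A sketch of the intended usage, assuming the 1.7-era `ConfigOptions` builder API; the option and key names are taken from the motivating JIRA:
   
   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   import org.apache.flink.configuration.Configuration;
   
   public class FallbackKeyExample {
       public static void main(String[] args) {
           ConfigOption<String> restAddress = ConfigOptions
               .key("rest.address")
               .noDefaultValue()
               .withFallbackKeys("jobmanager.rpc.address");
   
           Configuration conf = new Configuration();
           conf.setString("jobmanager.rpc.address", "localhost");
           // Resolves via the fallback key, without a deprecation warning:
           System.out.println(conf.getString(restAddress)); // prints "localhost"
       }
   }
   ```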
   
   ## Brief change log
   
   - Introduce a `FallbackKey` class to represent a key with an `isDeprecated` 
flag.
   - Replace `deprecatedKeys` with `fallbackKeys` in `ConfigOption`
   
   
   ## Verifying this change
   
   - Add a test `ConfigurationTest#testFallbackKeys` to check that the 
fallback mechanism works.
   - Manually check that the issue reported in the corresponding JIRA 
disappears.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @tillrohrmann 




> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.





[GitHub] TisonKun opened a new pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

2018-10-17 Thread GitBox
TisonKun opened a new pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872
 
 
   ## What is the purpose of the change
   
   Add `ConfigOption#withFallbackKeys` to implement a function similar to 
`ConfigOption#withDeprecatedKeys`. That is, the option falls back to a 
`fallbackKey` if its current key has not been specified, but using a fallback 
key does not print a deprecation message.
   
   ## Brief change log
   
   - Introduce a `FallbackKey` class to represent a key with an `isDeprecated` 
flag.
   - Replace `deprecatedKeys` with `fallbackKeys` in `ConfigOption`
   
   
   ## Verifying this change
   
   - Add a test `ConfigurationTest#testFallbackKeys` to check that the 
fallback mechanism works.
   - Manually check that the issue reported in the corresponding JIRA 
disappears.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @tillrohrmann 




[GitHub] zentol closed pull request #6861: [hotfix] [javadocs] Fix some single quote problems

2018-10-17 Thread GitBox
zentol closed pull request #6861: [hotfix] [javadocs] Fix some single quote 
problems
URL: https://github.com/apache/flink/pull/6861
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index f5a2a5d77f9..787210119df 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -60,7 +60,7 @@
 
 akka.log.lifecycle.events
 false
-Turns on the Akka’s remote logging of events. Set this value to ‘true’ in case of debugging.
+Turns on the Akka’s remote logging of events. Set this value to 'true' in case of debugging.
 
 
 akka.lookup.timeout
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
index 205e4372cd8..563f095ac01 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -35,7 +35,7 @@
 * This is an example showing the to use the Cassandra Input-/OutputFormats in the Batch API.
 *
 * The example assumes that a table exists in a local cassandra database, according to the following queries:
- * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
 * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
 */
 public class BatchExample {
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index 8b4e4b125cd..f32c2afb353 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -35,7 +35,7 @@
 * This is an example showing the to use the {@link CassandraPojoInputFormat}/{@link CassandraTupleOutputFormat} in the Batch API.
 *
 * The example assumes that a table exists in a local cassandra database, according to the following queries:
- * CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
 * CREATE TABLE IF NOT EXISTS flink.batches (id text, counter int, batch_id int, PRIMARY KEY(id, counter, batchId));
 */
 public class BatchPojoExample {
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
index 89e2d9e3ccf..6ee615db280 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -34,7 +34,7 @@
 * Pojo's have to be annotated with datastax annotations to work with this sink.
 *
 * The example assumes that a table exists in a local cassandra database, according to the following queries:
- * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
 * CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY)
 */
 public class CassandraPojoSinkExample {
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
index 72013d5141a..82daae18c16 100644
--- 

[GitHub] zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-17 Thread GitBox
zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855#issuecomment-430771951
 
 
   the shading check must be updated




[jira] [Commented] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654138#comment-16654138
 ] 

ASF GitHub Bot commented on FLINK-10563:


zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855#issuecomment-430771951
 
 
   the shading check must be updated




> Expose shaded Presto S3 filesystem under "s3p" scheme
> -
>
> Key: FLINK-10563
> URL: https://issues.apache.org/jira/browse/FLINK-10563
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, you can't use the shaded Hadoop S3 filesystem and the Presto S3 
> filesystem at the same time. If we expose the Presto filesystem under an 
> additional scheme, both can be used at the same time.
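A sketch of what this enables; the bucket and paths are made up, and the "s3p" scheme is the one proposed by this issue.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DualS3Schemes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Served by the shaded Hadoop S3 filesystem:
        DataStream<String> viaHadoop = env.readTextFile("s3://my-bucket/input");
        // Served by the shaded Presto S3 filesystem, via the new scheme:
        DataStream<String> viaPresto = env.readTextFile("s3p://my-bucket/input");

        viaHadoop.union(viaPresto).print();
        env.execute("dual-s3-schemes");
    }
}
{code}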





[jira] [Updated] (FLINK-10586) Running RestServerEndpointITCase may cause Kernel Panic

2018-10-17 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10586:
-
Summary: Running RestServerEndpointITCase may cause Kernel Panic  (was: 
Running {{RestServerEndpointITCase}} may cause Kernel Panic)

> Running RestServerEndpointITCase may cause Kernel Panic
> ---
>
> Key: FLINK-10586
> URL: https://issues.apache.org/jira/browse/FLINK-10586
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Tests
>Affects Versions: 1.6.1, 1.7.0
> Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
> *OS:* macOS High Sierra 10.13.6 (17G65)
> *Maven:* 3.2.5
> *Java:*
> openjdk version "1.8.0_181"
> OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
> OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: loop_test_kernel_panic.patch
>
>
> Running the tests in {{RestServerEndpointITCase}} in a loop may cause a 
> Kernel panic on OS X. I have observed this to happen with tests that use the 
> {{OkHttpClient}}, such as {{testDefaultVersionRouting}}.
> To run the aforementioned test in a loop, apply the git patch in the 
> attachment, and execute the following command:
> {code}
> mvn clean integration-test -pl flink-runtime -am 
> -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
> -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
> -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
> {code}
> The test eventually fails with the exception below. A few moments later the 
> operating system restarts.
> {noformat}
> 
> 16:43:17,421 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase 
>-
> 
> Test 
> testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase)
>  is running.
> 
> 16:43:17,422 WARN  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Upload directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload
>  does not exist, or has been deleted externally. Previously uploaded files 
> are no longer available.
> 16:43:17,422 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Created directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload
>  for file uploads.
> 16:43:17,422 INFO  org.apache.flink.runtime.rest.RestClient   
>- Rest client endpoint started.
> 16:43:17,422 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Starting rest endpoint.
> 16:43:17,423 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Rest endpoint listening at localhost:57561
> 16:43:17,429 INFO  org.apache.flink.runtime.rest.RestClient   
>- Shutting down rest endpoint.
> 16:43:17,431 INFO  org.apache.flink.runtime.rest.RestClient   
>- Rest endpoint shutdown complete.
> 16:43:17,431 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Shutting down rest endpoint.
> 16:43:17,435 ERROR org.apache.flink.runtime.rest.RestServerEndpointITCase 
>-
> 
> Test 
> testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase)
>  failed with:
> java.lang.AssertionError: Bad file descriptor (Write failed)
>   at 
> org.apache.flink.runtime.rest.RestServerEndpointITCase.testDefaultVersionRouting(RestServerEndpointITCase.java:260)
>   at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.e

[jira] [Created] (FLINK-10586) Running {{RestServerEndpointITCase}} may cause Kernel Panic

2018-10-17 Thread Gary Yao (JIRA)
Gary Yao created FLINK-10586:


 Summary: Running {{RestServerEndpointITCase}} may cause Kernel 
Panic
 Key: FLINK-10586
 URL: https://issues.apache.org/jira/browse/FLINK-10586
 Project: Flink
  Issue Type: Bug
  Components: REST, Tests
Affects Versions: 1.6.1, 1.7.0
 Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)
Reporter: Gary Yao
 Fix For: 1.8.0
 Attachments: loop_test_kernel_panic.patch

Running the tests in {{RestServerEndpointITCase}} in a loop may cause a Kernel 
panic on OS X. I have observed this to happen with tests that use the 
{{OkHttpClient}}, such as {{testDefaultVersionRouting}}.
To run the aforementioned test in a loop, apply the git patch in the 
attachment, and execute the following command:
{code}
mvn clean integration-test -pl flink-runtime -am 
-Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
-Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
-Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
{code}

The test eventually fails with the exception below. A few moments later the 
operating system restarts.

{noformat}

16:43:17,421 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase   
 -

Test 
testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase)
 is running.

16:43:17,422 WARN  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Upload directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload
 does not exist, or has been deleted externally. Previously uploaded files are 
no longer available.
16:43:17,422 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Created directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload
 for file uploads.
16:43:17,422 INFO  org.apache.flink.runtime.rest.RestClient 
 - Rest client endpoint started.
16:43:17,422 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Starting rest endpoint.
16:43:17,423 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Rest endpoint listening at localhost:57561
16:43:17,429 INFO  org.apache.flink.runtime.rest.RestClient 
 - Shutting down rest endpoint.
16:43:17,431 INFO  org.apache.flink.runtime.rest.RestClient 
 - Rest endpoint shutdown complete.
16:43:17,431 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Shutting down rest endpoint.
16:43:17,435 ERROR org.apache.flink.runtime.rest.RestServerEndpointITCase   
 -

Test 
testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase)
 failed with:
java.lang.AssertionError: Bad file descriptor (Write failed)
at 
org.apache.flink.runtime.rest.RestServerEndpointITCase.testDefaultVersionRouting(RestServerEndpointITCase.java:260)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUni

[jira] [Commented] (FLINK-10252) Handle oversized metric messages

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654030#comment-16654030
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messages
URL: https://github.com/apache/flink/pull/6850#discussion_r226050176
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -90,7 +92,9 @@
 	/**
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
-	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+	public MetricRegistryImpl(Configuration configuration) {
+		MetricRegistryConfiguration config = MetricRegistryConfiguration.fromConfiguration(configuration);
 
 Review comment:
   the whole point of the `MetricRegistryConfiguration` is to not expose the 
`Configuration` directly to the `MetricRegistry`. This should be reverted. 
Instead, extend the MRConfiguration with a separate configuration object for 
the akka stuff that the `MetricQueryService` requires, which btw. can be 
determined before the actor is even started.
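
For illustration, a minimal Scala sketch of that direction, assuming a hypothetical `MetricQueryServiceConfiguration` holder (the class name, its field, and the plain `"<bytes>b"` parsing of `akka.framesize` are assumptions, not actual Flink API):

```
import org.apache.flink.configuration.Configuration

// Hypothetical holder for the Akka-related settings the MetricQueryService
// needs. It is derived from the raw Configuration before the actor starts,
// so the MetricRegistry itself never has to see the Configuration.
case class MetricQueryServiceConfiguration(maximumFramesizeBytes: Long)

object MetricQueryServiceConfiguration {
  def fromConfiguration(configuration: Configuration): MetricQueryServiceConfiguration = {
    // akka.framesize is configured as a size string such as "10485760b";
    // for brevity this sketch assumes the plain "<bytes>b" form.
    val framesize = configuration.getString("akka.framesize", "10485760b")
    MetricQueryServiceConfiguration(framesize.stripSuffix("b").toLong)
  }
}
```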


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized metric messages
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. We should check, 
> similarly to FLINK-10251, whether the payload exceeds the maximum framesize 
> and fail fast if it does.
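
As an illustration of such a fail-fast check, a hedged Scala sketch (both parameter names are stand-ins, not the actual {{MetricQueryService}} fields):

{code:scala}
import java.io.IOException

// Illustrative only: after serializing the metric dump, refuse to send it
// if it cannot fit into a single Akka message.
def checkDumpSize(serializedSizeBytes: Long, maximumFramesizeBytes: Long): Unit = {
  if (serializedSizeBytes >= maximumFramesizeBytes) {
    throw new IOException(
      s"The metric dump ($serializedSizeBytes bytes) exceeds the configured " +
        s"akka.framesize ($maximumFramesizeBytes bytes) and cannot be sent.")
  }
}
{code}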



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653931#comment-16653931
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-430720238
 
 
   Fine with me


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation: 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10579) Remove unused deploysettings.xml

2018-10-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10579.

Resolution: Fixed

master: 812b84c332d2e23c5a9930c81f2a3002fe2e9dd2

> Remove unused deploysettings.xml
> 
>
> Key: FLINK-10579
> URL: https://issues.apache.org/jira/browse/FLINK-10579
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9967 resolved, the {{deploysettings.xml}} file is unused and can be 
> removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653825#comment-16653825
 ] 

ASF GitHub Bot commented on FLINK-10579:


zentol closed pull request #6865: [FLINK-10579][build] Remove unused 
deploysettings.xml
URL: https://github.com/apache/flink/pull/6865
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/deploysettings.xml b/deploysettings.xml
deleted file mode 100644
index e36d8480d47..000
--- a/deploysettings.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ASF license header not recoverable from the archive -->
-<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
-  http://maven.apache.org/xsd/settings-1.0.0.xsd">
-  <servers>
-    <server>
-      <id>apache.snapshots.https</id>
-      <username>${sonatype_user}</username>
-      <password>${sonatype_pw}</password>
-    </server>
-    <server>
-      <id>apache.releases.https</id>
-      <username>${sonatype_user}</username>
-      <password>${sonatype_pw}</password>
-    </server>
-  </servers>
-</settings>
diff --git a/tools/releasing/create_source_release.sh b/tools/releasing/create_source_release.sh
index 47b21768614..fd1b13cf41b 100755
--- a/tools/releasing/create_source_release.sh
+++ b/tools/releasing/create_source_release.sh
@@ -63,7 +63,7 @@ cd ${CLONE_DIR}
 
 rsync -a \
   --exclude ".git" --exclude ".gitignore" --exclude ".gitattributes" --exclude ".travis.yml" \
-  --exclude "deploysettings.xml" --exclude "CHANGELOG" --exclude ".github" --exclude "target" \
+  --exclude "CHANGELOG" --exclude ".github" --exclude "target" \
   --exclude ".idea" --exclude "*.iml" --exclude ".DS_Store" --exclude "build-target" \
   --exclude "docs/content" --exclude ".rubydeps" \
   . flink-$RELEASE_VERSION


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove unused deploysettings.xml
> 
>
> Key: FLINK-10579
> URL: https://issues.apache.org/jira/browse/FLINK-10579
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9967 resolved, the {{deploysettings.xml}} file is unused and can be 
> removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653747#comment-16653747
 ] 

ASF GitHub Bot commented on FLINK-10579:


StefanRRichter commented on issue #6865: [FLINK-10579][build] Remove unused 
deploysettings.xml
URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849
 
 
   LGTM 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove unused deploysettings.xml
> 
>
> Key: FLINK-10579
> URL: https://issues.apache.org/jira/browse/FLINK-10579
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9967 resolved, the {{deploysettings.xml}} file is unused and can be 
> removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653748#comment-16653748
 ] 

ASF GitHub Bot commented on FLINK-10579:


StefanRRichter removed a comment on issue #6865: [FLINK-10579][build] Remove 
unused deploysettings.xml
URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849
 
 
   LGTM 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove unused deploysettings.xml
> 
>
> Key: FLINK-10579
> URL: https://issues.apache.org/jira/browse/FLINK-10579
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9967 resolved, the {{deploysettings.xml}} file is unused and can be 
> removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10567) Lost serializer fields when TTL state is stored with a mutable serializer

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653741#comment-16653741
 ] 

ASF GitHub Bot commented on FLINK-10567:


StefanRRichter commented on a change in pull request #6860: [FLINK-10567]Fix 
TtlStateSerializer lost field during duplicate()
URL: https://github.com/apache/flink/pull/6860#discussion_r225984090
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -224,7 +224,7 @@ protected Object getField(@Nonnull TtlValue<?> v, int index) {
 			TypeSerializer<?>... originalSerializers) {
 		Preconditions.checkNotNull(originalSerializers);
 		Preconditions.checkArgument(originalSerializers.length == 2);
-		return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[1]);
+		return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[0], (TypeSerializer) originalSerializers[1]);
 
 Review comment:
   We could just `return new TtlSerializer<>(precomputed, originalSerializers);`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lost serializer fields when TTL state is stored with a mutable serializer
> --
>
> Key: FLINK-10567
> URL: https://issues.apache.org/jira/browse/FLINK-10567
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.7.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In TtlStateSerializer, when it is duplicated, it loses the Long field 
> serializer, which leads to an exception in createInstance; this can easily 
> be reproduced by the test case.
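
To make the failure mode concrete, a schematic Scala sketch with stand-in types (none of these are the real Flink classes): a composite serializer whose duplicate() forwards only one of its two field serializers produces a copy that can no longer create instances.

{code:scala}
// Stand-in for a stateful field serializer that must be deep-copied.
trait FieldSerializer[T] { def duplicate(): FieldSerializer[T] }

// Schematic TTL-style pair serializer: one serializer for the timestamp,
// one for the user value.
class TtlPairSerializer[T](
    val timestampSerializer: FieldSerializer[Long],
    val userValueSerializer: FieldSerializer[T]) {

  // Correct: duplicate() forwards *every* field serializer. The reported bug
  // corresponds to forwarding only userValueSerializer, which leaves the
  // timestamp serializer missing in the copy.
  def duplicate(): TtlPairSerializer[T] =
    new TtlPairSerializer(timestampSerializer.duplicate(), userValueSerializer.duplicate())
}
{code}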



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653695#comment-16653695
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957516
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` 
(if any) and
+* registers a new one with timestamp `currentProcessingTime + 
maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee 
that only one

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653689#comment-16653689
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225960810
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, 
TupleRowKeySelector}
+import 
org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, 
CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def 
whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered():
 Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def 
whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): 
Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new St

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653697#comment-16653697
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957980
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` 
(if any) and
+* registers a new one with timestamp `currentProcessingTime + 
maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee 
that only one


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653705#comment-16653705
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225970725
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -272,24 +264,30 @@ class TemporalRowtimeJoin(
    * Binary search `rightRowsSorted` to find the latest right row to join with `leftTime`.
    * Latest means a right row with the largest time that is still smaller than or equal to `leftTime`.
    *
-   * @return index of such element. If such row was not found (either `rightRowsSorted` is empty
-   * or all `rightRowsSorted` are newer) return -1.
+   * @return found element, or `Option.empty` if such a row was not found (either `rightRowsSorted`
+   * is empty or all `rightRowsSorted` are newer).
    */
-  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Int = {
+  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Option[Row] = {
 
 Review comment:
   `Option` > `NullPointerException` :( Do not use nulls without working 
`@Nullable` annotations
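
As an illustration of the `Option`-returning shape, a sketch with simplified stand-in types (rows are modelled as `(time, payload)` pairs sorted by time; this is not the PR code):

```
// Binary search for the latest row with time <= leftTime; None if the
// list is empty or every row is newer.
def latestRightRowToJoin(
    rightRowsSorted: IndexedSeq[(Long, String)],
    leftTime: Long): Option[(Long, String)] = {
  var low = 0
  var high = rightRowsSorted.size - 1
  var result: Option[(Long, String)] = None
  while (low <= high) {
    val mid = (low + high) >>> 1
    if (rightRowsSorted(mid)._1 <= leftTime) {
      result = Some(rightRowsSorted(mid)) // candidate; keep searching right
      low = mid + 1
    } else {
      high = mid - 1
    }
  }
  result
}
```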


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653691#comment-16653691
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225956138
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
 
 Review comment:
   I would avoid nulls here: wrap this variable in an `Option` and pass the 
`Option` to the `updateCleanupTimer` method. Handling nullable variables 
without compiler enforcement is very dangerous and error-prone.
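
A small sketch of that shape (assumed, not the final PR code): lift the nullable `ValueState` read into an `Option` once, then let the compiler track absence from there on.

```
import java.lang.{Long => JLong}

// Lift the nullable state value into an Option as early as possible.
def toCleanupTime(raw: JLong): Option[Long] = Option(raw).map(_.longValue())

// Null-free equivalent of `currentCleanupTime == null ||
// currentProcessingTime + minRetentionTime > currentCleanupTime`:
// forall is true for None and applies the predicate for Some.
def needsNewTimer(
    currentProcessingTime: Long,
    minRetentionTime: Long,
    currentCleanupTime: Option[Long]): Boolean =
  currentCleanupTime.forall(currentProcessingTime + minRetentionTime > _)
```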


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, plea

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653694#comment-16653694
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225954133
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
 
 Review comment:
   As we discussed, I would slightly lean toward dropping the `final` keywords 
here, dropping the proxy/wrapper methods `onXYZ`, and relying on the user to do 
something like:
   ```
   override def open() = {
 super.open()
 // my fancy logic.
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653698#comment-16653698
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957668
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` 
(if any) and
+* registers a new one with timestamp `currentProcessingTime + 
maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee 
that only one

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653687#comment-16653687
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225955291
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
 
 Review comment:
   Here I'm not sure. Either I would do the same as for `open`, or leave these 
methods unimplemented and rely on the user to write something like:
   ```
   override def processElement1(element) = {
 registerProcessingCleanupTimer()
 // my fancy logic
   }
   ```
   The argument in favour of the second option is that maybe we want to 
register/bump the timers only on `processElement2` (whenever we touch the build 
side) and not on `processElement1`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
>

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653690#comment-16653690
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225965180
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
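
For readers puzzling over the timer timestamps asserted in these tests: with
`minRetentionTime = 2` and `maxRetentionTime = 4` (the `TestStreamQueryConfig`
above), the base class registers the cleanup timer at
`currentProcessingTime + maxRetentionTime`, and bumps it only when
`currentProcessingTime + minRetentionTime` exceeds the currently registered
cleanup time. A standalone Scala model of just that rule (no Flink
dependencies; `bump` is an illustrative name, not from the PR):
```
object CleanupTimerRule {
  val minRetention = 2L
  val maxRetention = 4L

  // Returns the (possibly unchanged) cleanup-timer timestamp.
  def bump(now: Long, current: Option[Long]): Option[Long] =
    if (current.isEmpty || now + minRetention > current.get)
      Some(now + maxRetention)  // delete the old timer, register a new one
    else
      current                   // keep the existing timer

  def main(args: Array[String]): Unit = {
    var t = bump(1L, None) // element at time 1 -> timer at 1 + 4 = 5
    println(t)             // Some(5)
    t = bump(2L, t)        // time 2: 2 + 2 = 4 <= 5 -> unchanged ("no new timer")
    println(t)             // Some(5), matching hasFiredTimers(5L)
    t = bump(4L, t)        // time 4: 4 + 2 = 6 > 5 -> moved to 4 + 4 = 8
    println(t)             // Some(8), matching hasFiredTimers(8L)
  }
}
```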


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653688#comment-16653688
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225959340
 
 

 ##
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
 
 Review comment:
   Could you deduplicate the setup/closing logic of those methods? For example,
   extract
   ```
   val operator: StubOperatorWithTTLTimers =
     new StubOperatorWithTTLTimers(streamQueryConfig)
   testHarness = createTestHarness(operator)
   testHarness.open()
   ```
   to a setup method or to a `createStubHarnessWithTTLTimers()` method.

   If you pick the `setup` method, `close` could be moved to tearDown (but that
   would require `testHarness` to be a field, not a local variable).
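
   Concretely, the suggested extraction could look roughly like the sketch below.
   This is a standalone model of the idea only: `Operator` and `Harness` are
   stand-ins for the test's StubOperatorWithTTLTimers and the Flink test harness,
   and `createStubHarnessWithTTLTimers` is simply the name proposed above:
   ```
   object DedupSketch {
     final class Operator
     final class Harness {
       def open(): Unit = ()
       def close(): Unit = ()
     }

     private var testHarness: Harness = _  // a field, so tearDown can close it

     private def createStubHarnessWithTTLTimers(): Operator = {
       val operator = new Operator
       testHarness = new Harness           // stands in for createTestHarness(operator)
       testHarness.open()
       operator
     }

     private def tearDown(): Unit = testHarness.close()

     def main(args: Array[String]): Unit = {
       val operator = createStubHarnessWithTTLTimers()  // each test starts here
       // ... exercise the operator ...
       tearDown()
     }
   }
   ```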


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225960810
 
 

 ##
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, has

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225961569
 
 

 ##
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, has

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957024
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide
+  * which state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any)
+    * and registers a new one with timestamp `currentProcessingTime + maxRetentionTime`.
+    * This method is used by the `registerProcessingCleanupTimer()` to guarantee
+    * that only one cleanup timer is registered per key.
+    */
 
 Review comment:
   I would drop this comment. The method is pretty self-explanatory on its own,
   and shorter to read than the comment itself.

-
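
For orientation, the quoted base class implies that subclasses hook in through
`onOpen`, `onProcessElement1`, and `onProcessElement2` (the methods invoked from
the final `open`/`processElement1`/`processElement2` above) plus the
`Triggerable` callbacks. A rough sketch of a concrete subclass follows; the hook
signatures are inferred from the quoted call sites and are not shown in the
excerpt, so treat the whole block as an assumption rather than code from the PR:
```
import org.apache.flink.runtime.state.VoidNamespace
import org.apache.flink.streaming.api.operators.InternalTimer
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
import org.apache.flink.table.runtime.types.CRow

class MyTtlAwareOperator(queryConfig: StreamQueryConfig)
  extends AbstractTwoInputStreamOperatorWithTTL(queryConfig) {

  override def onOpen(): Unit = {
    // initialize per-side state descriptors here
  }

  override def onProcessElement1(element: StreamRecord[CRow]): Unit = {
    // probe-side logic; the base class bumps the cleanup timer afterwards
  }

  override def onProcessElement2(element: StreamRecord[CRow]): Unit = {
    // build-side logic; likewise followed by a cleanup-timer bump
  }

  // Fired when a registered cleanup timer expires: decide which state to drop.
  override def onProcessingTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
    // clear state for timer.getKey here
  }

  override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = ()
}
```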

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225962699
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, has

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957516
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide
+  * which state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any)
+    * and registers a new one with timestamp `currentProcessingTime + maxRetentionTime`.
+    * This method is used by the `registerProcessingCleanupTimer()` to guarantee
+    * that only one cleanup timer is registered per key.
+    */
+  private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = {
+if (currentCleanupTime != null) {
+  timerService.deleteProcessingTimeTimer(currentCleanupTime)


[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957668
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide
+  * which state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+    queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+    initializeTimerService()
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+
+    onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+    onProcessElement1(element)
+    registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+    onProcessElement2(element)
+    registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+    if (stateCleaningEnabled) {
+      val currentProcessingTime = timerService.currentProcessingTime()
+      val currentCleanupTime = cleanupTimeState.value()
+
+      if (currentCleanupTime == null
+        || (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+        updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+      }
+    }
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+    * registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+    * method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
+    * cleanup timer is registered per key.
+    */
+  private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = {
+    if (currentCleanupTime != null) {
+      timerService.deleteProcessingTimeTimer(currentCleanupTime)

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225956138
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+    queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+    initializeTimerService()
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+
+    onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+    onProcessElement1(element)
+    registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+    onProcessElement2(element)
+    registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+    if (stateCleaningEnabled) {
+      val currentProcessingTime = timerService.currentProcessingTime()
+      val currentCleanupTime = cleanupTimeState.value()
 
 Review comment:
   I would avoid nulls: wrap this variable with `Option` and pass the `Option` 
to the `updateCleanupTimer` method. Handling nullable variables without the 
compiler's help is very dangerous and error prone.
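   As an illustration (not part of the PR), a minimal sketch of what the 
`Option`-based variant could look like, reusing the names from the excerpt above:
   ```
   // Sketch only: Option-based variant of the nullable cleanup-timer state.
   private def registerProcessingCleanupTimer(): Unit = {
     if (stateCleaningEnabled) {
       val currentProcessingTime = timerService.currentProcessingTime()
       // ValueState.value() returns null for a fresh key; Option(...) maps that to None
       val currentCleanupTime: Option[Long] =
         Option(cleanupTimeState.value()).map(_.longValue())

       // None => no timer yet, register one; Some(t) => only refresh if enough time passed
       if (currentCleanupTime.forall(currentProcessingTime + minRetentionTime > _)) {
         updateCleanupTimer(currentProcessingTime, currentCleanupTime)
       }
     }
   }

   private def updateCleanupTimer(
       currentProcessingTime: Long,
       currentCleanupTime: Option[Long]): Unit = {
     // delete the previous timer only if one was actually registered for this key
     currentCleanupTime.foreach(timerService.deleteProcessingTimeTimer)
     val newCleanupTime = currentProcessingTime + maxRetentionTime
     timerService.registerProcessingTimeTimer(newCleanupTime)
     cleanupTimeState.update(Long.box(newCleanupTime))
   }
   ```
   With `forall`, the `None` case (no timer registered yet) falls through to 
registering a fresh timer, which matches the `null` check in the PR.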


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225954133
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide 
which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
 
 Review comment:
   As we discussed, I would slightly lean toward dropping the `final` keywords 
here, dropping the proxy/wrapper methods `onXYZ`, and relying on the user to do 
something like:
   ```
   override def open() = {
     super.open()
     // my fancy logic.
   }
   ```
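   For illustration, a hypothetical minimal shape of that design (class names 
invented here), where a subclass keeps the base setup by chaining `super.open()`:
   ```
   // Hypothetical sketch: base class with a non-final open(), no onXYZ proxies.
   abstract class BaseTwoInputOperatorWithTTL {
     def open(): Unit = {
       // initialize the timer service and TTL cleanup state here
     }
   }

   class MyJoinOperator extends BaseTwoInputOperatorWithTTL {
     override def open(): Unit = {
       super.open() // keep the base class's timer/state setup
       // my fancy logic.
     }
   }
   ```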


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957980
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+    queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+    initializeTimerService()
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+
+    onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+    onProcessElement1(element)
+    registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+    onProcessElement2(element)
+    registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+    if (stateCleaningEnabled) {
+      val currentProcessingTime = timerService.currentProcessingTime()
+      val currentCleanupTime = cleanupTimeState.value()
+
+      if (currentCleanupTime == null
+        || (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+        updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+      }
+    }
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+    * registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+    * method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
+    * cleanup timer is registered per key.
+    */
+  private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = {
+    if (currentCleanupTime != null) {
+      timerService.deleteProcessingTimeTimer(currentCleanupTime)

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225965180
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+    Time.milliseconds(2),
+    Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(10L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(2L)
+    testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+    testHarness.setProcessingTime(20L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(4L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+    testHarness.setProcessingTime(20L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(4L)
+    testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+    testHarness.setProcessingTime(20L)
+    testHarness.close()
+
+    assertThat(operator, has

[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225959340
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+    Time.milliseconds(2),
+    Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(10L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
 
 Review comment:
   Could you deduplicate the setup/closing logic of those methods? For example 
extract
   ```
   val operator: StubOperatorWithTTLTimers =
     new StubOperatorWithTTLTimers(streamQueryConfig)
   testHarness = createTestHarness(operator)
   testHarness.open()
   ```
   to a setup method or to a `createStubHarnessWithTTLTimers()` method.
   
   If you pick the `setup` method, `close` could be moved to `tearDown` (but that 
would require `testHarness` to be a field, not a local variable).
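   A possible shape of that extraction, as a sketch that would live inside the 
test class above (`createStubHarnessWithTTLTimers()` is a hypothetical name):
   ```
   // Hypothetical helper inside the test class: shared harness setup in one place.
   private def createStubHarnessWithTTLTimers(): StubOperatorWithTTLTimers = {
     val operator = new StubOperatorWithTTLTimers(streamQueryConfig)
     testHarness = createTestHarness(operator) // testHarness is already a field
     testHarness.open()
     operator
   }

   @Test
   def normalScenarioWorks(): Unit = {
     val operator = createStubHarnessWithTTLTimers()

     testHarness.setProcessingTime(1L)
     testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))

     testHarness.setProcessingTime(10L)
     testHarness.close() // or move this into an @After tearDown method

     assertThat(operator, hasFiredTimers(5L))
   }
   ```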


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653673#comment-16653673
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796
 
 
   Alright, now the problem is a bit clearer to me. The underlying problem is 
that the `InputSplitAssigner's` semantics in case of a failover are not well 
defined. This is mainly due to the fact that Flink evolved over time.
   
   The general idea of the `InputSplitAssigner` is to lazily assign work to 
sources which have completely consumed their current `InputSplit`. The order in 
which this happens should not affect the correctness of the result.
   
   If you say that in case of a recovery the exact same `InputSplit` assignment 
needs to happen again, then I think it must be because our sources have some 
kind of state. Otherwise, it should not matter which source task completes the 
`InputSplit`, right? If this is correct, then we would run into the same 
problem if a JM failure happens, because we would lose all `InputSplit` 
assignment information which is stored on the JM. So stateful sources with 
`InputSplits` don't work at the moment (in the general case).
   
   If we assume that our sources are stateless, then simply returning the input 
splits to the assigner and letting the next idling task take it should work. In 
your example of the infinite stream which is initialized via the `InputSplits` 
there would be no other task competing for the `InputSplit` of a failed task 
because by definition they never finish their work, right? If multiple tasks 
fail, then the mapping might be different after the recovery, but every task 
would continue consuming from a single `InputSplit`. I think the problem here 
is that you abused the `InputSplitAssigner` for something it is not yet 
intended to do.
   
   The reason why I'm a bit hesitant here is that I think we do not fully 
understand yet what we actually want to have. Moreover, some corner cases are not 
clear to me yet. For example, why would it be ok for a global failover to 
change the mapping and not for a region failover? Another example is how to 
handle the case where we lose a TM and need to downscale. Would that 
effectively be a global failover where we redistribute all `InputSplits`? (I 
would think so.) 
   
   Before starting any concrete implementation steps, I think we should 
properly design this feature to get it right. A very related topic is actually 
the new source interface. Depending on how much we are able to unify batch and 
streaming, the whole `InputSplit` assignment might move into a single task 
(similar to the `ContinuousFileMonitoringSink`) and the assignment might become 
part of a checkpoint. That way, we would no longer need to take care of this on 
the JM side.
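   As a rough sketch of the "return the splits to the assigner" idea discussed 
here (a hypothetical standalone class, not Flink's actual `InputSplitAssigner` 
interface):
   ```
   import java.util.concurrent.ConcurrentLinkedQueue

   import org.apache.flink.core.io.InputSplit

   // Hypothetical assigner: failed tasks hand their unfinished splits back,
   // and the next idling task simply picks them up again.
   class ReturningInputSplitAssigner(splits: Seq[InputSplit]) {
     private val pending = new ConcurrentLinkedQueue[InputSplit]()
     splits.foreach(pending.add)

     /** Lazily hands out the next split; null once everything is consumed. */
     def getNextInputSplit(host: String, taskId: Int): InputSplit = pending.poll()

     /** Called on task failure: the split becomes available to any task again. */
     def returnInputSplit(split: InputSplit, taskId: Int): Unit = pending.add(split)
   }
   ```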


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2, 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and reruns, it will not get 
> the same splits as its previous version. This will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex, and 
> the DataSourceTask will pull splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
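A rough sketch of the proposed bookkeeping (hypothetical class; in the actual 
proposal this state would live in the ExecutionVertex):
```
import scala.collection.mutable.ArrayBuffer

import org.apache.flink.core.io.InputSplit

// Hypothetical sketch: the vertex remembers the splits it handed out so a
// restarted attempt replays exactly the same sequence.
class SplitTrackingVertex(pullFromAssigner: () => InputSplit) {
  private val assignedSplits = ArrayBuffer[InputSplit]()
  private var replayIndex = 0

  /** First execution pulls from the assigner; a re-run replays the history. */
  def nextSplit(): InputSplit = {
    if (replayIndex < assignedSplits.length) {
      val split = assignedSplits(replayIndex)
      replayIndex += 1
      split
    } else {
      val split = pullFromAssigner()
      if (split != null) {
        assignedSplits += split
        replayIndex += 1
      }
      split
    }
  }

  /** Called when the vertex is restarted after a failure. */
  def resetForNewExecution(): Unit = replayIndex = 0
}
```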



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-10-17 Thread GitBox
tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796
 
 
   Alright, now the problem is a bit clearer to me. The underlying problem is 
that the `InputSplitAssigner's` semantics in case of a failover are not well 
defined. This is mainly due to the fact that Flink evolved over time.
   
   The general idea of the `InputSplitAssigner` is to lazily assign work to 
sources which have completely consumed their current `InputSplit`. The order in 
which this happens should not affect the correctness of the result.
   
   If you say that in case of a recovery the exact same `InputSplit` assignment 
needs to happen again, then I think it must be because our sources have some 
kind of state. Otherwise, it should not matter which source task completes the 
`InputSplit`, right? If this is correct, then we would run into the same 
problem if a JM failure happens, because we would lose all `InputSplit` 
assignment information which is stored on the JM. So stateful sources with 
`InputSplits` don't work at the moment (in the general case).
   
   If we assume that our sources are stateless, then simply returning the input 
splits to the assigner and letting the next idling task take it should work. In 
your example of the infinite stream which is initialized via the `InputSplits` 
there would be no other task competing for the `InputSplit` of a failed task 
because by definition they never finish their work, right? If multiple tasks 
fail, then the mapping might be different after the recovery, but every task 
would continue consuming from a single `InputSplit`. I think the problem here 
is that you abused the `InputSplitAssigner` for something it is not yet 
intended to do.
   
   The reason why I'm a bit hesitant here is that I think we do not fully 
understand yet what we actually want to have. Moreover, some corner cases are not 
clear to me yet. For example, why would it be ok for a global failover to 
change the mapping and not for a region failover? Another example is how to 
handle the case where we lose a TM and need to downscale. Would that 
effectively be a global failover where we redistribute all `InputSplits`? (I 
would think so.) 
   
   Before starting any concrete implementation steps, I think we should 
properly design this feature to get it right. A very related topic is actually 
the new source interface. Depending on how much we are able to unify batch and 
streaming, the whole `InputSplit` assignment might move into a single task 
(similar to the `ContinuousFileMonitoringSink`) and the assignment might become 
part of a checkpoint. That way, we would no longer need to take care of this on 
the JM side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10582.

Resolution: Fixed

master: 3f231ac46537654aebdd73b0f42aa823386bf901

1.6: 967b31b333e6f4b014ea3041f420bfaff2484618

1.5: 98f38288a12065dd665f7e0b2420d57f6408121a

> Make REST executor thread priority configurable
> ---
>
> Key: FLINK-10582
> URL: https://issues.apache.org/jira/browse/FLINK-10582
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.5.5, 1.6.2
>
>
> With FLINK-10282, we introduced a dedicated thread pool for the REST server 
> endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, 
> however, might affect existing users by making some of the REST calls no 
> longer responsive (e.g. if the other components' threads take all the time). 
> Therefore, I propose to set the default thread priority to 
> {{Thread.NORM_PRIORITY}} and make it additionally configurable such that 
> users can change it.
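For reference, a minimal sketch of how a user could override the new default 
programmatically, assuming the option key introduced in the PR below 
(`Configuration#setInteger` is the standard setter):
```
import org.apache.flink.configuration.Configuration

object RestThreadPriorityExample {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    // Java thread priorities range from 1 (min) to 10 (max); 5 is Thread.NORM_PRIORITY.
    config.setInteger("rest.server.thread-priority", 7)
  }
}
```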



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10582:
-
Fix Version/s: 1.6.2
   1.5.5

> Make REST executor thread priority configurable
> ---
>
> Key: FLINK-10582
> URL: https://issues.apache.org/jira/browse/FLINK-10582
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.5, 1.6.2, 1.7.0
>
>
> With FLINK-10282, we introduced a dedicated thread pool for the REST server 
> endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, 
> however, might affect existing users by making some of the REST calls no 
> longer responsive (e.g. if the other components' threads take all the time). 
> Therefore, I propose to set the default thread priority to 
> {{Thread.NORM_PRIORITY}} and make it additionally configurable such that 
> users can change it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653652#comment-16653652
 ] 

ASF GitHub Bot commented on FLINK-10582:


zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread 
priority configurable
URL: https://github.com/apache/flink/pull/6868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1aa963fb3e2..2c5f539a480 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39be808..11c38de06c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the 
asynchronous processing of requests.");
+
+   public static final ConfigOption SERVER_THREAD_PRIORITY = 
key("rest.server.thread-priority")
+   .defaultValue(Thread.NORM_PRIORITY)
+   .withDescription(Description.builder()
+   .text("Thread priority of the REST server's executor 
for processing asynchronous requests. " +
+   "Lowering the thread priority will give Flink's 
main components more CPU time whereas " +
+   "increasing will allocate more time for the 
REST server's processing.")
+   .build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index c09c41bb2ec..6d557d0815f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -141,6 +141,7 @@ public AbstractDispatcherResourceManagerComponentFactory(
             blobServer,
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(actorSystem, timeout),
             highAvailabilityServices.getWebMonitorLeaderElectionService(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3fc7007f9c5..69bad47e158 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -356,6 +356,7 @@ public void start() throws Exception {
             blobServer.getTransientBlobService(),
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),

[GitHub] zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread 
priority configurable
URL: https://github.com/apache/flink/pull/6868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1aa963fb3e2..2c5f539a480 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39be808..11c38de06c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the 
asynchronous processing of requests.");
+
+   public static final ConfigOption SERVER_THREAD_PRIORITY = 
key("rest.server.thread-priority")
+   .defaultValue(Thread.NORM_PRIORITY)
+   .withDescription(Description.builder()
+   .text("Thread priority of the REST server's executor 
for processing asynchronous requests. " +
+   "Lowering the thread priority will give Flink's 
main components more CPU time whereas " +
+   "increasing will allocate more time for the 
REST server's processing.")
+   .build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index c09c41bb2ec..6d557d0815f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -141,6 +141,7 @@ public AbstractDispatcherResourceManagerComponentFactory(
             blobServer,
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(actorSystem, timeout),
             highAvailabilityServices.getWebMonitorLeaderElectionService(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3fc7007f9c5..69bad47e158 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -356,6 +356,7 @@ public void start() throws Exception {
             blobServer.getTransientBlobService(),
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(
                 metricQueryServiceActorSystem,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org

[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653650#comment-16653650
 ] 

ASF GitHub Bot commented on FLINK-10582:


zentol commented on a change in pull request #6868: [FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6868#discussion_r225961269
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
 ##
 @@ -121,4 +122,12 @@
key("rest.server.numThreads")
 
 Review comment:
   I guess I didn't follow naming conventions here? We could still hotfix that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make REST executor thread priority configurable
> ---
>
> Key: FLINK-10582
> URL: https://issues.apache.org/jira/browse/FLINK-10582
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10282, we introduced a dedicated thread pool for the REST server 
> endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, 
> however, might affect existing users by making some of the REST calls no 
> longer responsive (e.g. if the other components' threads take all the time). 
> Therefore, I propose to set the default thread priority to 
> {{Thread.NORM_PRIORITY}} and make it additionally configurable such that 
> users can change it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
zentol commented on a change in pull request #6868: [FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6868#discussion_r225961269
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
 ##
 @@ -121,4 +122,12 @@
key("rest.server.numThreads")
 
 Review comment:
   I guess I didn't follow naming conventions here? We could still hotfix that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol closed pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
zentol closed pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's 
thread priority configurable
URL: https://github.com/apache/flink/pull/6869
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1aa963fb3e2..2c5f539a480 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39be808..11c38de06c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the 
asynchronous processing of requests.");
+
+   public static final ConfigOption SERVER_THREAD_PRIORITY = 
key("rest.server.thread-priority")
+   .defaultValue(Thread.NORM_PRIORITY)
+   .withDescription(Description.builder()
+   .text("Thread priority of the REST server's executor 
for processing asynchronous requests. " +
+   "Lowering the thread priority will give Flink's 
main components more CPU time whereas " +
+   "increasing will allocate more time for the 
REST server's processing.")
+   .build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index fd0a0a1d39f..4f575ce4f3e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -331,6 +331,7 @@ protected void startClusterComponents(
             transientBlobCache,
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(actorSystem, timeout),
             highAvailabilityServices.getWebMonitorLeaderElectionService());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4bfdb25c4d6..1d2c8c75eaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -348,6 +348,7 @@ public void start() throws Exception {
             blobServer.getTransientBlobService(),
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(
                 actorSystem,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc54fb..c480c33c671 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.jav

[GitHub] zentol closed pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
zentol closed pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's 
thread priority configurable
URL: https://github.com/apache/flink/pull/6870
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1aa963fb3e2..2c5f539a480 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39be808..11c38de06c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the 
asynchronous processing of requests.");
+
+   public static final ConfigOption SERVER_THREAD_PRIORITY = 
key("rest.server.thread-priority")
+   .defaultValue(Thread.NORM_PRIORITY)
+   .withDescription(Description.builder()
+   .text("Thread priority of the REST server's executor 
for processing asynchronous requests. " +
+   "Lowering the thread priority will give Flink's 
main components more CPU time whereas " +
+   "increasing will allocate more time for the 
REST server's processing.")
+   .build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 2c34b530804..c4c583853bf 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -330,6 +330,7 @@ protected void startClusterComponents(
             transientBlobCache,
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(actorSystem, timeout),
             highAvailabilityServices.getWebMonitorLeaderElectionService());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a7840d64830..74ff770e69a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -350,6 +350,7 @@ public void start() throws Exception {
             blobServer.getTransientBlobService(),
             WebMonitorEndpoint.createExecutorService(
                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                 "DispatcherRestEndpoint"),
             new AkkaQueryServiceRetriever(
                 actorSystem,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc54fb..c480c33c671 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.jav

[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method

2018-10-17 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653630#comment-16653630
 ] 

Till Rohrmann commented on FLINK-10575:
---

I think this method should only be removed after we have removed 
{{JobManager}}, because it depends on it.

> Remove deprecated ExecutionGraphBuilder.buildGraph method
> -
>
> Key: FLINK-10575
> URL: https://issues.apache.org/jira/browse/FLINK-10575
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>
> ExecutionGraphBuilder is not a public API and we should able to remove 
> deprecated method such as:
> @Deprecated
> public static ExecutionGraph buildGraph
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-17 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653624#comment-16653624
 ] 

Till Rohrmann commented on FLINK-10436:
---

Yes, I think this would solve the problem. We would then only print the 
deprecation warning if {{isDeprecated}} is {{true}}.
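A hedged sketch of that lookup logic (all names hypothetical, not Flink's actual 
{{Configuration}} internals):
```
// Hypothetical sketch: a fallback key only logs the deprecation warning
// when it is explicitly marked as deprecated.
case class FallbackKey(key: String, isDeprecated: Boolean)

def lookup(
    config: Map[String, String],
    properKey: String,
    fallbacks: Seq[FallbackKey]): Option[String] = {
  config.get(properKey).orElse {
    fallbacks.collectFirst {
      case fk if config.contains(fk.key) =>
        if (fk.isDeprecated) {
          println(s"Config uses deprecated configuration key '${fk.key}' " +
            s"instead of proper key '$properKey'")
        }
        config(fk.key)
    }
  }
}
```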

> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.

2018-10-17 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10585:
-
Description: 
Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop 
may trigger the HTTP server to send a response with {{Content-Type: 
application/octet-stream}}, which causes the test to fail. The expected 
{{Content-Type}} is {{application/json}}. Note that the REST handler used for 
testing can only return JSON responses. The failure can likely be triggered 
for other tests inside {{RestServerEndpointITCase}} as well. The behavior has 
not been observed on Linux so far.

To run the test in a loop, apply the git patch in the attachment, and execute 
the following command:
{code}
mvn clean integration-test -pl flink-runtime -am 
-Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
-Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
-Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
{code}
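The attached {{loop_test.patch}} is not inlined here; as a rough idea, one hypothetical way to repeat a JUnit 4 test class many times (yielding indexed test names such as {{testRequestInterleaving[5253]}}) is an empty parameter set repeated via {{Parameterized}}. The actual patch may differ:
{code}
import java.util.Collection;
import java.util.Collections;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hypothetical sketch only; not necessarily what loop_test.patch does.
@RunWith(Parameterized.class)
public class LoopedTest {

    @Parameterized.Parameters
    public static Collection<Object[]> iterations() {
        // every (empty) parameter set triggers one full run of the test methods
        return Collections.nCopies(10_000, new Object[0]);
    }

    @Test
    public void testRequestInterleaving() {
        // the flaky interaction under investigation goes here
    }
}
{code}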

After a while you may see the following stacktrace in the test's log file:
{noformat}

15:25:45,619 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase   
 -

Test 
testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase)
 is running.

15:25:45,620 WARN  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Upload directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 does not exist, or has been deleted externally. Previously uploaded files are 
no longer available.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Created directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 for file uploads.
15:25:45,620 INFO  org.apache.flink.runtime.rest.RestClient 
 - Rest client endpoint started.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Starting rest endpoint.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Rest endpoint listening at localhost:52841
15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient 
 - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) 
is allowed between tokens
 at [Source: 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; 
line: 1, column: 2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
org.apache.flink.shaded.

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653601#comment-16653601
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u opened a new pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871
 
 
   ## What is the purpose of the change
   
   This is the first step for the implementation of FLINK-10583 and FLINK-10584.
   
   It introduces the `AbstractTwoInputStreamOperatorWithTTL` which contains the 
basic functionality for implementing state TTL based on timers.
   
   This operator makes sure that:
   1) at most one timer is registered per key, and
   2) both "sides" of the operator (processElement1 and processElement2) are 
treated equally when it comes to registering cleanup timers; a minimal sketch of this follows below.
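   A minimal sketch of that shared timer logic, assuming a keyed two-input operator (hypothetical class and field names, heavily simplified; not the actual `AbstractTwoInputStreamOperatorWithTTL`):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical sketch, simplified; not the class added by this PR.
public abstract class TtlTwoInputOperatorSketch<K, IN1, IN2, OUT>
        extends AbstractStreamOperator<OUT>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {

    private final long retentionMillis;

    private transient ValueState<Long> registeredCleanupTimer;
    private transient InternalTimerService<VoidNamespace> timerService;

    protected TtlTwoInputOperatorSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    @Override
    public void open() throws Exception {
        super.open();
        registeredCleanupTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cleanup-timer", Long.class));
        timerService = getInternalTimerService(
            "ttl-timers", VoidNamespaceSerializer.INSTANCE, this);
    }

    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        registerCleanupTimerIfNeeded();
        // ... business logic for the first input ...
    }

    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        registerCleanupTimerIfNeeded();
        // ... business logic for the second input ...
    }

    // Both inputs share this path, so the two sides are treated identically
    // and at most one cleanup timer is pending per key at any time.
    private void registerCleanupTimerIfNeeded() throws Exception {
        if (registeredCleanupTimer.value() == null) {
            long cleanupTime = timerService.currentProcessingTime() + retentionMillis;
            timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, cleanupTime);
            registeredCleanupTimer.update(cleanupTime);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        registeredCleanupTimer.clear();
        // ... clear the expired keyed state here ...
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        // processing-time based cleanup: nothing to do for event time
    }
}
```

   Because both `processElement` methods funnel into the same helper, neither input can register a second timer for a key that already has one pending.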
   
   ## Verifying this change
   
   Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the 
class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   
   R @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10583:
---
Labels: pull-request-available  (was: )

> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.

2018-10-17 Thread GitBox
kl0u opened a new pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871
 
 
   ## What is the purpose of the change
   
   This is the first step for the implementation of FLINK-10583 and FLINK-10584.
   
   It introduces the `AbstractTwoInputStreamOperatorWithTTL` which contains the 
basic functionality for implementing state TTL based on timers.
   
   This operator makes sure that:
   1) at most one timer is registered per key, and
   2) both "sides" of the operator (processElement1 and processElement2) are 
treated equally when it comes to registering cleanup timers.
   
   ## Verifying this change
   
   Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the 
class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   
   R @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.

2018-10-17 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10585:
-
Description: 
Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop 
may trigger the HTTP server to send a response with {{Content-Type: 
application/octet-stream}}, which causes the test to fail. The expected 
{{Content-Type}} is {{application/json}}. Note that the REST handler used for 
testing can only return JSON responses. The failure can likely be triggered 
for other tests inside {{RestServerEndpointITCase}} as well. So far, the 
behavior has not been observed on Linux.

To run the test in a loop, apply the git patch in the attachment, and execute 
the following command:
{code}
mvn clean integration-test -pl flink-runtime -am 
-Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
-Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
-Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
{code}

After a while you may see the following stacktrace in the test's log file:
{noformat}

15:25:45,619 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase   
 -

Test 
testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase)
 is running.

15:25:45,620 WARN  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Upload directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 does not exist, or has been deleted externally. Previously uploaded files are 
no longer available.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Created directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 for file uploads.
15:25:45,620 INFO  org.apache.flink.runtime.rest.RestClient 
 - Rest client endpoint started.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Starting rest endpoint.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Rest endpoint listening at localhost:52841
15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient 
 - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) 
is allowed between tokens
 at [Source: 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; 
line: 1, column: 2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
org.apache.flink.shaded.

[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.

2018-10-17 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10585:
-
Environment: 
*Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)

  was:
*Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518

*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)


> RestServerEndpoint responds with wrong Content-Type in Integration Test.
> 
>
> Key: FLINK-10585
> URL: https://issues.apache.org/jira/browse/FLINK-10585
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Tests
>Affects Versions: 1.6.1, 1.7.0
> Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
> *OS:* macOS High Sierra 10.13.6 (17G65)
> *Maven:* 3.2.5
> *Java:*
> openjdk version "1.8.0_181"
> OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
> OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: loop_test.patch
>
>
> Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a 
> loop may trigger the HTTP server to send a response with {{Content-Type: 
> application/octet-stream}}, which causes the test to fail. The expected 
> {{Content-Type}} is {{application/json}}. Note that the REST handler used for 
> testing can only return JSON responses. The failure can likely be triggered 
> for other tests inside {{RestServerEndpointITCase}} as well.
> To run the test in a loop, apply the git patch in the attachment, and execute 
> the following command:
> {code}
> mvn clean integration-test -pl flink-runtime -am 
> -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
> -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
> -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
> {code}
> After a while you may see the following stacktrace in the test's log file:
> {noformat}
> 
> 15:25:45,619 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase 
>-
> 
> Test 
> testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase)
>  is running.
> 
> 15:25:45,620 WARN  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Upload directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
>  does not exist, or has been deleted externally. Previously uploaded files 
> are no longer available.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Created directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
>  for file uploads.
> 15:25:45,620 INFO  org.apache.flink.runtime.rest.RestClient   
>- Rest client endpoint started.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Starting rest endpoint.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Rest endpoint listening at localhost:52841
> 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient   
>- Response was not valid JSON.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
>  Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, 
> \t) is allowed between tokens
>  at [Source: 
> org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; 
> line: 1, column: 2]
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(U

[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.

2018-10-17 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10585:
-
Environment: 
*Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518

*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)

  was:
*Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)


> RestServerEndpoint responds with wrong Content-Type in Integration Test.
> 
>
> Key: FLINK-10585
> URL: https://issues.apache.org/jira/browse/FLINK-10585
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Tests
>Affects Versions: 1.6.1, 1.7.0
> Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
> *OS:* macOS High Sierra 10.13.6 (17G65)
> *Maven:* 3.2.5
> *Java:*
> openjdk version "1.8.0_181"
> OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
> OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: loop_test.patch
>
>
> Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a 
> loop may trigger the HTTP server to send a response with {{Content-Type: 
> application/octet-stream}}, which causes the test to fail. The expected 
> {{Content-Type}} is {{application/json}}. Note that the REST handler used for 
> testing can only return JSON responses. The failure can likely be triggered 
> for other tests inside {{RestServerEndpointITCase}} as well.
> To run the test in a loop, apply the git patch in the attachment, and execute 
> the following command:
> {code}
> mvn clean integration-test -pl flink-runtime -am 
> -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
> -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
> -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
> {code}
> After a while you may see the following stacktrace in the test's log file:
> {noformat}
> 
> 15:25:45,619 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase 
>-
> 
> Test 
> testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase)
>  is running.
> 
> 15:25:45,620 WARN  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Upload directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
>  does not exist, or has been deleted externally. Previously uploaded files 
> are no longer available.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Created directory 
> /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
>  for file uploads.
> 15:25:45,620 INFO  org.apache.flink.runtime.rest.RestClient   
>- Rest client endpoint started.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Starting rest endpoint.
> 15:25:45,620 INFO  
> org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint 
>  - Rest endpoint listening at localhost:52841
> 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient   
>- Response was not valid JSON.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
>  Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, 
> \t) is allowed between tokens
>  at [Source: 
> org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; 
> line: 1, column: 2]
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(U

[jira] [Created] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.

2018-10-17 Thread Gary Yao (JIRA)
Gary Yao created FLINK-10585:


 Summary: RestServerEndpoint responds with wrong Content-Type in 
Integration Test.
 Key: FLINK-10585
 URL: https://issues.apache.org/jira/browse/FLINK-10585
 Project: Flink
  Issue Type: Bug
  Components: REST, Tests
Affects Versions: 1.6.1, 1.7.0
 Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518
*OS:* macOS High Sierra 10.13.6 (17G65)
*Maven:* 3.2.5
*Java:*
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02)
OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode)
Reporter: Gary Yao
 Fix For: 1.8.0
 Attachments: loop_test.patch

Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop 
may trigger the HTTP server to send a response with {{Content-Type: 
application/octet-stream}}, which causes the test to fail. The expected 
{{Content-Type}} is {{application/json}}. Note that the REST handler used for 
testing can only return JSON responses. The failure can likely be triggered 
for other tests inside {{RestServerEndpointITCase}} as well.

To run the test in a loop, apply the git patch in the attachment, and execute 
the following command:
{code}
mvn clean integration-test -pl flink-runtime -am 
-Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false 
-Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir 
-Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties
{code}

After a while you may see the following stacktrace in the test's log file:
{noformat}

15:25:45,619 INFO  org.apache.flink.runtime.rest.RestServerEndpointITCase   
 -

Test 
testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase)
 is running.

15:25:45,620 WARN  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Upload directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 does not exist, or has been deleted externally. Previously uploaded files are 
no longer available.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Created directory 
/private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload
 for file uploads.
15:25:45,620 INFO  org.apache.flink.runtime.rest.RestClient 
 - Rest client endpoint started.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Starting rest endpoint.
15:25:45,620 INFO  
org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint  
- Rest endpoint listening at localhost:52841
15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient 
 - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) 
is allowed between tokens
 at [Source: 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; 
line: 1, column: 2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.apache.flink.shaded.netty4.io.netty.channel.Abs

[jira] [Commented] (FLINK-10578) Support writable state corresponding to queryable state

2018-10-17 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653587#comment-16653587
 ] 

Kostas Kloudas commented on FLINK-10578:


Thanks a lot [~wind_ljy]!

> Support writable state corresponding to queryable state
> ---
>
> Key: FLINK-10578
> URL: https://issues.apache.org/jira/browse/FLINK-10578
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State, State Backends, Checkpointing
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> I can see that we now support queryable state, so why don't we also support 
> writable state? I've already done this in our own business logic to 
> initialize window state during the program's first run. I think other 
> companies may have other needs to modify state from outside the job.
> Synchronization is a problem, but from my perspective very few objects 
> will be affected because we can operate directly on the stateTable (for the 
> FsStateBackend), and as for performance, I think it'll be okay if users 
> know about it.
> I'm not fully aware of what else might be affected beyond what I said above; 
> please let me know if anyone has doubts about this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10564) tm costs too much time when communicating with jm

2018-10-17 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653584#comment-16653584
 ] 

Till Rohrmann commented on FLINK-10564:
---

Thanks for reporting this issue [~chenlf]. It looks as if the JM sometimes does 
not answer before the {{akka.ask.timeout}} or {{akka.client.timeout}} has been 
reached. This could be caused by a busy {{JobManager}}. I would try increasing 
these timeouts and see whether the problem still persists.

In general, the Flink community no longer actively supports versions {{< 
1.5.x}}. Therefore, I would recommend eventually upgrading to a newer Flink 
version.
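For example, both timeouts can be raised in {{flink-conf.yaml}} (the values below are placeholders to illustrate the idea, not recommendations):
{code}
# Give a busy JobManager more time to answer RPCs before the client gives up.
akka.ask.timeout: 120 s
akka.client.timeout: 600 s
{code}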

> tm costs too much time when communicating with  jm
> --
>
> Key: FLINK-10564
> URL: https://issues.apache.org/jira/browse/FLINK-10564
> Project: Flink
>  Issue Type: Bug
>  Components: Core, JobManager, TaskManager
> Environment: configs are as follows:
> jm
> high-availability zookeeper
> taskmanager.heap.mb   16384
> taskmanager.memory.preallocatefalse
> taskmanager.numberOfTaskSlots 64
> tm
> slots 128
> free slots 0-128
> cpu core 40 
> Physical Memory 95gb
> free Memory 32gb-50gb
> Flink Managed Memory 22gb-35gb
>Reporter: chenlf
>Priority: Major
> Attachments: timeout.log
>
>
> It works fine until the number of tasks rises above roughly 400.
>  There are 600+ tasks (each handling billions of records) running in our cluster 
> now, and the problem is that it takes too much time (or even times out) when 
> submitting/canceling/querying a task.
>  Resources like memory and CPU are at normal levels.
> After debugging, we found this method is the culprit:
>  
> org.apache.flink.runtime.util.LeaderRetrievalUtils.LeaderGatewayListener.notifyLeaderAddress(String,
>  UUID)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10578) Support writable state corresponding to queryable state

2018-10-17 Thread Jiayi Liao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao closed FLINK-10578.
--
Resolution: Not A Problem

> Support writable state corresponding to queryable state
> ---
>
> Key: FLINK-10578
> URL: https://issues.apache.org/jira/browse/FLINK-10578
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State, State Backends, Checkpointing
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> I can see that we now support queryable state, so why don't we also support 
> writable state? I've already done this in our own business logic to 
> initialize window state during the program's first run. I think other 
> companies may have other needs to modify state from outside the job.
> Synchronization is a problem, but from my perspective very few objects 
> will be affected because we can operate directly on the stateTable (for the 
> FsStateBackend), and as for performance, I think it'll be okay if users 
> know about it.
> I'm not fully aware of what else might be affected beyond what I said above; 
> please let me know if anyone has doubts about this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10578) Support writable state corresponding to queryable state

2018-10-17 Thread Jiayi Liao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653581#comment-16653581
 ] 

Jiayi Liao commented on FLINK-10578:


[~kkl0u] Thanks for pointing out the risks and problems. 

I've read the proposal before and it's exactly what I need. So at first I 
tried to implement the operations on FsStateBackend savepoints in bravo, but 
the effort was too big because of the FsStateBackend's internal code structure, 
and as the mails said, bravo should live as part of the Flink project to make 
deeper integration possible in the long run. That's why I gave up on bravo 
and tried to solve it in another way.

What you said about breaking the fault-tolerance guarantees is really a problem 
that I hadn't thought about much before. I'll close this and maybe reopen it 
if anyone really needs it in the future.
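For reference, the read-only path that exists today goes through {{QueryableStateClient}}; the proposed write path would have been its mirror image. A small sketch of the existing read side (host, port, job id, state name and key are placeholders):
{code}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateReadSketch {

    public static void main(String[] args) throws Exception {
        // 9069 is the default proxy port; host, job id, state name and key are placeholders.
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
            JobID.fromHexString("00000000000000000000000000000000"),
            "window-contents",              // queryable state name registered by the job
            "some-key",                     // key to look up
            BasicTypeInfo.STRING_TYPE_INFO, // key type
            new ValueStateDescriptor<>("count", Long.class));

        System.out.println("current value: " + future.get().value());
    }
}
{code}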

> Support writable state corresponding to queryable state
> ---
>
> Key: FLINK-10578
> URL: https://issues.apache.org/jira/browse/FLINK-10578
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State, State Backends, Checkpointing
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> I can see that we now support queryable state, so why don't we also support 
> writable state? I've already done this in our own business logic to 
> initialize window state during the program's first run. I think other 
> companies may have other needs to modify state from outside the job.
> Synchronization is a problem, but from my perspective very few objects 
> will be affected because we can operate directly on the stateTable (for the 
> FsStateBackend), and as for performance, I think it'll be okay if users 
> know about it.
> I'm not fully aware of what else might be affected beyond what I said above; 
> please let me know if anyone has doubts about this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10584) Add support for state retention to the Event Time versioned joins.

2018-10-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10584:
--

 Summary: Add support for state retention to the Event Time 
versioned joins.
 Key: FLINK-10584
 URL: https://issues.apache.org/jira/browse/FLINK-10584
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10583:
--

 Summary: Add support for state retention to the Processing Time 
versioned joins.
 Key: FLINK-10583
 URL: https://issues.apache.org/jira/browse/FLINK-10583
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10578) Support writable state queryable state

2018-10-17 Thread Jiayi Liao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-10578:
---
Summary: Support writable state  queryable state  (was: Support writable 
state in QueryableStateClient)

> Support writable state  queryable state
> ---
>
> Key: FLINK-10578
> URL: https://issues.apache.org/jira/browse/FLINK-10578
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State, State Backends, Checkpointing
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> I can see that we now support queryable state, so why don't we also support 
> writable state? I've already done this in our own business logic to 
> initialize window state during the program's first run. I think other 
> companies may have other needs to modify state from outside the job.
> Synchronization is a problem, but from my perspective very few objects 
> will be affected because we can operate directly on the stateTable (for the 
> FsStateBackend), and as for performance, I think it'll be okay if users 
> know about it.
> I'm not fully aware of what else might be affected beyond what I said above; 
> please let me know if anyone has doubts about this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10578) Support writable state corresponding to queryable state

2018-10-17 Thread Jiayi Liao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-10578:
---
Summary: Support writable state corresponding to queryable state  (was: 
Support writable state  queryable state)

> Support writable state corresponding to queryable state
> ---
>
> Key: FLINK-10578
> URL: https://issues.apache.org/jira/browse/FLINK-10578
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State, State Backends, Checkpointing
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> I can see that we now support queryable state, so why don't we also support 
> writable state? I've already done this in our own business logic to 
> initialize window state during the program's first run. I think other 
> companies may have other needs to modify state from outside the job.
> Synchronization is a problem, but from my perspective very few objects 
> will be affected because we can operate directly on the stateTable (for the 
> FsStateBackend), and as for performance, I think it'll be okay if users 
> know about it.
> I'm not fully aware of what else might be affected beyond what I said above; 
> please let me know if anyone has doubts about this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #6857: [hotfix] [table] Refactor SqlToConverter configuration

2018-10-17 Thread GitBox
fhueske commented on issue #6857: [hotfix] [table] Refactor SqlToConverter 
configuration
URL: https://github.com/apache/flink/pull/6857#issuecomment-430625614
 
 
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10253) Run MetricQueryService with lower priority

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653532#comment-16653532
 ] 

ASF GitHub Bot commented on FLINK-10253:


asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with 
lower priority
URL: https://github.com/apache/flink/pull/6839
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 5c073f3ad59..420bb7f1da2 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -12,6 +12,11 @@
 "0"
 The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.
 
+
+metrics.internal.query-service.thread-priority
+1
+The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.
+
 
 metrics.latency.granularity
 "operator"
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0e7268ee052..0785b347335 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -156,6 +156,18 @@
 			"ports to avoid collisions when multiple Flink components are running on the same machine. Per default " +
 			"Flink will pick a random port.");
 
+	/**
+	 * The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the
+	 * {@code 10} means the max priority.
+	 */
+	public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY =
+		key("metrics.internal.query-service.thread-priority")
+			.defaultValue(1)
+			.withDescription("The thread priority used for Flink's internal metric query service. The thread is created" +
+				" by Akka's thread pool executor. " +
+				"The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " +
+				"Warning, increasing this value may bring the main Flink components down.");
+
 	private MetricOptions() {
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b61737d20..430af98bc2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio
 			case FORK_JOIN_EXECUTOR:
 				return AkkaUtils.getForkJoinExecutorConfig(configuration);
 			case FIXED_THREAD_POOL_EXECUTOR:
-				return AkkaUtils.getThreadPoolExecutorConfig();
+				return AkkaUtils.getThreadPoolExecutorConfig(
+					configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
 			default:
 				throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode));
 		}
diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
new file mode 100644
index 000..d6f6d76ec51
--- /dev/null
+++ b/flink-r

[GitHub] asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with lower priority

2018-10-17 Thread GitBox
asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with 
lower priority
URL: https://github.com/apache/flink/pull/6839
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 5c073f3ad59..420bb7f1da2 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -12,6 +12,11 @@
 "0"
 The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.
 
+
+metrics.internal.query-service.thread-priority
+1
+The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.
+
 
 metrics.latency.granularity
 "operator"
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0e7268ee052..0785b347335 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -156,6 +156,18 @@
 			"ports to avoid collisions when multiple Flink components are running on the same machine. Per default " +
 			"Flink will pick a random port.");
 
+	/**
+	 * The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the
+	 * {@code 10} means the max priority.
+	 */
+	public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY =
+		key("metrics.internal.query-service.thread-priority")
+			.defaultValue(1)
+			.withDescription("The thread priority used for Flink's internal metric query service. The thread is created" +
+				" by Akka's thread pool executor. " +
+				"The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " +
+				"Warning, increasing this value may bring the main Flink components down.");
+
 	private MetricOptions() {
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b61737d20..430af98bc2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio
 			case FORK_JOIN_EXECUTOR:
 				return AkkaUtils.getForkJoinExecutorConfig(configuration);
 			case FIXED_THREAD_POOL_EXECUTOR:
-				return AkkaUtils.getThreadPoolExecutorConfig();
+				return AkkaUtils.getThreadPoolExecutorConfig(
+					configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
 			default:
 				throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode));
 		}
diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
new file mode 100644
index 000..d6f6d76ec51
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional i

[jira] [Closed] (FLINK-10253) Run MetricQueryService with lower priority

2018-10-17 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-10253.
-
   Resolution: Fixed
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)

Fixed in 1.7.0 via:
f81297ac224f45d750ee0616755ade809b57ea4a
39324331979cdedb9832bf06408bb182fc9476fa

> Run MetricQueryService with lower priority
> --
>
> Key: FLINK-10253
> URL: https://issues.apache.org/jira/browse/FLINK-10253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We should run the {{MetricQueryService}} with a lower priority than the main 
> Flink components. An idea would be to start the underlying threads with a 
> lower priority.
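The new Scala {{PriorityThreadFactory}} is cut off in the diffs above; as a generic, hypothetical illustration of the underlying pattern (plain Java, not the actual class, which plugs into Akka's dispatcher configuration instead):
{code}
import java.util.concurrent.ThreadFactory;

// Wraps another factory and lowers (or raises) the priority of every thread
// it creates. Hypothetical sketch only.
public final class PriorityAdjustingThreadFactory implements ThreadFactory {

    private final ThreadFactory delegate;
    private final int priority; // Thread.MIN_PRIORITY (1) .. Thread.MAX_PRIORITY (10)

    public PriorityAdjustingThreadFactory(ThreadFactory delegate, int priority) {
        this.delegate = delegate;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = delegate.newThread(runnable);
        thread.setPriority(priority);
        return thread;
    }
}
{code}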



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
tillrohrmann opened a new pull request #6870: [BP-1.5][FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6870
 
 
   Backport of #6868 to `release-1.5`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann opened a new pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
tillrohrmann opened a new pull request #6869: [BP-1.6][FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6869
 
 
   Backport of #6868 to `release-1.6`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653515#comment-16653515
 ] 

ASF GitHub Bot commented on FLINK-10582:


tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6868
 
 
   ## What is the purpose of the change
   
   Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") 
to configure the
   thread priority of the REST executor's threads.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make REST executor thread priority configurable
> ---
>
> Key: FLINK-10582
> URL: https://issues.apache.org/jira/browse/FLINK-10582
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10282, we introduced a dedicated thread pool for the REST server 
> endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, 
> however, might affect existing users by making some of the REST calls no 
> longer responsive (e.g. if the other components' threads take all the time). 
> Therefore, I propose to set the default thread priority to 
> {{Thread.NORM_PRIORITY}} and make it additionally configurable such that 
> users can change it.
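A sketch of how the option described above might be declared (the key name comes from the PR text; the default follows the proposal in this ticket, and the description string is an assumption):
{code}
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

// Sketch only; not necessarily the exact declaration in the PR.
public class RestThreadPrioritySketch {

    public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY =
        key("rest.server.thread-priority")
            .defaultValue(Thread.NORM_PRIORITY) // 5, per the proposal in this ticket
            .withDescription("Thread priority of the REST server endpoint's executor threads.");
}
{code}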



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10582) Make REST executor thread priority configurable

2018-10-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10582:
---
Labels: pull-request-available  (was: )

> Make REST executor thread priority configurable
> ---
>
> Key: FLINK-10582
> URL: https://issues.apache.org/jira/browse/FLINK-10582
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-10282, we introduced a dedicated thread pool for the REST server 
> endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, 
> however, might affect existing users by making some of the REST calls no 
> longer responsive (e.g. if the other components' threads take all the time). 
> Therefore, I propose to set the default thread priority to 
> {{Thread.NORM_PRIORITY}} and make it additionally configurable such that 
> users can change it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable

2018-10-17 Thread GitBox
tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST 
executor's thread priority configurable
URL: https://github.com/apache/flink/pull/6868
 
 
   ## What is the purpose of the change
   
   Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") 
to configure the
   thread priority of the REST executor's threads.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10537) Network throughput performance regression after broadcast changes

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653494#comment-16653494
 ] 

ASF GitHub Bot commented on FLINK-10537:


pnowojski closed pull request #6833: [FLINK-10537][network] Fix network small 
performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 7738a3091e1..01feae03380 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -108,6 +108,7 @@ public int length() {
 	}
 
 	public void pruneBuffer() {
+		clear();
 		if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 6eebbbe88eb..c0cf35d9576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -82,9 +82,9 @@ public boolean isFullBuffer() {
 	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
 
 	/**
-	 * Checks to decrease the size of intermediate data serialization buffer after finishing the
-	 * whole serialization process including {@link #serializeRecord(IOReadableWritable)} and
-	 * {@link #copyToBufferBuilder(BufferBuilder)}.
+	 * Clears the buffer and checks to decrease the size of intermediate data serialization buffer
+	 * after finishing the whole serialization process including
+	 * {@link #serializeRecord(IOReadableWritable)} and {@link #copyToBufferBuilder(BufferBuilder)}.
 	 */
 	void prune();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index ba2ed0133fd..f0666791ec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -92,20 +92,9 @@ public void serializeRecord(T record) throws IOException {
 	 */
	@Override
	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
-		boolean mustCommit = false;
-		if (lengthBuffer.hasRemaining()) {
-			targetBuffer.append(lengthBuffer);
-			mustCommit = true;
-		}
-
-		if (dataBuffer.hasRemaining()) {
-			targetBuffer.append(dataBuffer);
-			mustCommit = true;
-		}
-
-		if (mustCommit) {
-			targetBuffer.commit();
-		}
+		targetBuffer.append(lengthBuffer);
+		targetBuffer.append(dataBuffer);
+		targetBuffer.commit();
 
 		return getSerializationResult(targetBuffer);
 	}
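
To make the intent of the patch easier to follow: one reading of the diff is
that copyToBufferBuilder() no longer needs per-call hasRemaining() branches
because appending an already-drained buffer is a cheap no-op. A minimal,
hypothetical Java sketch of that pattern (plain JDK types, not Flink's actual
BufferBuilder API):

{code:java}
import java.nio.ByteBuffer;

// Hypothetical stand-in for Flink's BufferBuilder, for illustration only.
final class BuilderSketch {
	private final ByteBuffer target = ByteBuffer.allocate(1024);

	void append(ByteBuffer source) {
		// Cheap no-op when source is already drained (remaining() == 0),
		// so callers do not need a hasRemaining() guard.
		while (source.hasRemaining() && target.hasRemaining()) {
			target.put(source.get());
		}
	}

	void commit() {
		// In Flink this publishes the written region to the consumer;
		// here it is just a placeholder.
	}

	// Mirrors the patched copyToBufferBuilder(): unconditional appends and
	// a single commit instead of three data-dependent branches.
	void copyFrom(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
		append(lengthBuffer);
		append(dataBuffer);
		commit();
	}

	public static void main(String[] args) {
		BuilderSketch builder = new BuilderSketch();
		builder.copyFrom(ByteBuffer.allocate(4), ByteBuffer.wrap(new byte[] {1, 2, 3}));
	}
}
{code}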


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Network throughput performance regression after broadcast changes
> -
>
> Key: FLINK-10537
> URL: https://issues.apache.org/jira/browse/FLINK-10537
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> There is a slight network throughput regression introduced in: 
> https://issues.apache.org/jira/b



  1   2   3   >