[jira] [Assigned] (FLINK-10562) Relax (or document) table name constraints
[ https://issues.apache.org/jira/browse/FLINK-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-10562: Assignee: winifredtang (was: vinoyang) > Relax (or document) table name constraints > -- > > Key: FLINK-10562 > URL: https://issues.apache.org/jira/browse/FLINK-10562 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.1 >Reporter: Flavio Pompermaier >Assignee: winifredtang >Priority: Minor > > At the moment it's not possible to register a table whose name starts with a > number (e.g. 1_test). Moreover this constraint is not reported in the > documentation. > I propose to enable table name escaping somehow in order to enable more > general scenarios like those having spaces in between (e.g. select * from 'my > table' ). > Best, > Flavio > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils
[ https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654708#comment-16654708 ] ASF GitHub Bot commented on FLINK-10511: asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se… URL: https://github.com/apache/flink/pull/6845 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f528bc4bcc4..cc12ff5ee5c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; @@ -50,7 +49,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; @@ -83,10 +82,6 @@ import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. * @@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); final String portRange = getRPCPortRange(configuration); - commonRpcService = createRpcService(configuration, bindAddress, portRange); + commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration); // update the configuration used to create the high availability services configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); @@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) { } } - protected RpcService createRpcService( - Configuration configuration, - String bindAddress, - String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); - FiniteDuration duration = AkkaUtils.getTimeout(configuration); - return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); - } - protected HighAvailabilityServices createHaServices( Configuration configuration, Executor executor) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 3ee7641f717..28f04f7677d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; import org.apache.flink.runtime.net.SSLUtils; @@ -30,8 +31,6 @@ import org.apache.flink.util.Preconditions; import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import org.jboss.netty.channel.ChannelExcep
[GitHub] asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se…
asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se… URL: https://github.com/apache/flink/pull/6845 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f528bc4bcc4..cc12ff5ee5c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; @@ -50,7 +49,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; @@ -83,10 +82,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import scala.concurrent.duration.FiniteDuration; - -import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. * @@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); final String portRange = getRPCPortRange(configuration); - commonRpcService = createRpcService(configuration, bindAddress, portRange); + commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration); // update the configuration used to create the high availability services configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); @@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) { } } - protected RpcService createRpcService( - Configuration configuration, - String bindAddress, - String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); - FiniteDuration duration = AkkaUtils.getTimeout(configuration); - return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); - } - protected HighAvailabilityServices createHaServices( Configuration configuration, Executor executor) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 3ee7641f717..28f04f7677d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import 
org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; import org.apache.flink.runtime.net.SSLUtils; @@ -30,8 +31,6 @@ import org.apache.flink.util.Preconditions; import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import org.jboss.netty.channel.ChannelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,25 @@ // RPC instantiation // + /** +* Utility method to create RPC servi
[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages URL: https://github.com/apache/flink/pull/6850#issuecomment-430892698 @zentol About the `MetricRegistryConfiguration` I have reverted, please review. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils
[ https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10511. --- Resolution: Fixed Fix Version/s: 1.7.0 Fixed via d5aa97f8a0e653125d97846d67f3fdf6a10dbedf > Code duplication of creating RPC service in ClusterEntrypoint and > AkkaRpcServiceUtils > - > > Key: FLINK-10511 > URL: https://issues.apache.org/jira/browse/FLINK-10511 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > The TaskManagerRunner is using AkkaRpcServiceUtils to create RPC service, but > the ClusterEntrypoint use a protected method to do the same job. I think it's > better to use the same method in AkkaRpcServiceUtils for reuse of code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10252) Handle oversized metric messages
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654704#comment-16654704 ] ASF GitHub Bot commented on FLINK-10252: yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#issuecomment-430892698 @zentol About the `MetricRegistryConfiguration` I have reverted, please review. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized metric messges > --- > > Key: FLINK-10252 > URL: https://issues.apache.org/jira/browse/FLINK-10252 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Since the {{MetricQueryService}} is implemented as an Akka actor, it can only > send messages of a smaller size then the current {{akka.framesize}}. We > should check similarly to FLINK-10251 whether the payload exceeds the maximum > framesize and fail fast if it is true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654700#comment-16654700 ] ASF GitHub Bot commented on FLINK-7062: --- twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/6815#discussion_r225604789 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -58,7 +62,10 @@ class PatternSelectFunctionRunner( } outCRow.row = function.select(pattern) -outCRow +out match { Review comment: IMHO I would use a instanceof check here instead of a match. We should avoid calling Scala code in runtime code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support the basic functionality of MATCH_RECOGNIZE > -- > > Key: FLINK-7062 > URL: https://issues.apache.org/jira/browse/FLINK-7062 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} > in Flink SQL API which includes the support of syntax {{MEASURES}}, > {{PATTERN}} and {{DEFINE}}. This would allow users write basic cep use cases > with SQL like the following example: > {code} > SELECT T.aid, T.bid, T.cid > FROM MyTable > MATCH_RECOGNIZE ( > MEASURES > A.id AS aid, > B.id AS bid, > C.id AS cid > PATTERN (A B C) > DEFINE > A AS A.name = 'a', > B AS B.name = 'b', > C AS C.name = 'c' > ) AS T > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE
twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/6815#discussion_r225604789 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -58,7 +62,10 @@ class PatternSelectFunctionRunner( } outCRow.row = function.select(pattern) -outCRow +out match { Review comment: IMHO I would use an instanceof check here instead of a match. We should avoid calling Scala code in runtime code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10555) Port AkkaSslITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654698#comment-16654698 ] ASF GitHub Bot commented on FLINK-10555: TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base URL: https://github.com/apache/flink/pull/6849#issuecomment-430890279 @tillrohrmann yes I agree with moving the `BlobServer` SSL specific failures could go into a `BlobServerSSLTest`. For the second part of your comment, could you show me what "interact with it" means or say which "interact"s will be filtered by SSL configuration? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AkkaSslITCase to new code base > --- > > Key: FLINK-10555 > URL: https://issues.apache.org/jira/browse/FLINK-10555 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base
TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base URL: https://github.com/apache/flink/pull/6849#issuecomment-430890279 @tillrohrmann yes I agree with moving the `BlobServer` SSL specific failures could go into a `BlobServerSSLTest`. For the second part of your comment, could you show me what "interact with it" means or say which "interact"s will be filtered by SSL configuration? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM
[ https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654688#comment-16654688 ] ASF GitHub Bot commented on FLINK-10319: TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM URL: https://github.com/apache/flink/pull/6680#issuecomment-430886993 As "deploying tasks in topological order", I agree that it could help. It is a orthonormal improvement though. For your hesitancy, I'd like to learn in which situation that a downstream operator would not be failed by a upstream failing. To keep the state clean either the upstream fails downstream and both restore from the least checkpoint, or we need to implement a failover strategy that take the responsibility for reconcile the state. The latter sounds quite costly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Too many requestPartitionState would crash JM > - > > Key: FLINK-10319 > URL: https://issues.apache.org/jira/browse/FLINK-10319 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Do not requestPartitionState from JM on partition request fail, which may > generate too many RPC requests and block JM. > We gain little benefit to check what state producer is in, which in the other > hand crash JM by too many RPC requests. Task could always > retriggerPartitionRequest from its InputGate, it would be fail if the > producer has gone and succeed if the producer alive. Anyway, no need to ask > for JM for help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM
TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM URL: https://github.com/apache/flink/pull/6680#issuecomment-430886993 As "deploying tasks in topological order", I agree that it could help. It is an orthogonal improvement though. For your hesitancy, I'd like to learn in which situation a downstream operator would not be failed by an upstream failure. To keep the state clean, either the upstream failure fails the downstream and both restore from the latest checkpoint, or we need to implement a failover strategy that takes the responsibility for reconciling the state. The latter sounds quite costly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10577) CEP's greedy() doesn't work
[ https://issues.apache.org/jira/browse/FLINK-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654571#comment-16654571 ] Dian Fu commented on FLINK-10577: - Hi [~simahoa], this is currently by design. For pattern (start middle+), when an event matching pattern `middle` comes, it will output `start middle` as we do not know if there will be any more events matching the pattern `middle` in stream and when another event matching `middle` comes, it will output `start middle middle` which can be seen an update of the previous sent result `start middle`. > CEP's greedy() doesn't work > --- > > Key: FLINK-10577 > URL: https://issues.apache.org/jira/browse/FLINK-10577 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.6.0 >Reporter: zhanghao >Priority: Major > > I think greedy operator has some problem. > Given the below java code: > > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream> input = env.fromElements( > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:00").getTime(), "r"), > Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 > 13:00:00").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:01").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:03").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:04").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:05").getTime(), "c"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:08").getTime(), "c"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:11").getTime(), "a") > ).assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor String>>(Time.seconds(2)) { > private static final long serialVersionUID = 1L; > 
@Override > public long extractTimestamp(Tuple3 element) { > return element.f1; > } > }); > AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent(); > Pattern, ?> pattern = Pattern. Long, String>>begin("start", strategy) > .where(new SimpleCondition>() { > private static final long serialVersionUID = 1L; > @Override > public boolean filter(Tuple3 e) { > return e.f2.equals("r") ? true : false; > } > }).followedBy("middle").where(new SimpleCondition String>>() { > private static final long serialVersionUID = 1L; > @Override > public boolean filter(Tuple3 e) throws Exception { > return !e.f2.equals("r") ? true : false; > } > }).oneOrMore().greedy() > .within(Time.seconds(10)); > CEP.pattern(input.keyBy(0), pattern) > .select(new PatternSelectFunction, String>() { > private static final long serialVersionUID = 1L; > @Override > public String select(Map>> > pattern) { > StringBuilder builder = new StringBuilder(); > List> start = pattern.get("start"); > List> middle = pattern.get("middle"); > for (Tuple3 t : start) { > builder.append(t.f0).append(","); > } > for (Tuple3 t : middle) { > builder.append(t.f0).append(","); > } > return builder.toString(); > } > }) > .print(); > env.execute();{code} > I would like to see:100,100,100,100,100,100 > however it matches 100,100 > I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping > some partial matches,it also matches 100,100. > Is there something important about greedy operator that i misunderstand? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654559#comment-16654559 ] ASF GitHub Bot commented on FLINK-7062: --- dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/6815#discussion_r226155097 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.codegen + +import java.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.cep.pattern.conditions.IterativeCondition +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName, primitiveDefaultValue} +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.util.Collector +import org.apache.flink.util.MathUtils.checkedDownCast +import java.lang.{Long => JLong} + +import scala.collection.JavaConverters._ + +/** + * A code generator for generating CEP related functions. + * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input type information about the first input of the Function + * @param currentPattern if generating condition the name of pattern, which the condition will + * be applied to + */ +class MatchCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +currentPattern: Option[String] = None) + extends CodeGenerator(config, nullableInput, input){ + + def generateMatchFunction[F <: Function, T <: Any]( +name: String, +clazz: Class[F], +bodyCode: String, +returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { +// This is a separate method from FunctionCodeGenerator#generateFunction because as of now +// functions in CEP library do not support rich interfaces +val funcName = newName(name) +val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName +val (functionClass, signature, inputStatements, isInterface) = + if (clazz == classOf[IterativeCondition[_]]) { +val baseClass = classOf[IterativeCondition[_]] +val inputTypeTerm = boxedTypeTermForTypeInfo(input) +val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName + +(baseClass, + s"boolean filter( Object _in1, $contextType $contextTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"), + false) + } else if (clazz == classOf[PatternSelectFunction[_, _]]) { +val baseClass = classOf[PatternSelectFunction[_, _]] +val inputTypeTerm = + s"java.util.Map>" + +(baseClass, + s"Object select($inputTypeTerm $input1Term)", + List(), + true) + } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) { +val baseClass = classOf[PatternFlatSelectFunction[_, _]] +val inputTypeTerm = + s"java.util.Map>" + +(baseClass, + s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)", + List(), + true) + } else { +throw new CodeGenException("Unsupported Function.") + } + +val extendsKeyword = if (isInterface) "implements" else "extends" +val funcCode = j""" + |public class $funcName $extendsKeyword 
${functionClass.getCanonicalName} { + | + | ${reuseMemberCode()} + | + | public $funcName() throws Exception { + |${reuseInitCode()} + |
[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE
dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/6815#discussion_r226155097 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.codegen + +import java.util + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.cep.pattern.conditions.IterativeCondition +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName, primitiveDefaultValue} +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.util.Collector +import org.apache.flink.util.MathUtils.checkedDownCast +import java.lang.{Long => JLong} + +import scala.collection.JavaConverters._ + +/** + * A code generator for generating CEP related functions. + * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input type information about the first input of the Function + * @param currentPattern if generating condition the name of pattern, which the condition will + * be applied to + */ +class MatchCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +currentPattern: Option[String] = None) + extends CodeGenerator(config, nullableInput, input){ + + def generateMatchFunction[F <: Function, T <: Any]( +name: String, +clazz: Class[F], +bodyCode: String, +returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { +// This is a separate method from FunctionCodeGenerator#generateFunction because as of now +// functions in CEP library do not support rich interfaces +val funcName = newName(name) +val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName +val (functionClass, signature, inputStatements, isInterface) = + if (clazz == classOf[IterativeCondition[_]]) { +val baseClass = classOf[IterativeCondition[_]] +val inputTypeTerm = boxedTypeTermForTypeInfo(input) +val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName + +(baseClass, + s"boolean filter( Object _in1, $contextType $contextTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"), + false) + } else if (clazz == classOf[PatternSelectFunction[_, _]]) { +val baseClass = classOf[PatternSelectFunction[_, _]] +val inputTypeTerm = + s"java.util.Map>" + +(baseClass, + s"Object select($inputTypeTerm $input1Term)", + List(), + true) + } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) { +val baseClass = classOf[PatternFlatSelectFunction[_, _]] +val inputTypeTerm = + s"java.util.Map>" + +(baseClass, + s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)", + List(), + true) + } else { +throw new CodeGenException("Unsupported Function.") + } + +val extendsKeyword = if (isInterface) "implements" else "extends" +val funcCode = j""" + |public class $funcName $extendsKeyword 
${functionClass.getCanonicalName} { + | + | ${reuseMemberCode()} + | + | public $funcName() throws Exception { + |${reuseInitCode()} + | } + | + | @Override + | public $signature throws Exception { + |${inputStatements.mkString("\n")} + |${reusePerRecordCode()} + |${reuseInputUnboxingCode()} + |$bodyCode + | } + |} +""".str
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654396#comment-16654396 ] ASF GitHub Bot commented on FLINK-10436: TisonKun opened a new pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys URL: https://github.com/apache/flink/pull/6872 ## What is the purpose of the change Add `ConfigOption#withFallbackKeys` to implement a similar function with `ConfigOption#withDeprecatedKeys`. That is, the `fallbackKey` falls back to current key if it has not been specified, but the use of current key would not print a deprecated message. ## Brief change log - Introduce `FallbackKey` class to represent a key with `isDeprecated` message. - Replace `deprecatedKeys` to `fallbackKeys` in `ConfigOption` ## Verifying this change - Add a test `ConfigurationTest#testFallbackKeys` to check the fallback mechanism works. - Manually check the issue report by the corresponding JIRA disappear. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Example config uses deprecated key jobmanager.rpc.address > - > > Key: FLINK-10436 > URL: https://issues.apache.org/jira/browse/FLINK-10436 > Project: Flink > Issue Type: Sub-task > Components: Startup Shell Scripts >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The example {{flink-conf.yaml}} shipped as part of the Flink distribution > (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml) > has the following entry: > {code} > jobmanager.rpc.address: localhost > {code} > When using this key, the following deprecation warning is logged. > {code} > 2018-09-26 12:01:46,608 WARN org.apache.flink.configuration.Configuration > - Config uses deprecated configuration key > 'jobmanager.rpc.address' instead of proper key 'rest.address' > {code} > The example config should not use deprecated config options. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
TisonKun opened a new pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys URL: https://github.com/apache/flink/pull/6872 ## What is the purpose of the change Add `ConfigOption#withFallbackKeys` to implement a similar function with `ConfigOption#withDeprecatedKeys`. That is, the `fallbackKey` falls back to current key if it has not been specified, but the use of current key would not print a deprecated message. ## Brief change log - Introduce `FallbackKey` class to represent a key with `isDeprecated` message. - Replace `deprecatedKeys` to `fallbackKeys` in `ConfigOption` ## Verifying this change - Add a test `ConfigurationTest#testFallbackKeys` to check the fallback mechanism works. - Manually check the issue report by the corresponding JIRA disappear. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol closed pull request #6861: [hotfix] [javadocs] Fix some single quote problems
zentol closed pull request #6861: [hotfix] [javadocs] Fix some single quote problems URL: https://github.com/apache/flink/pull/6861 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html index f5a2a5d77f9..787210119df 100644 --- a/docs/_includes/generated/akka_configuration.html +++ b/docs/_includes/generated/akka_configuration.html @@ -60,7 +60,7 @@ akka.log.lifecycle.events false -Turns on the Akka’s remote logging of events. Set this value to ‘true’ in case of debugging. +Turns on the Akka’s remote logging of events. Set this value to 'true' in case of debugging. akka.lookup.timeout diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java index 205e4372cd8..563f095ac01 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java @@ -35,7 +35,7 @@ * This is an example showing the to use the Cassandra Input-/OutputFormats in the Batch API. 
* * The example assumes that a table exists in a local cassandra database, according to the following queries: - * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); */ public class BatchExample { diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index 8b4e4b125cd..f32c2afb353 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -35,7 +35,7 @@ * This is an example showing the to use the {@link CassandraPojoInputFormat}/{@link CassandraTupleOutputFormat} in the Batch API. 
* * The example assumes that a table exists in a local cassandra database, according to the following queries: - * CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; * CREATE TABLE IF NOT EXISTS flink.batches (id text, counter int, batch_id int, PRIMARY KEY(id, counter, batchId)); */ public class BatchPojoExample { diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java index 89e2d9e3ccf..6ee615db280 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java @@ -34,7 +34,7 @@ * Pojo's have to be annotated with datastax annotations to work with this sink. 
* * The example assumes that a table exists in a local cassandra database, according to the following queries: - * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; * CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY) */ public class CassandraPojoSinkExample { diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java index 72013d5141a..82daae18c16 100644 ---
[GitHub] zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme
zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme URL: https://github.com/apache/flink/pull/6855#issuecomment-430771951 the shading check must be updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme
[ https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654138#comment-16654138 ] ASF GitHub Bot commented on FLINK-10563: zentol commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme URL: https://github.com/apache/flink/pull/6855#issuecomment-430771951 the shading check must be updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expose shaded Presto S3 filesystem under "s3p" scheme > - > > Key: FLINK-10563 > URL: https://issues.apache.org/jira/browse/FLINK-10563 > Project: Flink > Issue Type: Improvement >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, you can't use the shaded hadoop S3 filesystem and the presto S3 > filesystem at the same time. If we exposed the presto filesystem under an > additional scheme we enable using both at the same time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10586) Running RestServerEndpointITCase may cause Kernel Panic
[ https://issues.apache.org/jira/browse/FLINK-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10586: - Summary: Running RestServerEndpointITCase may cause Kernel Panic (was: Running {{RestServerEndpointITCase}} may cause Kernel Panic) > Running RestServerEndpointITCase may cause Kernel Panic > --- > > Key: FLINK-10586 > URL: https://issues.apache.org/jira/browse/FLINK-10586 > Project: Flink > Issue Type: Bug > Components: REST, Tests >Affects Versions: 1.6.1, 1.7.0 > Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 > *OS:* macOS High Sierra 10.13.6 (17G65) > *Maven:* 3.2.5 > *Java:* > openjdk version "1.8.0_181" > OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) > OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) >Reporter: Gary Yao >Priority: Major > Fix For: 1.8.0 > > Attachments: loop_test_kernel_panic.patch > > > Running the tests in {{RestServerEndpointITCase}} in a loop may cause a > Kernel panic on OS X. I have observed this to happen with tests that use the > {{OkHttpClient}}, such as {{testDefaultVersionRouting}}. > To run the aforementioned test in a loop, apply the git patch in the > attachment, and execute the following command: > {code} > mvn clean integration-test -pl flink-runtime -am > -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false > -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir > -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties > {code} > The test eventually fails with the exception below. A few moments later the > operation system restarts. > {noformat} > > 16:43:17,421 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase >- > > Test > testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase) > is running. 
> > 16:43:17,422 WARN > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Upload directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload > does not exist, or has been deleted externally. Previously uploaded files > are no longer available. > 16:43:17,422 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Created directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload > for file uploads. > 16:43:17,422 INFO org.apache.flink.runtime.rest.RestClient >- Rest client endpoint started. > 16:43:17,422 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Starting rest endpoint. > 16:43:17,423 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Rest endpoint listening at localhost:57561 > 16:43:17,429 INFO org.apache.flink.runtime.rest.RestClient >- Shutting down rest endpoint. > 16:43:17,431 INFO org.apache.flink.runtime.rest.RestClient >- Rest endpoint shutdown complete. > 16:43:17,431 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Shutting down rest endpoint. 
> 16:43:17,435 ERROR org.apache.flink.runtime.rest.RestServerEndpointITCase >- > > Test > testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase) > failed with: > java.lang.AssertionError: Bad file descriptor (Write failed) > at > org.apache.flink.runtime.rest.RestServerEndpointITCase.testDefaultVersionRouting(RestServerEndpointITCase.java:260) > at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.e
[jira] [Created] (FLINK-10586) Running {{RestServerEndpointITCase}} may cause Kernel Panic
Gary Yao created FLINK-10586: Summary: Running {{RestServerEndpointITCase}} may cause Kernel Panic Key: FLINK-10586 URL: https://issues.apache.org/jira/browse/FLINK-10586 Project: Flink Issue Type: Bug Components: REST, Tests Affects Versions: 1.6.1, 1.7.0 Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) Reporter: Gary Yao Fix For: 1.8.0 Attachments: loop_test_kernel_panic.patch Running the tests in {{RestServerEndpointITCase}} in a loop may cause a Kernel panic on OS X. I have observed this to happen with tests that use the {{OkHttpClient}}, such as {{testDefaultVersionRouting}}. To run the aforementioned test in a loop, apply the git patch in the attachment, and execute the following command: {code} mvn clean integration-test -pl flink-runtime -am -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties {code} The test eventually fails with the exception below. A few moments later the operation system restarts. {noformat} 16:43:17,421 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase - Test testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase) is running. 16:43:17,422 WARN org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Upload directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available. 
16:43:17,422 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Created directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit8985109840988505849/junit7034313885477230537/flink-web-upload for file uploads. 16:43:17,422 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. 16:43:17,422 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Starting rest endpoint. 16:43:17,423 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Rest endpoint listening at localhost:57561 16:43:17,429 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint. 16:43:17,431 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete. 16:43:17,431 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Shutting down rest endpoint. 16:43:17,435 ERROR org.apache.flink.runtime.rest.RestServerEndpointITCase - Test testDefaultVersionRouting[4019](org.apache.flink.runtime.rest.RestServerEndpointITCase) failed with: java.lang.AssertionError: Bad file descriptor (Write failed) at org.apache.flink.runtime.rest.RestServerEndpointITCase.testDefaultVersionRouting(RestServerEndpointITCase.java:260) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUni
[jira] [Commented] (FLINK-10252) Handle oversized metric messages
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654030#comment-16654030 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226050176 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -90,7 +92,9 @@ /** * Creates a new MetricRegistry and starts the configured reporter. */ - public MetricRegistryImpl(MetricRegistryConfiguration config) { + public MetricRegistryImpl(Configuration configuration) { + MetricRegistryConfiguration config = MetricRegistryConfiguration.fromConfiguration(configuration); Review comment: the whole point of the `MetricRegistryConfiguration` is to not expose the `Configuration` directly to the `MetricRegistry`. This should be reverted. Instead, extend the MRConfiguration with a separate configuration object for the akka stuff that the `MetricQueryService` requires, which btw. can be determined before the actor is even started. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized metric messges > --- > > Key: FLINK-10252 > URL: https://issues.apache.org/jira/browse/FLINK-10252 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Since the {{MetricQueryService}} is implemented as an Akka actor, it can only > send messages of a smaller size then the current {{akka.framesize}}. 
We > should check similarly to FLINK-10251 whether the payload exceeds the maximum > framesize and fail fast if it is true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messages
zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226050176 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ## @@ -90,7 +92,9 @@ /** * Creates a new MetricRegistry and starts the configured reporter. */ - public MetricRegistryImpl(MetricRegistryConfiguration config) { + public MetricRegistryImpl(Configuration configuration) { + MetricRegistryConfiguration config = MetricRegistryConfiguration.fromConfiguration(configuration); Review comment: the whole point of the `MetricRegistryConfiguration` is to not expose the `Configuration` directly to the `MetricRegistry`. This should be reverted. Instead, extend the MRConfiguration with a separate configuration object for the akka stuff that the `MetricQueryService` requires, which btw. can be determined before the actor is even started. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653931#comment-16653931 ] ASF GitHub Bot commented on FLINK-9697: --- alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-430720238 Fine with me This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka
alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-430720238 Fine with me This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10579) Remove unused deploysettings.xml
[ https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10579. Resolution: Fixed master: 812b84c332d2e23c5a9930c81f2a3002fe2e9dd2 > Remove unused deploysettings.xml > > > Key: FLINK-10579 > URL: https://issues.apache.org/jira/browse/FLINK-10579 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9967 resolved the {{deploysettings.xml}} file is unused and can be > removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml
[ https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653825#comment-16653825 ] ASF GitHub Bot commented on FLINK-10579: zentol closed pull request #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/deploysettings.xml b/deploysettings.xml deleted file mode 100644 index e36d8480d47..000 --- a/deploysettings.xml +++ /dev/null @@ -1,38 +0,0 @@ - - -http://maven.apache.org/SETTINGS/1.0.0"; - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; - xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 - http://maven.apache.org/xsd/settings-1.0.0.xsd";> - - - apache.snapshots.https - ${sonatype_user} - ${sonatype_pw} - - - apache.releases.https - ${sonatype_user} - ${sonatype_pw} - - - - diff --git a/tools/releasing/create_source_release.sh b/tools/releasing/create_source_release.sh index 47b21768614..fd1b13cf41b 100755 --- a/tools/releasing/create_source_release.sh +++ b/tools/releasing/create_source_release.sh @@ -63,7 +63,7 @@ cd ${CLONE_DIR} rsync -a \ --exclude ".git" --exclude ".gitignore" --exclude ".gitattributes" --exclude ".travis.yml" \ - --exclude "deploysettings.xml" --exclude "CHANGELOG" --exclude ".github" --exclude "target" \ + --exclude "CHANGELOG" --exclude ".github" --exclude "target" \ --exclude ".idea" --exclude "*.iml" --exclude ".DS_Store" --exclude "build-target" \ --exclude "docs/content" --exclude ".rubydeps" \ . flink-$RELEASE_VERSION This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove unused deploysettings.xml > > > Key: FLINK-10579 > URL: https://issues.apache.org/jira/browse/FLINK-10579 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9967 resolved the {{deploysettings.xml}} file is unused and can be > removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #6865: [FLINK-10579][build] Remove unused deploysettings.xml
zentol closed pull request #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/deploysettings.xml b/deploysettings.xml deleted file mode 100644 index e36d8480d47..000 --- a/deploysettings.xml +++ /dev/null @@ -1,38 +0,0 @@ - - -http://maven.apache.org/SETTINGS/1.0.0"; - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; - xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 - http://maven.apache.org/xsd/settings-1.0.0.xsd";> - - - apache.snapshots.https - ${sonatype_user} - ${sonatype_pw} - - - apache.releases.https - ${sonatype_user} - ${sonatype_pw} - - - - diff --git a/tools/releasing/create_source_release.sh b/tools/releasing/create_source_release.sh index 47b21768614..fd1b13cf41b 100755 --- a/tools/releasing/create_source_release.sh +++ b/tools/releasing/create_source_release.sh @@ -63,7 +63,7 @@ cd ${CLONE_DIR} rsync -a \ --exclude ".git" --exclude ".gitignore" --exclude ".gitattributes" --exclude ".travis.yml" \ - --exclude "deploysettings.xml" --exclude "CHANGELOG" --exclude ".github" --exclude "target" \ + --exclude "CHANGELOG" --exclude ".github" --exclude "target" \ --exclude ".idea" --exclude "*.iml" --exclude ".DS_Store" --exclude "build-target" \ --exclude "docs/content" --exclude ".rubydeps" \ . flink-$RELEASE_VERSION This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml
[ https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653747#comment-16653747 ] ASF GitHub Bot commented on FLINK-10579: StefanRRichter commented on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849 LGTM 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove unused deploysettings.xml > > > Key: FLINK-10579 > URL: https://issues.apache.org/jira/browse/FLINK-10579 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9967 resolved the {{deploysettings.xml}} file is unused and can be > removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10579) Remove unused deploysettings.xml
[ https://issues.apache.org/jira/browse/FLINK-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653748#comment-16653748 ] ASF GitHub Bot commented on FLINK-10579: StefanRRichter removed a comment on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849 LGTM 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove unused deploysettings.xml > > > Key: FLINK-10579 > URL: https://issues.apache.org/jira/browse/FLINK-10579 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9967 resolved the {{deploysettings.xml}} file is unused and can be > removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter removed a comment on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml
StefanRRichter removed a comment on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849 LGTM 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml
StefanRRichter commented on issue #6865: [FLINK-10579][build] Remove unused deploysettings.xml URL: https://github.com/apache/flink/pull/6865#issuecomment-430679849 LGTM 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10567) Lost serialize fields when ttl state store with the mutable serializer
[ https://issues.apache.org/jira/browse/FLINK-10567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653741#comment-16653741 ] ASF GitHub Bot commented on FLINK-10567: StefanRRichter commented on a change in pull request #6860: [FLINK-10567]Fix TtlStateSerializer lost field during duplicate() URL: https://github.com/apache/flink/pull/6860#discussion_r225984090 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ## @@ -224,7 +224,7 @@ protected Object getField(@Nonnull TtlValue v, int index) { TypeSerializer ... originalSerializers) { Preconditions.checkNotNull(originalSerializers); Preconditions.checkArgument(originalSerializers.length == 2); - return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[1]); + return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[0], (TypeSerializer) originalSerializers[1]); Review comment: We could just `return new TtlSerializer<>(precomputed, originalSerializers);` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lost serialize fields when ttl state store with the mutable serializer > -- > > Key: FLINK-10567 > URL: https://issues.apache.org/jira/browse/FLINK-10567 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > In TtlStateSerializer, when it is duplicate, it will lost the long field > serializer which will lead to exception when createInstance, which can easily > be reproduced by the test case -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #6860: [FLINK-10567]Fix TtlStateSerializer lost field during duplicate()
StefanRRichter commented on a change in pull request #6860: [FLINK-10567]Fix TtlStateSerializer lost field during duplicate() URL: https://github.com/apache/flink/pull/6860#discussion_r225984090 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ## @@ -224,7 +224,7 @@ protected Object getField(@Nonnull TtlValue v, int index) { TypeSerializer ... originalSerializers) { Preconditions.checkNotNull(originalSerializers); Preconditions.checkArgument(originalSerializers.length == 2); - return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[1]); + return new TtlSerializer<>(precomputed, (TypeSerializer) originalSerializers[0], (TypeSerializer) originalSerializers[1]); Review comment: We could just `return new TtlSerializer<>(precomputed, originalSerializers);` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653695#comment-16653695 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957516 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653689#comment-16653689 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225960810 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653697#comment-16653697 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957980 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[GitHub] pnowojski commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time
pnowojski commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time URL: https://github.com/apache/flink/pull/6776#discussion_r225970725 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ## @@ -272,24 +264,30 @@ class TemporalRowtimeJoin( * Binary search `rightRowsSorted` to find the latest right row to join with `leftTime`. * Latest means a right row with largest time that is still smaller or equal to `leftTime`. * -* @return index of such element. If such row was not found (either `rightRowsSorted` is empty -* or all `rightRowsSorted` are are newer) return -1. +* @return found element or `Option.empty` If such row was not found (either `rightRowsSorted` +* is empty or all `rightRowsSorted` are are newer). */ - private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Int = { + private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Option[Row] = { Review comment: `Option` > `NullPointerException` :( Do not use nulls without working `@Nullable` annotations This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653705#comment-16653705 ] ASF GitHub Bot commented on FLINK-9715: --- pnowojski commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time URL: https://github.com/apache/flink/pull/6776#discussion_r225970725 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ## @@ -272,24 +264,30 @@ class TemporalRowtimeJoin( * Binary search `rightRowsSorted` to find the latest right row to join with `leftTime`. * Latest means a right row with largest time that is still smaller or equal to `leftTime`. * -* @return index of such element. If such row was not found (either `rightRowsSorted` is empty -* or all `rightRowsSorted` are are newer) return -1. +* @return found element or `Option.empty` If such row was not found (either `rightRowsSorted` +* is empty or all `rightRowsSorted` are are newer). */ - private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Int = { + private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Option[Row] = { Review comment: `Option` > `NullPointerException` :( Do not use nulls without working `@Nullable` annotations This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support versioned joins with event time > --- > > Key: FLINK-9715 > URL: https://issues.apache.org/jira/browse/FLINK-9715 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should work with event time -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653691#comment-16653691 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225956138 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() Review comment: I would avoid nulls and wrap this variable with `Option` and pass `Option` to the `updateCleanupTimer` method. Handling nullable variables without compilation errors enables is very dangerous and error prone. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, plea
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653694#comment-16653694 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225954133 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { Review comment: As we discussed, I would slightly lean toward dropping the `final` words here, dropping the proxy/wrapper methods `onXYZ` and relaying on user to do something like: ``` override def open() = { super.open() // my fancy logic. } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653698#comment-16653698 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653687#comment-16653687 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225955291 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { Review comment: here, I'm not sure. Either I would do the same as for `open` or leave this methods not implemented and relay on user to write sth like: ``` override def processElement1(element) = { registerProcessingCleanupTimer() // my fancy logic } ``` The argument in favour of second option is that maybe we want to register/bump the timers only on `processElement2` (whenever we touch the build side) and not on `processElement1`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > >
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653690#comment-16653690 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225965180 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653692#comment-16653692 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225962699 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653693#comment-16653693 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957024 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653688#comment-16653688 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225959340 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = Review comment: Could you deduplicate the setup/closing logic of those methods? For example extract ``` val operator: StubOperatorWithTTLTimers = new StubOperatorWithTTLTimers(streamQueryConfig) testHarness = createTestHarness(operator) testHarness.open() ``` to a setup method or to `createStubHarnessWithTTLTimers()` method. If you pick the `setup` method, `close` could be moved to tearDown (but that would require `testHarness` to be a field not a local variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. 
> --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653696#comment-16653696 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225961569 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225960810 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, has
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225961569 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, has
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957024 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if 
(stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one +* cleanup timer is registered per key. +*/ Review comment: I would drop this comment. The method is pretty self-explanatory on its own and shorter to read than the comment itself. -
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225962699 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, has
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957516 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if 
(stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one +* cleanup timer is registered per key. +*/ + private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = { +if (currentCleanupTime != null) { + timerService.deleteProcessingTimeTimer(currentCleanupTime)
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225955291 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { Review comment: here, I'm not sure. 
Either I would do the same as for `open` or leave these methods not implemented and rely on the user to write something like: ``` override def processElement1(element) = { registerProcessingCleanupTimer() // my fancy logic } ``` The argument in favour of the second option is that maybe we want to register/bump the timers only on `processElement2` (whenever we touch the build side) and not on `processElement1`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if 
(stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one +* cleanup timer is registered per key. +*/ + private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = { +if (currentCleanupTime != null) { + timerService.deleteProcessingTimeTimer(currentCleanupTime)
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225956138 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if 
(stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() Review comment: I would avoid nulls and wrap this variable with `Option` and pass `Option` to the `updateCleanupTimer` method. Handling nullable variables without compilation errors is very dangerous and error prone. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225954133 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { Review comment: As we discussed, I would slightly lean toward dropping the `final` words here, dropping the proxy/wrapper methods `onXYZ` and relaying on user to do something like: ``` override def open() = { super.open() // my fancy logic. } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957980 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. 
This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if 
(stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one +* cleanup timer is registered per key. +*/ + private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = { +if (currentCleanupTime != null) { + timerService.deleteProcessingTimeTimer(currentCleanupTime)
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225965180 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, has
[GitHub] pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225959340 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = Review comment: Could you deduplicate the setup/closing logic of those methods? For example extract ``` val operator: StubOperatorWithTTLTimers = new StubOperatorWithTTLTimers(streamQueryConfig) testHarness = createTestHarness(operator) testHarness.open() ``` to a setup method or to `createStubHarnessWithTTLTimers()` method. If you pick the `setup` method, `close` could be moved to tearDown (but that would require `testHarness` to be a field not a local variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653673#comment-16653673 ] ASF GitHub Bot commented on FLINK-10205: tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796 Alright, now the problem is a bit clearer to me. The underlying problem is that the `InputSplitAssigner's` semantics in case of a failover are not well defined. This is mainly due to the fact that Flink evolved over time. The general idea of the `InputSplitAssigner` is to lazily assign work to sources which have completely consumed their current `InputSplit`. The order in which this happens should not affect the correctness of the result. If you say that in case of a recovery the exact same `InputSplit` assignment needs to happen again, then I think it must be because our sources have some kind of state. Otherwise, it should not matter which source task completes the `InputSplit`, right? If this is correct, then we would run into the same problem if a JM failure happens, because we would lose all `InputSplit` assignment information which is stored on the JM. So stateful sources with `InputSplits` don't work at the moment (in the general case). If we assume that our sources are stateless, then simply returning the input splits to the assigner and letting the next idling task take it should work. In your example of the infinite stream which is initialized via the `InputSplits` there would be no other task competing for the `InputSplit` of a failed task because by definition they never finish their work, right? If multiple tasks fail, then the mapping might be different after the recovery, but every task would continue consuming from a single `InputSplit`. I think the problem here is that you abused the `InputSplitAssigner` for something it is not yet intended to do. 
The reason why I'm a bit hesitant here is because I think we do not fully understand yet what we actually want to have. Moreover, some corner cases are not clear to me yet. For example, why would it be ok for a global failover to change the mapping and not for region failover? Another example is how to handle the case where we lose a TM and need to downscale. Would that effectively be a global failover where we redistribute all `InputSplits` (I would think so). Before starting any concrete implementation steps, I think we should properly design this feature to get it right. A very related topic is actually the new source interface. Depending on how much we are able to unify batch and streaming, the whole `InputSplit` assignment might move into a single task (similar to the `ContinuousFileMonitoringSink`) and the assignment might become part of a checkpoint. That way, we would no longer need to take care of this on the JM side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.6.1, 1.6.2, 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Today DataSource Task pull InputSplits from JobManager to achieve better > performance, however, when a DataSourceTask failed and rerun, it will not get > the same splits as its previous version. this will introduce inconsistent > result or even data corruption. > Furthermore, if there are two executions run at the same time (in batch > scenario), this two executions should process same splits. 
> we need to fix the issue to make the inputs of a DataSourceTask > deterministic. The propose is save all splits into ExecutionVertex and > DataSourceTask will pull split from there. > document: > [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796 Alright, now the problem is a bit clearer to me. The underlying problem is that the `InputSplitAssigner's` semantics in case of a failover are not well defined. This is mainly due to the fact that Flink evolved over time. The general idea of the `InputSplitAssigner` is to lazily assign work to sources which have completely consumed their current `InputSplit`. The order in which this happens should not affect the correctness of the result. If you say that in case of a recovery the exact same `InputSplit` assignment needs to happen again, then I think it must be because our sources have some kind of state. Otherwise, it should not matter which source task completes the `InputSplit`, right? If this is correct, then we would run into the same problem if a JM failure happens, because we would lose all `InputSplit` assignment information which is stored on the JM. So stateful sources with `InputSplits` don't work at the moment (in the general case). If we assume that our sources are stateless, then simply returning the input splits to the assigner and letting the next idling task take it should work. In your example of the infinite stream which is initialized via the `InputSplits` there would be no other task competing for the `InputSplit` of a failed task because by definition they never finish their work, right? If multiple tasks fail, then the mapping might be different after the recovery, but every task would continue consuming from a single `InputSplit`. I think the problem here is that you abused the `InputSplitAssigner` for something it is not yet intended to do. The reason why I'm a bit hesitant here is because I think we do not fully understand yet what we actually want to have. Moreover, some corner cases not clear to me yet. 
For example, why would it be ok for a global failover to change the mapping and not for region failover? Another example is how to handle the case where we lose a TM and need to downscale. Would that effectively be a global failover where we redistribute all `InputSplits` (I would think so). Before starting any concrete implementation steps, I think we should properly design this feature to get it right. A very related topic is actually the new source interface. Depending on how much we are able to unify batch and streaming, the whole `InputSplit` assignment might move into a single task (similar to the `ContinuousFileMonitoringSink`) and the assignment might become part of a checkpoint. That way, we would no longer need to take care of this on the JM side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10582. Resolution: Fixed master: 3f231ac46537654aebdd73b0f42aa823386bf901 1.6: 967b31b333e6f4b014ea3041f420bfaff2484618 1.5: 98f38288a12065dd665f7e0b2420d57f6408121a > Make REST executor thread priority configurable > --- > > Key: FLINK-10582 > URL: https://issues.apache.org/jira/browse/FLINK-10582 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.5.5, 1.6.2 > > > With FLINK-10282, we introduced a dedicated thread pool for the REST server > endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, > however, might affect existing users by making some of the REST calls no > longer responsive (e.g. if the other components' threads take all the time). > Therefore, I propose to set the default thread priority to > {{Thread.NORM_PRIORITY}} and make it additionally configurable such that > users can change it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10582: - Fix Version/s: 1.6.2 1.5.5 > Make REST executor thread priority configurable > --- > > Key: FLINK-10582 > URL: https://issues.apache.org/jira/browse/FLINK-10582 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.5.5, 1.6.2, 1.7.0 > > > With FLINK-10282, we introduced a dedicated thread pool for the REST server > endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, > however, might affect existing users by making some of the REST calls no > longer responsive (e.g. if the other components' threads take all the time). > Therefore, I propose to set the default thread priority to > {{Thread.NORM_PRIORITY}} and make it additionally configurable such that > users can change it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653652#comment-16653652 ] ASF GitHub Bot commented on FLINK-10582: zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html index 1aa963fb3e2..2c5f539a480 100644 --- a/docs/_includes/generated/rest_configuration.html +++ b/docs/_includes/generated/rest_configuration.html @@ -62,5 +62,10 @@ 4 The number of threads for the asynchronous processing of requests. + +rest.server.thread-priority +5 +Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing. 
+ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index edfd39be808..11c38de06c6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; @@ -121,4 +122,12 @@ key("rest.server.numThreads") .defaultValue(4) .withDescription("The number of threads for the asynchronous processing of requests."); + + public static final ConfigOption SERVER_THREAD_PRIORITY = key("rest.server.thread-priority") + .defaultValue(Thread.NORM_PRIORITY) + .withDescription(Description.builder() + .text("Thread priority of the REST server's executor for processing asynchronous requests. 
" + + "Lowering the thread priority will give Flink's main components more CPU time whereas " + + "increasing will allocate more time for the REST server's processing.") + .build()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index c09c41bb2ec..6d557d0815f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -141,6 +141,7 @@ public AbstractDispatcherResourceManagerComponentFactory( blobServer, WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever(actorSystem, timeout), highAvailabilityServices.getWebMonitorLeaderElectionService(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 3fc7007f9c5..69bad47e158 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -356,6 +356,7 @@ public void start() throws Exception { blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"),
[GitHub] zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable
zentol closed pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html index 1aa963fb3e2..2c5f539a480 100644 --- a/docs/_includes/generated/rest_configuration.html +++ b/docs/_includes/generated/rest_configuration.html @@ -62,5 +62,10 @@ 4 The number of threads for the asynchronous processing of requests. + +rest.server.thread-priority +5 +Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing. 
+ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index edfd39be808..11c38de06c6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; @@ -121,4 +122,12 @@ key("rest.server.numThreads") .defaultValue(4) .withDescription("The number of threads for the asynchronous processing of requests."); + + public static final ConfigOption SERVER_THREAD_PRIORITY = key("rest.server.thread-priority") + .defaultValue(Thread.NORM_PRIORITY) + .withDescription(Description.builder() + .text("Thread priority of the REST server's executor for processing asynchronous requests. 
" + + "Lowering the thread priority will give Flink's main components more CPU time whereas " + + "increasing will allocate more time for the REST server's processing.") + .build()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index c09c41bb2ec..6d557d0815f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -141,6 +141,7 @@ public AbstractDispatcherResourceManagerComponentFactory( blobServer, WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever(actorSystem, timeout), highAvailabilityServices.getWebMonitorLeaderElectionService(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 3fc7007f9c5..69bad47e158 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -356,6 +356,7 @@ public void start() throws Exception { blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( metricQueryServiceActorSystem, diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org
[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653650#comment-16653650 ] ASF GitHub Bot commented on FLINK-10582: zentol commented on a change in pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868#discussion_r225961269 ## File path: flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java ## @@ -121,4 +122,12 @@ key("rest.server.numThreads") Review comment: i guess I didn't follow naming conventions here? we could still hotfix that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make REST executor thread priority configurable > --- > > Key: FLINK-10582 > URL: https://issues.apache.org/jira/browse/FLINK-10582 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-10282, we introduced a dedicated thread pool for the REST server > endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, > however, might affect existing users by making some of the REST calls no > longer responsive (e.g. if the other components' threads take all the time). > Therefore, I propose to set the default thread priority to > {{Thread.NORM_PRIORITY}} and make it additionally configurable such that > users can change it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable
zentol commented on a change in pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868#discussion_r225961269 ## File path: flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java ## @@ -121,4 +122,12 @@ key("rest.server.numThreads") Review comment: i guess I didn't follow naming conventions here? we could still hotfix that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol closed pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable
zentol closed pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6869 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html index 1aa963fb3e2..2c5f539a480 100644 --- a/docs/_includes/generated/rest_configuration.html +++ b/docs/_includes/generated/rest_configuration.html @@ -62,5 +62,10 @@ 4 The number of threads for the asynchronous processing of requests. + +rest.server.thread-priority +5 +Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing. 
+ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index edfd39be808..11c38de06c6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; @@ -121,4 +122,12 @@ key("rest.server.numThreads") .defaultValue(4) .withDescription("The number of threads for the asynchronous processing of requests."); + + public static final ConfigOption SERVER_THREAD_PRIORITY = key("rest.server.thread-priority") + .defaultValue(Thread.NORM_PRIORITY) + .withDescription(Description.builder() + .text("Thread priority of the REST server's executor for processing asynchronous requests. 
" + + "Lowering the thread priority will give Flink's main components more CPU time whereas " + + "increasing will allocate more time for the REST server's processing.") + .build()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index fd0a0a1d39f..4f575ce4f3e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -331,6 +331,7 @@ protected void startClusterComponents( transientBlobCache, WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever(actorSystem, timeout), highAvailabilityServices.getWebMonitorLeaderElectionService()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 4bfdb25c4d6..1d2c8c75eaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -348,6 +348,7 @@ public void start() throws Exception { blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( actorSystem, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 02d92dc54fb..c480c33c671 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.jav
[GitHub] zentol closed pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable
zentol closed pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6870 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html index 1aa963fb3e2..2c5f539a480 100644 --- a/docs/_includes/generated/rest_configuration.html +++ b/docs/_includes/generated/rest_configuration.html @@ -62,5 +62,10 @@ 4 The number of threads for the asynchronous processing of requests. + +rest.server.thread-priority +5 +Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing. 
+ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index edfd39be808..11c38de06c6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; @@ -121,4 +122,12 @@ key("rest.server.numThreads") .defaultValue(4) .withDescription("The number of threads for the asynchronous processing of requests."); + + public static final ConfigOption SERVER_THREAD_PRIORITY = key("rest.server.thread-priority") + .defaultValue(Thread.NORM_PRIORITY) + .withDescription(Description.builder() + .text("Thread priority of the REST server's executor for processing asynchronous requests. 
" + + "Lowering the thread priority will give Flink's main components more CPU time whereas " + + "increasing will allocate more time for the REST server's processing.") + .build()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 2c34b530804..c4c583853bf 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -330,6 +330,7 @@ protected void startClusterComponents( transientBlobCache, WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever(actorSystem, timeout), highAvailabilityServices.getWebMonitorLeaderElectionService()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index a7840d64830..74ff770e69a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -350,6 +350,7 @@ public void start() throws Exception { blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( actorSystem, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 02d92dc54fb..c480c33c671 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.jav
[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method
[ https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653630#comment-16653630 ] Till Rohrmann commented on FLINK-10575: --- I think this method should only be removed after we have removed {{JobManager}}, because it depends on it. > Remove deprecated ExecutionGraphBuilder.buildGraph method > - > > Key: FLINK-10575 > URL: https://issues.apache.org/jira/browse/FLINK-10575 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > ExecutionGraphBuilder is not a public API and we should able to remove > deprecated method such as: > @Deprecated > public static ExecutionGraph buildGraph > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653624#comment-16653624 ] Till Rohrmann commented on FLINK-10436: --- Yes, I think this would solve the problem. We would then only print the deprecation warning if {{isDeprecated}} is {{true}}. > Example config uses deprecated key jobmanager.rpc.address > - > > Key: FLINK-10436 > URL: https://issues.apache.org/jira/browse/FLINK-10436 > Project: Flink > Issue Type: Sub-task > Components: Startup Shell Scripts >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The example {{flink-conf.yaml}} shipped as part of the Flink distribution > (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml) > has the following entry: > {code} > jobmanager.rpc.address: localhost > {code} > When using this key, the following deprecation warning is logged. > {code} > 2018-09-26 12:01:46,608 WARN org.apache.flink.configuration.Configuration > - Config uses deprecated configuration key > 'jobmanager.rpc.address' instead of proper key 'rest.address' > {code} > The example config should not use deprecated config options. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
[ https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10585: - Description: Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop may trigger the HTTP server to send a response with {{Content-Type: application/octet-stream}}, which causes the test to fail. The expected {{Content-Type}} is {{application/json}}. Note that the REST handler used for testing, can only return json responses. The failure can likely be triggered for other tests inside {{RestServerEndpointITCase}} as well. The behavior has not been observed on Linux so far. To run the test in a loop, apply the git patch in the attachment, and execute the following command: {code} mvn clean integration-test -pl flink-runtime -am -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties {code} After a while you may see the following stacktrace in the test's log file: {noformat} 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase - Test testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) is running. 15:25:45,620 WARN org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Upload directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Created directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload for file uploads. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. 
15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Starting rest endpoint. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Rest endpoint listening at localhost:52841 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient - Response was not valid JSON. org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; line: 1, column: 2] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272) at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505) at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at org.apache.flink.shaded.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653601#comment-16653601 ] ASF GitHub Bot commented on FLINK-10583: kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871 ## What is the purpose of the change This is the first step for the implementation of FLINK-10583 and FLINK-10584. It introduces the `AbstractTwoInputStreamOperatorWithTTL` which contains the basic functionality for implementing state TTL based on timers. This operator makes sure that: 1) only at most one timer is registered per key 2) both "sides" of the operator (processElement1 and processElement2) are treated equally when it comes to registering cleanup timers. ## Verifying this change Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the class. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) R @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10583: --- Labels: pull-request-available (was: ) > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871 ## What is the purpose of the change This is the first step for the implementation of FLINK-10583 and FLINK-10584. It introduces the `AbstractTwoInputStreamOperatorWithTTL` which contains the basic functionality for implementing state TTL based on timers. This operator makes sure that: 1) only at most one timer is registered per key 2) both "sides" of the operator (processElement1 and processElement2) are treated equally when it comes to registering cleanup timers. ## Verifying this change Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the class. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) R @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
[ https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10585: - Description: Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop may trigger the HTTP server to send a response with {{Content-Type: application/octet-stream}}, which causes the test to fail. The expected {{Content-Type}} is {{application/json}}. Note that the REST handler used for testing, can only return json responses. The failure can likely be triggered for other tests inside {{RestServerEndpointITCase}} as well. The behavior so far could not be observed on Linux. To run the test in a loop, apply the git patch in the attachment, and execute the following command: {code} mvn clean integration-test -pl flink-runtime -am -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties {code} After a while you may see the following stacktrace in the test's log file: {noformat} 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase - Test testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) is running. 15:25:45,620 WARN org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Upload directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Created directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload for file uploads. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. 
15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Starting rest endpoint. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Rest endpoint listening at localhost:52841 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient - Response was not valid JSON. org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; line: 1, column: 2] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272) at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505) at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at org.apache.flink.shaded.
[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
[ https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10585: - Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) was: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) > RestServerEndpoint responds with wrong Content-Type in Integration Test. > > > Key: FLINK-10585 > URL: https://issues.apache.org/jira/browse/FLINK-10585 > Project: Flink > Issue Type: Bug > Components: REST, Tests >Affects Versions: 1.6.1, 1.7.0 > Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 > *OS:* macOS High Sierra 10.13.6 (17G65) > *Maven:* 3.2.5 > *Java:* > openjdk version "1.8.0_181" > OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) > OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) >Reporter: Gary Yao >Priority: Major > Fix For: 1.8.0 > > Attachments: loop_test.patch > > > Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a > loop may trigger the HTTP server to send a response with {{Content-Type: > application/octet-stream}}, which causes the test to fail. The expected > {{Content-Type}} is {{application/json}}. Note that the REST handler used for > testing, can only return json responses. The failure can likely be triggered > for other tests inside {{RestServerEndpointITCase}} as well. 
> To run the test in a loop, apply the git patch in the attachment, and execute > the following command: > {code} > mvn clean integration-test -pl flink-runtime -am > -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false > -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir > -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties > {code} > After a while you may see the following stacktrace in the test's log file: > {noformat} > > 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase >- > > Test > testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) > is running. > > 15:25:45,620 WARN > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Upload directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > does not exist, or has been deleted externally. Previously uploaded files > are no longer available. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Created directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > for file uploads. > 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient >- Rest client endpoint started. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Starting rest endpoint. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Rest endpoint listening at localhost:52841 > 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient >- Response was not valid JSON. 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: > Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, > \t) is allowed between tokens > at [Source: > org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; > line: 1, column: 2] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(U
[jira] [Updated] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
[ https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10585: - Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) was: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) > RestServerEndpoint responds with wrong Content-Type in Integration Test. > > > Key: FLINK-10585 > URL: https://issues.apache.org/jira/browse/FLINK-10585 > Project: Flink > Issue Type: Bug > Components: REST, Tests >Affects Versions: 1.6.1, 1.7.0 > Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 > *OS:* macOS High Sierra 10.13.6 (17G65) > *Maven:* 3.2.5 > *Java:* > openjdk version "1.8.0_181" > OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) > OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) >Reporter: Gary Yao >Priority: Major > Fix For: 1.8.0 > > Attachments: loop_test.patch > > > Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a > loop may trigger the HTTP server to send a response with {{Content-Type: > application/octet-stream}}, which causes the test to fail. The expected > {{Content-Type}} is {{application/json}}. Note that the REST handler used for > testing, can only return json responses. The failure can likely be triggered > for other tests inside {{RestServerEndpointITCase}} as well. 
> To run the test in a loop, apply the git patch in the attachment, and execute > the following command: > {code} > mvn clean integration-test -pl flink-runtime -am > -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false > -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir > -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties > {code} > After a while you may see the following stacktrace in the test's log file: > {noformat} > > 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase >- > > Test > testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) > is running. > > 15:25:45,620 WARN > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Upload directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > does not exist, or has been deleted externally. Previously uploaded files > are no longer available. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Created directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > for file uploads. > 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient >- Rest client endpoint started. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Starting rest endpoint. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Rest endpoint listening at localhost:52841 > 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient >- Response was not valid JSON. 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: > Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, > \t) is allowed between tokens > at [Source: > org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; > line: 1, column: 2] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(U
[jira] [Created] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
Gary Yao created FLINK-10585: Summary: RestServerEndpoint responds with wrong Content-Type in Integration Test. Key: FLINK-10585 URL: https://issues.apache.org/jira/browse/FLINK-10585 Project: Flink Issue Type: Bug Components: REST, Tests Affects Versions: 1.6.1, 1.7.0 Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 *OS:* macOS High Sierra 10.13.6 (17G65) *Maven:* 3.2.5 *Java:* openjdk version "1.8.0_181" OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) Reporter: Gary Yao Fix For: 1.8.0 Attachments: loop_test.patch Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a loop may trigger the HTTP server to send a response with {{Content-Type: application/octet-stream}}, which causes the test to fail. The expected {{Content-Type}} is {{application/json}}. Note that the REST handler used for testing, can only return json responses. The failure can likely be triggered for other tests inside {{RestServerEndpointITCase}} as well. To run the test in a loop, apply the git patch in the attachment, and execute the following command: {code} mvn clean integration-test -pl flink-runtime -am -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties {code} After a while you may see the following stacktrace in the test's log file: {noformat} 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase - Test testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) is running. 15:25:45,620 WARN org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Upload directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload does not exist, or has been deleted externally. 
Previously uploaded files are no longer available. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Created directory /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload for file uploads. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Starting rest endpoint. 15:25:45,620 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint - Rest endpoint listening at localhost:52841 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient - Response was not valid JSON. org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; line: 1, column: 2] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2891) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:707) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792) at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272) at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:505) at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:453) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at org.apache.flink.shaded.netty4.io.netty.channel.Abs
[jira] [Commented] (FLINK-10578) Support writable state corresponding to queryable state
[ https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653587#comment-16653587 ] Kostas Kloudas commented on FLINK-10578: Thanks a lot [~wind_ljy]! > Support writable state corresponding to queryable state > --- > > Key: FLINK-10578 > URL: https://issues.apache.org/jira/browse/FLINK-10578 > Project: Flink > Issue Type: New Feature > Components: Queryable State, State Backends, Checkpointing >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > I can see that now we support queryable state, why don't we also support the > writable state? I've already done this on our own business to initialize the > state in window during the program's first run. I think other companies may > have other needs to modify the state from outside the job. > The synchronization is a problem but from my perspective, very few objects > will be affected because we can directly operate on the stateTable(for the > FsStateBackend), and about the performance, I think it'll be okay if users > know about it. > I'm not very aware of what will be affected except what I said above, please > let me know more if anyone has doubt about this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10564) tm costs too much time when communicating with jm
[ https://issues.apache.org/jira/browse/FLINK-10564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653584#comment-16653584 ] Till Rohrmann commented on FLINK-10564: --- Thanks for reporting this issue [~chenlf]. It looks as if the JM sometimes does not answer before the {{akka.ask.timeout}} or {{akka.client.timeout}} has been reached. This could be caused by a busy {{JobManager}}. I would try to increase these timeouts and see whether the problem still persists. In general, the Flink community no longer actively supports the version {{< 1.5.x}}. Therefore, I would recommend considering to upgrade to a new Flink version, eventually. > tm costs too much time when communicating with jm > -- > > Key: FLINK-10564 > URL: https://issues.apache.org/jira/browse/FLINK-10564 > Project: Flink > Issue Type: Bug > Components: Core, JobManager, TaskManager > Environment: configs are following: > jm > high-availability zookeeper > taskmanager.heap.mb 16384 > taskmanager.memory.preallocatefalse > taskmanager.numberOfTaskSlots 64 > tm > slots 128 > free slots 0-128 > cpu core 40 > Physical Memory 95gb > free Memory 32gb-50gb > Flink Managed Memory 22gb-35gb >Reporter: chenlf >Priority: Major > Attachments: timeout.log > > > it works fine until the number of tasks is above about 400. > There are 600+ tasks(each task handles billion data) running in our cluster > now,and the problem is it costs too much time (even time out)when > submitting/canceling/querying a task. > Resources like memory,cpu are on normal level. > after debugging,we found this method is the culprit: > > org.apache.flink.runtime.util.LeaderRetrievalUtils.LeaderGatewayListener.notifyLeaderAddress(String, > UUID) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10578) Support writable state corresponding to queryable state
[ https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liao closed FLINK-10578. -- Resolution: Not A Problem > Support writable state corresponding to queryable state > --- > > Key: FLINK-10578 > URL: https://issues.apache.org/jira/browse/FLINK-10578 > Project: Flink > Issue Type: New Feature > Components: Queryable State, State Backends, Checkpointing >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > I can see that now we support queryable state, why don't we also support the > writable state? I've already done this on our own business to initialize the > state in window during the program's first run. I think other companies may > have other needs to modify the state from outside the job. > The synchronization is a problem but from my perspective, very few objects > will be affected because we can directly operate on the stateTable(for the > FsStateBackend), and about the performance, I think it'll be okay if users > know about it. > I'm not very aware of what will be affected except what I said above, please > let me know more if anyone has doubt about this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10578) Support writable state corresponding to queryable state
[ https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653581#comment-16653581 ] Jiayi Liao commented on FLINK-10578: [~kkl0u] Thanks for pointing out the risks and problems. I've read the proposal before and it's exactly what I need. So at first, I tried to implement the operations on savepoints of FsStateBackend in bravo, but the effort is too big because of the FsStateBackend's internal code structure, and as the mails said, bravo should live as part of the Flink project to make deeper integration possible in the long run. That's why I give up using bravo and try to solve it in another way. What you said about breaking the fault-tolerance guarantees is really a problem that I didn't think about too much before, I'll close this and maybe reopen it if anyone really needs this in the future. > Support writable state corresponding to queryable state > --- > > Key: FLINK-10578 > URL: https://issues.apache.org/jira/browse/FLINK-10578 > Project: Flink > Issue Type: New Feature > Components: Queryable State, State Backends, Checkpointing >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > I can see that now we support queryable state, why don't we also support the > writable state? I've already done this on our own business to initialize the > state in window during the program's first run. I think other companies may > have other needs to modify the state from outside the job. > The synchronization is a problem but from my perspective, very few objects > will be affected because we can directly operate on the stateTable(for the > FsStateBackend), and about the performance, I think it'll be okay if users > know about it. > I'm not very aware of what will be affected except what I said above, please > let me know more if anyone has doubt about this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10584) Add support for state retention to the Event Time versioned joins.
Kostas Kloudas created FLINK-10584: -- Summary: Add support for state retention to the Event Time versioned joins. Key: FLINK-10584 URL: https://issues.apache.org/jira/browse/FLINK-10584 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.7.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.7.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
Kostas Kloudas created FLINK-10583: -- Summary: Add support for state retention to the Processing Time versioned joins. Key: FLINK-10583 URL: https://issues.apache.org/jira/browse/FLINK-10583 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.7.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.7.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10578) Support writable state queryable state
[ https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liao updated FLINK-10578: --- Summary: Support writable state queryable state (was: Support writable state in QueryableStateClient) > Support writable state queryable state > --- > > Key: FLINK-10578 > URL: https://issues.apache.org/jira/browse/FLINK-10578 > Project: Flink > Issue Type: New Feature > Components: Queryable State, State Backends, Checkpointing >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > I can see that now we support queryable state, why don't we also support the > writable state? I've already done this on our own business to initialize the > state in window during the program's first run. I think other companies may > have other needs to modify the state from outside the job. > The synchronization is a problem but from my perspective, very few objects > will be affected because we can directly operate on the stateTable(for the > FsStateBackend), and about the performance, I think it'll be okay if users > know about it. > I'm not very aware of what will be affected except what I said above, please > let me know more if anyone has doubt about this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10578) Support writable state corresponding to queryable state
[ https://issues.apache.org/jira/browse/FLINK-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liao updated FLINK-10578: --- Summary: Support writable state corresponding to queryable state (was: Support writable state queryable state) > Support writable state corresponding to queryable state > --- > > Key: FLINK-10578 > URL: https://issues.apache.org/jira/browse/FLINK-10578 > Project: Flink > Issue Type: New Feature > Components: Queryable State, State Backends, Checkpointing >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > I can see that now we support queryable state, why don't we also support the > writable state? I've already done this on our own business to initialize the > state in window during the program's first run. I think other companies may > have other needs to modify the state from outside the job. > The synchronization is a problem but from my perspective, very few objects > will be affected because we can directly operate on the stateTable(for the > FsStateBackend), and about the performance, I think it'll be okay if users > know about it. > I'm not very aware of what will be affected except what I said above, please > let me know more if anyone has doubt about this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6857: [hotfix] [table] Refactor SqlToConverter configuration
fhueske commented on issue #6857: [hotfix] [table] Refactor SqlToConverter configuration URL: https://github.com/apache/flink/pull/6857#issuecomment-430625614 +1 to merge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10253) Run MetricQueryService with lower priority
[ https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653532#comment-16653532 ] ASF GitHub Bot commented on FLINK-10253: asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with lower priority URL: https://github.com/apache/flink/pull/6839 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 5c073f3ad59..420bb7f1da2 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -12,6 +12,11 @@ "0" The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port. + +metrics.internal.query-service.thread-priority +1 +The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down. 
+ metrics.latency.granularity "operator" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 0e7268ee052..0785b347335 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -156,6 +156,18 @@ "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + "Flink will pick a random port."); + /** +* The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the +* {@code 10} means the max priority. +*/ + public static final ConfigOption QUERY_SERVICE_THREAD_PRIORITY = + key("metrics.internal.query-service.thread-priority") + .defaultValue(1) + .withDescription("The thread priority used for Flink's internal metric query service. The thread is created" + + " by Akka's thread pool executor. " + + "The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). 
" + + "Warning, increasing this value may bring the main Flink components down."); + private MetricOptions() { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b61737d20..430af98bc2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio case FORK_JOIN_EXECUTOR: return AkkaUtils.getForkJoinExecutorConfig(configuration); case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); + return AkkaUtils.getThreadPoolExecutorConfig( + configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY)); default: throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); } diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala new file mode 100644 index 000..d6f6d76ec51 --- /dev/null +++ b/flink-r
[GitHub] asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with lower priority
asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with lower priority URL: https://github.com/apache/flink/pull/6839 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 5c073f3ad59..420bb7f1da2 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -12,6 +12,11 @@ "0" The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port. + +metrics.internal.query-service.thread-priority +1 +The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down. + metrics.latency.granularity "operator" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 0e7268ee052..0785b347335 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -156,6 +156,18 @@ "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + "Flink will pick a random port."); + /** +* The thread priority for Flink's internal metric query service. 
The {@code 1} means the min priority and the +* {@code 10} means the max priority. +*/ + public static final ConfigOption QUERY_SERVICE_THREAD_PRIORITY = + key("metrics.internal.query-service.thread-priority") + .defaultValue(1) + .withDescription("The thread priority used for Flink's internal metric query service. The thread is created" + + " by Akka's thread pool executor. " + + "The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " + + "Warning, increasing this value may bring the main Flink components down."); + private MetricOptions() { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b61737d20..430af98bc2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio case FORK_JOIN_EXECUTOR: return AkkaUtils.getForkJoinExecutorConfig(configuration); case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); + return AkkaUtils.getThreadPoolExecutorConfig( + configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY)); default: throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); } diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala new file mode 100644 index 00000000000..d6f6d76ec51 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information
[jira] [Closed] (FLINK-10253) Run MetricQueryService with lower priority
[ https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10253. - Resolution: Fixed Fix Version/s: (was: 1.5.6) (was: 1.6.3) Fixed in 1.7.0 via: f81297ac224f45d750ee0616755ade809b57ea4a 39324331979cdedb9832bf06408bb182fc9476fa > Run MetricQueryService with lower priority > -- > > Key: FLINK-10253 > URL: https://issues.apache.org/jira/browse/FLINK-10253 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > We should run the {{MetricQueryService}} with a lower priority than the main > Flink components. An idea would be to start the underlying threads with a > lower priority. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann opened a new pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable
tillrohrmann opened a new pull request #6870: [BP-1.5][FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6870 Backport of #6868 to `release-1.5`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann opened a new pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable
tillrohrmann opened a new pull request #6869: [BP-1.6][FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6869 Backport of #6868 to `release-1.6`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653515#comment-16653515 ] ASF GitHub Bot commented on FLINK-10582: tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868 ## What is the purpose of the change Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") to configure the thread priority of the REST executor's threads. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make REST executor thread priority configurable > --- > > Key: FLINK-10582 > URL: https://issues.apache.org/jira/browse/FLINK-10582 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-10282, we introduced a dedicated thread pool for the REST server > endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. 
This, > however, might affect existing users by making some of the REST calls no > longer responsive (e.g. if the other components' threads take all the time). > Therefore, I propose to set the default thread priority to > {{Thread.NORM_PRIORITY}} and make it additionally configurable such that > users can change it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10582) Make REST executor thread priority configurable
[ https://issues.apache.org/jira/browse/FLINK-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10582: --- Labels: pull-request-available (was: ) > Make REST executor thread priority configurable > --- > > Key: FLINK-10582 > URL: https://issues.apache.org/jira/browse/FLINK-10582 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-10282, we introduced a dedicated thread pool for the REST server > endpoint. The thread priority was set to {{Thread.MIN_PRIORITY}}. This, > however, might affect existing users by making some of the REST calls no > longer responsive (e.g. if the other components' threads take all the time). > Therefore, I propose to set the default thread priority to > {{Thread.NORM_PRIORITY}} and make it additionally configurable such that > users can change it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable
tillrohrmann opened a new pull request #6868: [FLINK-10582] Make REST executor's thread priority configurable URL: https://github.com/apache/flink/pull/6868 ## What is the purpose of the change Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") to configure the thread priority of the REST executor's threads. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10537) Network throughput performance regression after broadcast changes
[ https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653494#comment-16653494 ] ASF GitHub Bot commented on FLINK-10537: pnowojski closed pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913] URL: https://github.com/apache/flink/pull/6833 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java index 7738a3091e1..01feae03380 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java @@ -108,6 +108,7 @@ public int length() { } public void pruneBuffer() { + clear(); if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { if (LOG.isDebugEnabled()) { LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 6eebbbe88eb..c0cf35d9576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -82,9 +82,9 @@ public boolean isFullBuffer() { SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder); /** -* Checks to decrease the size of intermediate data serialization buffer after finishing the -* whole serialization 
process including {@link #serializeRecord(IOReadableWritable)} and -* {@link #copyToBufferBuilder(BufferBuilder)}. +* Clears the buffer and checks to decrease the size of intermediate data serialization buffer +* after finishing the whole serialization process including +* {@link #serializeRecord(IOReadableWritable)} and {@link #copyToBufferBuilder(BufferBuilder)}. */ void prune(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index ba2ed0133fd..f0666791ec2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -92,20 +92,9 @@ public void serializeRecord(T record) throws IOException { */ @Override public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) { - boolean mustCommit = false; - if (lengthBuffer.hasRemaining()) { - targetBuffer.append(lengthBuffer); - mustCommit = true; - } - - if (dataBuffer.hasRemaining()) { - targetBuffer.append(dataBuffer); - mustCommit = true; - } - - if (mustCommit) { - targetBuffer.commit(); - } + targetBuffer.append(lengthBuffer); + targetBuffer.append(dataBuffer); + targetBuffer.commit(); return getSerializationResult(targetBuffer); } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Network throughput performance regression after broadcast changes > - > > Key: FLINK-10537 > URL: https://issues.apache.org/jira/browse/FLINK-10537 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.7.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > There is a slight network throughput regression introduced in: > https://issues.apache.org/jira/b
[GitHub] pnowojski closed pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913]
pnowojski closed pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913] URL: https://github.com/apache/flink/pull/6833 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java index 7738a3091e1..01feae03380 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java @@ -108,6 +108,7 @@ public int length() { } public void pruneBuffer() { + clear(); if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { if (LOG.isDebugEnabled()) { LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 6eebbbe88eb..c0cf35d9576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -82,9 +82,9 @@ public boolean isFullBuffer() { SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder); /** -* Checks to decrease the size of intermediate data serialization buffer after finishing the -* whole serialization process including {@link #serializeRecord(IOReadableWritable)} and -* {@link #copyToBufferBuilder(BufferBuilder)}. 
+* Clears the buffer and checks to decrease the size of intermediate data serialization buffer +* after finishing the whole serialization process including +* {@link #serializeRecord(IOReadableWritable)} and {@link #copyToBufferBuilder(BufferBuilder)}. */ void prune(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index ba2ed0133fd..f0666791ec2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -92,20 +92,9 @@ public void serializeRecord(T record) throws IOException { */ @Override public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) { - boolean mustCommit = false; - if (lengthBuffer.hasRemaining()) { - targetBuffer.append(lengthBuffer); - mustCommit = true; - } - - if (dataBuffer.hasRemaining()) { - targetBuffer.append(dataBuffer); - mustCommit = true; - } - - if (mustCommit) { - targetBuffer.commit(); - } + targetBuffer.append(lengthBuffer); + targetBuffer.append(dataBuffer); + targetBuffer.commit(); return getSerializationResult(targetBuffer); } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services