[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-35088:
-------------------------------

    Assignee: elon_X

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-35088
>                 URL: https://issues.apache.org/jira/browse/FLINK-35088
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core, Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Assignee: elon_X
>            Priority: Major
>         Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1.I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying the consumption of the 
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark 
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely 
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2.The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) 
> ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>  [?:1.8.0_351]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_351]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_351]
>       at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]{code}
> Therefore, I believe it's necessary to validate these two parameters to 
> ensure that exceptions are thrown on the client side to alert the user.
> !image-2024-04-11-20-12-29-951.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to