[jira] [Created] (FLINK-33855) send flag message like checkpoint barrier
Spongebob created FLINK-33855: - Summary: send flag message like checkpoint barrier Key: FLINK-33855 URL: https://issues.apache.org/jira/browse/FLINK-33855 Project: Flink Issue Type: Technical Debt Components: Table SQL / API Reporter: Spongebob I use flinksql to consume kafka message. Now I want to send one specific message to that kafka topic, and I hope the sink table that use user defined connector can receive this specific message. maybe it sounds like the barrier to trigger checkpoints. Is there anyway to solve this debt ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32970) could not load external class using "-C" option
[ https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-32970. - Resolution: Won't Fix > could not load external class using "-C" option > --- > > Key: FLINK-32970 > URL: https://issues.apache.org/jira/browse/FLINK-32970 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.6 >Reporter: Spongebob >Priority: Major > > Firstly, the "connectors.jar" contains "test-connector" and I put it in user > libs. > Then, I started a tableEvironment in one operator function of > streamExecutionEnvironment. > In the tableEvironment I declared a table using the "test-connector". > Finally, I run the application and load the "connectors.jar" using "-C > connectors.jar", when the table's creation statement was executed, I got an > class not found exception which like below(please notice that if I put the > "connectors.jar" in flink lib, the application would run normally): > {code:java} > SLF4J: Found binding in > [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: > Found binding in > [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: > See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation.SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129) > at > org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92) >at > com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58) >at > com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60) > at > com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103) >at > com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) >at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at >
[jira] [Commented] (FLINK-32970) could not load external class using "-C" option
[ https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760800#comment-17760800 ] Spongebob commented on FLINK-32970: --- Hi [~martijnvisser] , I think I should not create another Environment in operation functions. Because that would lauch a local Environment. > could not load external class using "-C" option > --- > > Key: FLINK-32970 > URL: https://issues.apache.org/jira/browse/FLINK-32970 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.6 >Reporter: Spongebob >Priority: Major > > Firstly, the "connectors.jar" contains "test-connector" and I put it in user > libs. > Then, I started a tableEvironment in one operator function of > streamExecutionEnvironment. > In the tableEvironment I declared a table using the "test-connector". > Finally, I run the application and load the "connectors.jar" using "-C > connectors.jar", when the table's creation statement was executed, I got an > class not found exception which like below(please notice that if I put the > "connectors.jar" in flink lib, the application would run normally): > {code:java} > SLF4J: Found binding in > [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: > Found binding in > [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: > See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation.SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129) > at > org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92) >at > com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58) >at > com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60) > at > com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103) >at > com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) >at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693) >at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at
[jira] [Updated] (FLINK-32970) could not load external class using "-C" option
[ https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32970: -- Description: Firstly, the "connectors.jar" contains "test-connector" and I put it in user libs. Then, I started a tableEvironment in one operator function of streamExecutionEnvironment. In the tableEvironment I declared a table using the "test-connector". Finally, I run the application and load the "connectors.jar" using "-C connectors.jar", when the table's creation statement was executed, I got an class not found exception which like below(please notice that if I put the "connectors.jar" in flink lib, the application would run normally): {code:java} SLF4J: Found binding in [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129) at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92) at com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58) at com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60) at com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103) at com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at
[jira] [Created] (FLINK-32970) could not load external class using "-C" option
Spongebob created FLINK-32970: - Summary: could not load external class using "-C" option Key: FLINK-32970 URL: https://issues.apache.org/jira/browse/FLINK-32970 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.6 Reporter: Spongebob Firstly, the "connectors.jar" contains "test-connector" and I put it in user libs. Then, I started a tableEvironment in one operator function of streamExecutionEnvironment. In the tableEvironment I declared a table using the "test-connector". Finally, when the table's creation statement was executed, I got an class not found exception which like below(please notice that if I put the "connectors.jar" in flink lib, the application would run normally): {code:java} SLF4J: Found binding in [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129) at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92) at com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58) at com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60) at com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103) at com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at
[jira] [Closed] (FLINK-32361) error after replace dependent jar file
[ https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-32361. - Resolution: Cannot Reproduce > error after replace dependent jar file > -- > > Key: FLINK-32361 > URL: https://issues.apache.org/jira/browse/FLINK-32361 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > > in the standalone session mode. I have one dependent jar file named 'A.jar' > in the folder `lib1`, so I submit my app via command `flink run -C > file:///lib1/A.jar -c Application ./myApp.jar`. well it runs normally. > And, I have the same jar file named 'A.jar' in the folder `lib2` also which > was copied from `lib1`. then I delete A.jar in `lib1`, copy the same jar from > `lib2` to `lib1`, re-submit the application. Finally I would get an > ClassNotFoundException which class refer to A.jar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-32739. - Resolution: Not A Problem > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my flink standalone cluster, I config 2 taskmanagers. Then I test with > following steps: > # submit streaming job which was configed to fixed restart strategy to flink > session environment > # this job was running on taskmanager1. Then I killed the taskmanager1. > # this job turned to be failed after restarting attemps. > this job could not be transported to taskmanager2 which had enough slots as > expected. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) > at >
[jira] [Commented] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750937#comment-17750937 ] Spongebob commented on FLINK-32739: --- Thanks [~JunRuiLi] , this option helps. When one of regions restart, it will be restarted in the original job or new job ? > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my flink standalone cluster, I config 2 taskmanagers. Then I test with > following steps: > # submit streaming job which was configed to fixed restart strategy to flink > session environment > # this job was running on taskmanager1. Then I killed the taskmanager1. > # this job turned to be failed after restarting attemps. > this job could not be transported to taskmanager2 which had enough slots as > expected. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) > at >
[jira] [Comment Edited] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648 ] Spongebob edited comment on FLINK-32739 at 8/3/23 9:44 AM: --- [~JunRuiLi] Hi Junrui, I had uploaded the JM partitial logs relative to the failed job which jobid is `15ffcdad572bd98fd74ffd45848e84cb`. The Error stack was in the issue description. And I found some relative logs in the other taskmanger: {code:java} 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 109179dd9cefa1f4b109b8f726d0e0f7 for job 15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id . 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 109179dd9cefa1f4b109b8f726d0e0f7. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@192.168.0.11:6123/user/rpc/jobmanager_3 with leader id ----. 2023-08-03 17:21:41,621 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@192.168.0.11:6123/user/rpc/jobmanager_3 for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,626 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,627 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), taskOffHeapMemory=0 bytes, managedMemory=245.760mb (257698041 bytes), networkMemory=61.440mb (64424510 bytes)}, allocationId: 109179dd9cefa1f4b109b8f726d0e0f7, jobId: 15ffcdad572bd98fd74ffd45848e84cb). 2023-08-03 17:21:41,629 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 15ffcdad572bd98fd74ffd45848e84cb from job leader monitoring. 2023-08-03 17:21:41,629 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb.{code} was (Author: spongebobz): [~JunRuiLi] Hi Junrui, I had uploaded the JM partitial logs relative to the failed job which jobid is `15ffcdad572bd98fd74ffd45848e84cb`. The Error stack was in the issue description. And I found some relative logs in the other taskmanger: {code:java} 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 109179dd9cefa1f4b109b8f726d0e0f7 for job 15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id . 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 109179dd9cefa1f4b109b8f726d0e0f7. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 with leader id ----. 2023-08-03 17:21:41,621 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,626 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,627 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), taskOffHeapMemory=0 bytes,
[jira] [Comment Edited] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648 ] Spongebob edited comment on FLINK-32739 at 8/3/23 9:44 AM: --- [~JunRuiLi] Hi Junrui, I had uploaded the JM partitial logs relative to the failed job which jobid is `15ffcdad572bd98fd74ffd45848e84cb`. The Error stack was in the issue description. And I found some relative logs in the other taskmanger: {code:java} 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 109179dd9cefa1f4b109b8f726d0e0f7 for job 15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id . 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 109179dd9cefa1f4b109b8f726d0e0f7. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring. 2023-08-03 17:21:41,618 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 with leader id ----. 2023-08-03 17:21:41,621 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,625 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,626 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb. 2023-08-03 17:21:41,627 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), taskOffHeapMemory=0 bytes, managedMemory=245.760mb (257698041 bytes), networkMemory=61.440mb (64424510 bytes)}, allocationId: 109179dd9cefa1f4b109b8f726d0e0f7, jobId: 15ffcdad572bd98fd74ffd45848e84cb). 2023-08-03 17:21:41,629 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 15ffcdad572bd98fd74ffd45848e84cb from job leader monitoring. 2023-08-03 17:21:41,629 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb. {code} was (Author: spongebobz): [~JunRuiLi] Hi Junrui, I had uploaded the JM partitial logs relative to the failed job which jobid is `15ffcdad572bd98fd74ffd45848e84cb`. The Error stack was in the issue description. > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my flink standalone cluster, I config 2 taskmanagers. Then I test with > following steps: > # submit streaming job which was configed to fixed restart strategy to flink > session environment > # this job was running on taskmanager1. Then I killed the taskmanager1. > # this job turned to be failed after restarting attemps. > this job could not be transported to taskmanager2 which had enough slots as > expected. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at >
[jira] [Commented] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648 ] Spongebob commented on FLINK-32739: --- [~JunRuiLi] Hi Junrui, I had uploaded the JM partitial logs relative to the failed job which jobid is `15ffcdad572bd98fd74ffd45848e84cb`. The Error stack was in the issue description. > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my flink standalone cluster, I config 2 taskmanagers. Then I test with > following steps: > # submit streaming job which was configed to fixed restart strategy to flink > session environment > # this job was running on taskmanager1. Then I killed the taskmanager1. > # this job turned to be failed after restarting attemps. > this job could not be transported to taskmanager2 which had enough slots as > expected. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at >
[jira] [Updated] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32739: -- Attachment: jobmanager.log > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my flink standalone cluster, I config 2 taskmanagers. Then I test with > following steps: > # submit streaming job which was configed to fixed restart strategy to flink > session environment > # this job was running on taskmanager1. Then I killed the taskmanager1. > # this job turned to be failed after restarting attemps. > this job could not be transported to taskmanager2 which had enough slots as > expected. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) > at >
[jira] [Updated] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32739: -- Description: In my flink standalone cluster, I config 2 taskmanagers. Then I test with following steps: # submit streaming job which was configed to fixed restart strategy to flink session environment # this job was running on taskmanager1. Then I killed the taskmanager1. # this job turned to be failed after restarting attemps. this job could not be transported to taskmanager2 which had enough slots as expected. Here's the exception trace: {code:java} 2023-08-03 15:13:56 org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) at
[jira] [Updated] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32739: -- Description: In my flink standalone cluster, I config 2 taskmanagers. Then I test with following steps: # submit streaming job which was configed to fixed restart strategy to flink session environment # this job was running on taskmanager1. Then I killed the taskmanager1. # this job turned to be failed after restarting attemps. this job could not be transported to taskmanager2 as expected. Here's the exception trace: {code:java} 2023-08-03 15:13:56 org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) at
[jira] [Created] (FLINK-32739) restarting not work
Spongebob created FLINK-32739: - Summary: restarting not work Key: FLINK-32739 URL: https://issues.apache.org/jira/browse/FLINK-32739 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.5 Reporter: Spongebob In my flink standalone cluster, I config 2 taskmanagers. Then I test with following steps: # submit streaming job which was configed to fixed restart strategy to flink session environment # this job was running on taskmanager1. Then I killed the taskmanager1. # this job turned to be failed after restarting attemps. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-24837) submit flink job failed via restapi
[ https://issues.apache.org/jira/browse/FLINK-24837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-24837. - Resolution: Won't Fix > submit flink job failed via restapi > --- > > Key: FLINK-24837 > URL: https://issues.apache.org/jira/browse/FLINK-24837 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.12.4 >Reporter: Spongebob >Priority: Major > > I tried to submit flink job via flink restapi but got exception, I used > `await` function in my job so that it would submit multiple jobs. Below is > the exception detail. > {code:java} > // > "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not > execute > application.\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)\n\tatjava.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tatjava.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tatjava.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)\n\tatjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tatjava.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tatjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tatjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tatjava.lang.Thread.run(Thread.java:748)\nCaused > by: > java.util.concurrent.CompletionException:org.apache.flink.util.FlinkRuntimeException: > Could not execute > application.\n\tatjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tatjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)\n\t... > 7 more\nCaused by:org.apache.flink.util.FlinkRuntimeException: Could not > execute > application.\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t... > 7 more\nCaused > by:org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error:org.apache.flink.table.api.TableException: Failed to > wait job > finish\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)\n\tatorg.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)\n\tatorg.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)\n\t...10 > more\nCaused by: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to waitjob finish\n\tat > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tatjava.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)\n\tatcom.xctech.cone.flink.migrate.batch.BatchCone.main(BatchCone.java:238)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke0(Native > > Method)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tatsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tatjava.lang.reflect.Method.invoke(Method.java:498)\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)\n\t... > 13 more\nCaused by:org.apache.flink.table.api.TableException: Failed to wait > job >
[jira] [Reopened] (FLINK-32361) error after replace dependent jar file
[ https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob reopened FLINK-32361: --- Hi [~Adhip] yes I think this is a bug so I report this issue here. > error after replace dependent jar file > -- > > Key: FLINK-32361 > URL: https://issues.apache.org/jira/browse/FLINK-32361 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > > in the standalone session mode. I have one dependent jar file named 'A.jar' > in the folder `lib1`, so I submit my app via command `flink run -C > file:///lib1/A.jar -c Application ./myApp.jar`. well it runs normally. > And, I have the same jar file named 'A.jar' in the folder `lib2` also which > was copied from `lib1`. then I delete A.jar in `lib1`, copy the same jar from > `lib2` to `lib1`, re-submit the application. Finally I would get an > ClassNotFoundException which class refer to A.jar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32361) error after replace dependent jar file
[ https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733980#comment-17733980 ] Spongebob edited comment on FLINK-32361 at 6/19/23 1:57 AM: Hi [~mapohl] yes I think this is a bug so I report this issue here. was (Author: spongebobz): Hi [~Adhip] yes I think this is a bug so I report this issue here. > error after replace dependent jar file > -- > > Key: FLINK-32361 > URL: https://issues.apache.org/jira/browse/FLINK-32361 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > > in the standalone session mode. I have one dependent jar file named 'A.jar' > in the folder `lib1`, so I submit my app via command `flink run -C > file:///lib1/A.jar -c Application ./myApp.jar`. well it runs normally. > And, I have the same jar file named 'A.jar' in the folder `lib2` also which > was copied from `lib1`. then I delete A.jar in `lib1`, copy the same jar from > `lib2` to `lib1`, re-submit the application. Finally I would get an > ClassNotFoundException which class refer to A.jar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32361) error after replace dependent jar file
Spongebob created FLINK-32361: - Summary: error after replace dependent jar file Key: FLINK-32361 URL: https://issues.apache.org/jira/browse/FLINK-32361 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.5 Reporter: Spongebob in the standalone session mode. I have one dependent jar file named 'A.jar' in the folder `lib1`, so I submit my app via command `flink run -C file:///lib1/A.jar -c Application ./myApp.jar`. well it runs normally. And, I have the same jar file named 'A.jar' in the folder `lib2` also which was copied from `lib1`. then I delete A.jar in `lib1`, copy the same jar from `lib2` to `lib1`, re-submit the application. Finally I would get an ClassNotFoundException which class refer to A.jar. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32308) RestClusterClient submit job to remote cluster
[ https://issues.apache.org/jira/browse/FLINK-32308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32308: -- Affects Version/s: 1.14.5 > RestClusterClient submit job to remote cluster > -- > > Key: FLINK-32308 > URL: https://issues.apache.org/jira/browse/FLINK-32308 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Not a Priority > > I just used `RestClusterClient` submit job to remote cluster, but out of my > expectation it submitted to local cluster instead. Could you help with me > {code:java} > String host = "x.x.x.x"; > int port = 8081; > Configuration flinkConfiguration = new Configuration(); > flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); > flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123); > flinkConfiguration.setInteger(RestOptions.PORT, port); > RestClusterClient clusterClient = new > RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance()); > String s = clusterClient.getWebInterfaceURL(); > List extraJars = new ArrayList<>(); > extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL()); > PackagedProgram packagedProgram = PackagedProgram.newBuilder() > .setConfiguration(flinkConfiguration) > .setJarFile(new File("F:\\data.jar")) > .setEntryPointClassName("MyApplication") > .setUserClassPaths(extraJars) > .build(); > JobID jobID = JobID.generate(); > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, > flinkConfiguration, 2, jobID, false); > clusterClient.submitJob(jobGraph); > System.out.println(jobID); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32308) RestClusterClient submit job to remote cluster
Spongebob created FLINK-32308: - Summary: RestClusterClient submit job to remote cluster Key: FLINK-32308 URL: https://issues.apache.org/jira/browse/FLINK-32308 Project: Flink Issue Type: Bug Components: Runtime / REST Reporter: Spongebob I just used `RestClusterClient` submit job to remote cluster, but out of my expectation it submitted to local cluster instead. Could you help with me {code:java} String host = "x.x.x.x"; int port = 8081; Configuration flinkConfiguration = new Configuration(); flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123); flinkConfiguration.setInteger(RestOptions.PORT, port); RestClusterClient clusterClient = new RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance()); String s = clusterClient.getWebInterfaceURL(); List extraJars = new ArrayList<>(); extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL()); PackagedProgram packagedProgram = PackagedProgram.newBuilder() .setConfiguration(flinkConfiguration) .setJarFile(new File("F:\\data.jar")) .setEntryPointClassName("MyApplication") .setUserClassPaths(extraJars) .build(); JobID jobID = JobID.generate(); JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConfiguration, 2, jobID, false); clusterClient.submitJob(jobGraph); System.out.println(jobID); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32274) kafka class could not be found while using appliction mode.
[ https://issues.apache.org/jira/browse/FLINK-32274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-32274. - Release Note: try to replace `org.apache.kafka.common.serialization.StringSerializer` with `Class.forName("org.apache.kafka.common.serialization.StringSerializer")` Resolution: Invalid > kafka class could not be found while using appliction mode. > --- > > Key: FLINK-32274 > URL: https://issues.apache.org/jira/browse/FLINK-32274 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > > While using yarn-application mode to submit my app, it would throw an > exception: `org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.StringDeserializer for configuration > key.deserializer: Class > org.apache.kafka.common.serialization.StringDeserializer could not be found.` > I had defined KafkaProducer variation in my client code. And I think the > kafka-clients dependency was already uploaded while submitting the app. > But I found it is ok in per-job mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32274) kafka class could not be found while using appliction mode.
Spongebob created FLINK-32274: - Summary: kafka class could not be found while using appliction mode. Key: FLINK-32274 URL: https://issues.apache.org/jira/browse/FLINK-32274 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.14.5 Reporter: Spongebob While using yarn-application mode to submit my app, it would throw an exception: `org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.StringDeserializer could not be found.` I had defined KafkaProducer variation in my client code. And I think the kafka-clients dependency was already uploaded while submitting the app. But I found it is ok in per-job mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722121#comment-17722121 ] Spongebob edited comment on FLINK-32065 at 5/12/23 11:15 AM: - Hi [~Thesharing] Does it likelihood due to the incorrect starting of the standalone cluster? Such as secondary starting the cluster before it is stopped. was (Author: spongebobz): Hi [~Thesharing] Does it likelihood due to the incorrect starting of the standalone cluster? Such as secondary starting the cluster before stopping it. > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722121#comment-17722121 ] Spongebob commented on FLINK-32065: --- Hi [~Thesharing] Does it likelihood due to the incorrect starting of the standalone cluster? Such as secondary starting the cluster before stopping it. > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722094#comment-17722094 ] Spongebob commented on FLINK-32065: --- Hi, [~Thesharing] , may be I could provide these logs for you. !image-2023-05-12-17-37-09-002.png! > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32065: -- Attachment: image-2023-05-12-17-37-09-002.png > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32065: -- Description: When I submit an application to flink standalone cluster, I got a NoSuchFileException. I think it was failed to create the tmp channel file but I am confused about the reason relative to this case. I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so is this diretory only working for that step of the application ? BTW, this issue happen coincidently. !image-2023-05-12-14-07-45-771.png! was: When I submit an application to flink standalone cluster, I got a NoSuchFileException. I think it was failed to create the tmp channel file but I am confused about the reason relative to this case. BTW, this issue happen coincidently. !image-2023-05-12-14-07-45-771.png! > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32065: -- Attachment: image-2023-05-12-14-26-46-268.png > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. BTW, this issue happen > coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32065) Got NoSuchFileException when initialize source function.
Spongebob created FLINK-32065: - Summary: Got NoSuchFileException when initialize source function. Key: FLINK-32065 URL: https://issues.apache.org/jira/browse/FLINK-32065 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.4 Reporter: Spongebob Attachments: image-2023-05-12-14-07-45-771.png When I submit an application to flink standalone cluster, I got a NoSuchFileException. I think it was failed to create the tmp channel file but I am confused about the reason relative to this case. BTW, this issue happen coincidently. !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30500) Got an null pointer exception when using table environment in sub-thread
[ https://issues.apache.org/jira/browse/FLINK-30500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-30500: -- Description: I can use the `addInsertSql` function nornally in norm situation, but when I create the table env in main thread and then run this function in sub-thread, I got an ambigious exception that was thrown by calcite. {code:java} public static void main(String[] args) throws InterruptedException { TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); String ddl = "CREATE TABLE IF NOT EXISTS SOURCE(" + "A DECIMAL(19)," + "B DECIMAL(19)" + ") WITH (" + "'connector' = 'jdbc'...)"; String ddl2 = "CREATE TABLE IF NOT EXISTS SINK(" + "A DECIMAL(19)," + "B DECIMAL(19)" + ") WITH (" + "'connector' = 'print')"; tableEnvironment.executeSql(ddl); tableEnvironment.executeSql(ddl2); StatementSet statementSet = tableEnvironment.createStatementSet(); CompletableFuture.runAsync(() -> { statementSet.addInsertSql("INSERT INTO SINK SELECT * FROM SOURCE"); try { statementSet.execute().await(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }).exceptionally((e) -> { e.printStackTrace(); return null; }); Thread.sleep(1); } {code} {code:java} Caused by: java.lang.NullPointerException at java.util.Objects.requireNonNull(Objects.java:203) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:78) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108) at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344) at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at
[jira] [Created] (FLINK-30500) Got an null pointer exception when using table environment in sub-thread
Spongebob created FLINK-30500: - Summary: Got an null pointer exception when using table environment in sub-thread Key: FLINK-30500 URL: https://issues.apache.org/jira/browse/FLINK-30500 Project: Flink Issue Type: Bug Affects Versions: 1.14.3 Reporter: Spongebob I can use the `addInsertSql` function nornally in norm situation, but when I create the table env in main thread and then run this function in sub-thread, I got an ambigious exception that was thrown by calcite. {code:java} Caused by: java.lang.NullPointerException at java.util.Objects.requireNonNull(Objects.java:203) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:144) at org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:78) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108) at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344) at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290) at
[jira] [Commented] (FLINK-29428) got an ambigious exception in flinksql
[ https://issues.apache.org/jira/browse/FLINK-29428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610368#comment-17610368 ] Spongebob commented on FLINK-29428: --- when I disable the multiple input operator, it turned to be normal > got an ambigious exception in flinksql > -- > > Key: FLINK-29428 > URL: https://issues.apache.org/jira/browse/FLINK-29428 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > when I execute my lengthy flinksql that contains 5 joins, I got this > exception: > I have no any idea to solve this issue and asking for your help. > {code:java} > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of > recursions, without reducing partitions enough to be memory resident. > Probably cause: Too many duplicate keys. > at > org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443) > at > org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403) > at > org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265) > at > org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176) > at > org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:100) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68) > at > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.lang.Thread.run(Thread.java:748) {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29428) got an ambigious exception in flinksql
Spongebob created FLINK-29428: - Summary: got an ambigious exception in flinksql Key: FLINK-29428 URL: https://issues.apache.org/jira/browse/FLINK-29428 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob when I execute my lengthy flinksql that contains 5 joins, I got this exception: I have no any idea to solve this issue and asking for your help. {code:java} Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys. at org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443) at org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403) at org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265) at org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176) at org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124) at org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:100) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29216) rows belong to update rowkind are transfered into delete and insert
[ https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob resolved FLINK-29216. --- Resolution: Not A Problem > rows belong to update rowkind are transfered into delete and insert > --- > > Key: FLINK-29216 > URL: https://issues.apache.org/jira/browse/FLINK-29216 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > When I declared primary key on the sink table, all UPDATE rows were > transfered into DELETE and INSERT rowkind, despite the primay key values were > not changed at all. If this is the expected logic then how could I change > this behavior ? > > following is the test code: > {code:java} > CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH > ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json') > CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY > KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print') > insert into SINK select * from SOURCE > when I produced these message to kafka: > {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}} > {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}} > flink output these rows: > +I[BOB, MATH, 10] > -D[BOB, MATH, 10] > +I[BOB, MATH, 16]{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29216) rows belong to update rowkind are transfered into delete and insert
[ https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601167#comment-17601167 ] Spongebob commented on FLINK-29216: --- Thanks [~lincoln.86xy] , it apparents normal when I set this option to NONE, since this option is used to solve the disorder in distribute system. But if I run my application in single parallelism would it be ok ? > rows belong to update rowkind are transfered into delete and insert > --- > > Key: FLINK-29216 > URL: https://issues.apache.org/jira/browse/FLINK-29216 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > When I declared primary key on the sink table, all UPDATE rows were > transfered into DELETE and INSERT rowkind, despite the primay key values were > not changed at all. If this is the expected logic then how could I change > this behavior ? > > following is the test code: > {code:java} > CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH > ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json') > CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY > KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print') > insert into SINK select * from SOURCE > when I produced these message to kafka: > {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}} > {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}} > flink output these rows: > +I[BOB, MATH, 10] > -D[BOB, MATH, 10] > +I[BOB, MATH, 16]{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29216) rows belong to update rowkind are transfered into delete and insert
[ https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-29216: -- Description: When I declared primary key on the sink table, all UPDATE rows were transfered into DELETE and INSERT rowkind, despite the primay key values were not changed at all. If this is the expected logic then how could I change this behavior ? following is the test code: {code:java} CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json') CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print') insert into SINK select * from SOURCE when I produced these message to kafka: {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}} {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}} flink output these rows: +I[BOB, MATH, 10] -D[BOB, MATH, 10] +I[BOB, MATH, 16]{code} was:When I declared primary key on the sink table, all UPDATE rows were transfered into DELETE and INSERT rowkind, despite the primay key values were not changed at all. If this is the expected logic then how could I change this behavior ? > rows belong to update rowkind are transfered into delete and insert > --- > > Key: FLINK-29216 > URL: https://issues.apache.org/jira/browse/FLINK-29216 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > When I declared primary key on the sink table, all UPDATE rows were > transfered into DELETE and INSERT rowkind, despite the primay key values were > not changed at all. If this is the expected logic then how could I change > this behavior ? > > following is the test code: > {code:java} > CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH > ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json') > CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY > KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print') > insert into SINK select * from SOURCE > when I produced these message to kafka: > {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}} > {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}} > flink output these rows: > +I[BOB, MATH, 10] > -D[BOB, MATH, 10] > +I[BOB, MATH, 16]{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29216) rows belong to update rowkind are transfered into delete and insert
Spongebob created FLINK-29216: - Summary: rows belong to update rowkind are transfered into delete and insert Key: FLINK-29216 URL: https://issues.apache.org/jira/browse/FLINK-29216 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob When I declared primary key on the sink table, all UPDATE rows were transfered into DELETE and INSERT rowkind, despite the primay key values were not changed at all. If this is the expected logic then how could I change this behavior ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27969) StreamPhysicalOverAggregate doesn't support consuming update and delete changes
Spongebob created FLINK-27969: - Summary: StreamPhysicalOverAggregate doesn't support consuming update and delete changes Key: FLINK-27969 URL: https://issues.apache.org/jira/browse/FLINK-27969 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob Exception trace: {code:java} // exception StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(COL2 = COL4)], select=[...], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) {code} FlinkSQL that scheduled as streaming table like this: {code:java} // dml SELECT RANK() OVER (PARTITION BY A.COL1 ORDER BY A.COL2) AS ODER_ONUM FROM A INNER JOIN B ON A.COL1 = B.COL1 LEFT JOIN C ON C.COL3 = 1 AND CAST(A.COL2 AS STRING) = C.COL4{code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Reopened] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob reopened FLINK-27638: --- > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-40-23-179.png > > > # register two table function named `FUNC_A` and `FUNC_B` > # left join with FUNC_A > # inner join with FUNC_B > # schedule the dml > after these steps, I found the task of FUNC_A keeped running but the task of > FUNC_B turned to be finished in serveral seconds. And I am not sure that the > unnormal task lead to empty output of the dml. > !image-2022-05-16-19-40-23-179.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540880#comment-17540880 ] Spongebob commented on FLINK-27638: --- [~fsk119] I had not set any finish condition in the table function. By the way it run normally when I don't join with any other table function in the graph. > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-40-23-179.png > > > # register two table function named `FUNC_A` and `FUNC_B` > # left join with FUNC_A > # inner join with FUNC_B > # schedule the dml > after these steps, I found the task of FUNC_A keeped running but the task of > FUNC_B turned to be finished in serveral seconds. And I am not sure that the > unnormal task lead to empty output of the dml. > !image-2022-05-16-19-40-23-179.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27638: -- Attachment: (was: image-2022-05-16-19-08-25-477.png) > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-40-23-179.png > > > # register two table function named `FUNC_A` and `FUNC_B` > # left join with FUNC_A > # inner join with FUNC_B > # schedule the dml > after these steps, I found the task of FUNC_A keeped running but the task of > FUNC_B turned to be finished in serveral seconds. And I am not sure that the > unnormal task lead to empty output of the dml. > !image-2022-05-16-19-40-23-179.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27638: -- Attachment: image-2022-05-16-19-40-23-179.png Description: # register two table function named `FUNC_A` and `FUNC_B` # left join with FUNC_A # inner join with FUNC_B # schedule the dml after these steps, I found the task of FUNC_A keeped running but the task of FUNC_B turned to be finished in serveral seconds. And I am not sure that the unnormal task lead to empty output of the dml. !image-2022-05-16-19-40-23-179.png! was: # regiseter one table function named `GET_STREAMING_MODEL_SINK_FILTER` # create two complexible DML that both inner join with the table functioin and they are all streaming tasks. # schedule the two DML in one submission based on statementSet. atfer these steps I found that the table function was run on one exclusive task and it turned to be finished in serveral seconds. And the two DML had not any output after the inner join with that table function. Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the expected graph when using table function. !image-2022-05-16-19-08-25-477.png! !image-2022-05-16-19-08-50-286.png! > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-40-23-179.png > > > # register two table function named `FUNC_A` and `FUNC_B` > # left join with FUNC_A > # inner join with FUNC_B > # schedule the dml > after these steps, I found the task of FUNC_A keeped running but the task of > FUNC_B turned to be finished in serveral seconds. And I am not sure that the > unnormal task lead to empty output of the dml. > !image-2022-05-16-19-40-23-179.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27638: -- Attachment: (was: image-2022-05-16-19-08-50-286.png) > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-40-23-179.png > > > # register two table function named `FUNC_A` and `FUNC_B` > # left join with FUNC_A > # inner join with FUNC_B > # schedule the dml > after these steps, I found the task of FUNC_A keeped running but the task of > FUNC_B turned to be finished in serveral seconds. And I am not sure that the > unnormal task lead to empty output of the dml. > !image-2022-05-16-19-40-23-179.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27638) failed to join with table function
[ https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27638: -- Description: # regiseter one table function named `GET_STREAMING_MODEL_SINK_FILTER` # create two complexible DML that both inner join with the table functioin and they are all streaming tasks. # schedule the two DML in one submission based on statementSet. atfer these steps I found that the table function was run on one exclusive task and it turned to be finished in serveral seconds. And the two DML had not any output after the inner join with that table function. Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the expected graph when using table function. !image-2022-05-16-19-08-25-477.png! !image-2022-05-16-19-08-50-286.png! was: # regiseter one table function named `GET_STREAMING_MODEL_SINK_FILTER` # create two flinksql complexible DML that both inner join with the table functioin. # schedule the two DML in one submission based on statementSet. atfer these steps I found that the table function was run on one exclusive task and it turned to be finished in serveral seconds. And the two DML had not any output after the inner join with that table function. Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the expected graph when using table function. !image-2022-05-16-19-08-25-477.png! !image-2022-05-16-19-08-50-286.png! > failed to join with table function > -- > > Key: FLINK-27638 > URL: https://issues.apache.org/jira/browse/FLINK-27638 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-16-19-08-25-477.png, > image-2022-05-16-19-08-50-286.png > > > # regiseter one table function named `GET_STREAMING_MODEL_SINK_FILTER` > # create two complexible DML that both inner join with the table functioin > and they are all streaming tasks. > # schedule the two DML in one submission based on statementSet. > atfer these steps I found that the table function was run on one exclusive > task and it turned to be finished in serveral seconds. And the two DML had > not any output after the inner join with that table function. > Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this > situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the > expected graph when using table function. > !image-2022-05-16-19-08-25-477.png! > > !image-2022-05-16-19-08-50-286.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27638) failed to join with table function
Spongebob created FLINK-27638: - Summary: failed to join with table function Key: FLINK-27638 URL: https://issues.apache.org/jira/browse/FLINK-27638 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob Attachments: image-2022-05-16-19-08-25-477.png, image-2022-05-16-19-08-50-286.png # regiseter one table function named `GET_STREAMING_MODEL_SINK_FILTER` # create two flinksql complexible DML that both inner join with the table functioin. # schedule the two DML in one submission based on statementSet. atfer these steps I found that the table function was run on one exclusive task and it turned to be finished in serveral seconds. And the two DML had not any output after the inner join with that table function. Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the expected graph when using table function. !image-2022-05-16-19-08-25-477.png! !image-2022-05-16-19-08-50-286.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-27462) could not trigger checkpoint when using table function
[ https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-27462. - Release Note: Flink will stop checkpointing when some subtasks turn to be finished. And we can switch off this trait by ` config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);` Resolution: Not A Problem > could not trigger checkpoint when using table function > -- > > Key: FLINK-27462 > URL: https://issues.apache.org/jira/browse/FLINK-27462 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-01-11-22-03-127.png, > image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png > > > I found that it could not trigger checkpoints despite I had enable it in > environment. > I think this problem may be related to I had used a table function in my DML. > When I deploy the application the task dedicated to the table function turned > to be finished immediately despite the table function had declarate the > property `isDeterministic` to false. > Below is my basic code to recur the issue: > > {code:java} > // table function > public class GetStreamingModelSinkFilter extends TableFunction { > private boolean status = false; > private String statusRedisValue; > private final String redisKeyOfStatus; > private final RedisInfo redisInfo; > private RedisManager redisManager; > private RedisCommands redisCommands; > private long lastCheckTimestamp = 0L; > private long currentTimestamp; > public GetStreamingModelSinkFilter() { > ...initial something > } > } > @Override > public void open(FunctionContext context) throws Exception { > redisManager = new RedisManager(redisInfo); > redisCommands = redisManager.getCommands(); > } > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > currentTimestamp = System.currentTimeMillis(); > if (currentTimestamp - lastCheckTimestamp < 1000) { > collect(0); > } else { > statusRedisValue = redisCommands.get(redisKeyOfStatus); > if (Objects.equals(statusRedisValue, "1")) { > status = true; > collect(1); > } else { > lastCheckTimestamp = currentTimestamp; > collect(0); > } > } > } > } > @Override > public void close() throws Exception { > redisManager.close(); > } > } {code} > Below's the DML: > > > {code:java} > INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS > T(MARK) ON TRUE WHERE MARK = 1 > {code} > TASKS SNAPSHOT OF THE APPLICATION > !image-2022-05-01-11-22-03-127.png! > !image-2022-05-01-11-23-07-063.png! > !image-2022-05-01-11-23-24-938.png! > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function
[ https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27462: -- Description: I found that it could not trigger checkpoints despite I had enable it in environment. I think this problem may be related to I had used a table function in my DML. When I deploy the application the task dedicated to the table function turned to be finished immediately despite the table function had declarate the property `isDeterministic` to false. Below is my basic code to recur the issue: {code:java} // table function public class GetStreamingModelSinkFilter extends TableFunction { private boolean status = false; private String statusRedisValue; private final String redisKeyOfStatus; private final RedisInfo redisInfo; private RedisManager redisManager; private RedisCommands redisCommands; private long lastCheckTimestamp = 0L; private long currentTimestamp; public GetStreamingModelSinkFilter() { ...initial something } } @Override public void open(FunctionContext context) throws Exception { redisManager = new RedisManager(redisInfo); redisCommands = redisManager.getCommands(); } @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { currentTimestamp = System.currentTimeMillis(); if (currentTimestamp - lastCheckTimestamp < 1000) { collect(0); } else { statusRedisValue = redisCommands.get(redisKeyOfStatus); if (Objects.equals(statusRedisValue, "1")) { status = true; collect(1); } else { lastCheckTimestamp = currentTimestamp; collect(0); } } } } @Override public void close() throws Exception { redisManager.close(); } } {code} Below's the DML: {code:java} INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS T(MARK) ON TRUE WHERE MARK = 1 {code} TASKS SNAPSHOT OF THE APPLICATION !image-2022-05-01-11-22-03-127.png! !image-2022-05-01-11-23-07-063.png! !image-2022-05-01-11-23-24-938.png! was: I found that it could not trigger checkpoints despite I had enable it in environment. I think this problem may be related to I had used a table function in my DML. When I deploy the application the task dedicated to the table function turned to be finished immediately despite the table function had declarate the property `isDeterministic` to false. Below is my basic code to recur the issue: {code:java} // table function public class GetStreamingModelSinkFilter extends TableFunction { private boolean status = false; private String statusRedisValue; private final String redisKeyOfStatus; private final RedisInfo redisInfo; private RedisManager redisManager; private RedisCommands redisCommands; private long lastCheckTimestamp = 0L; private long currentTimestamp; public GetStreamingModelSinkFilter() { ...initial something } } @Override public void open(FunctionContext context) throws Exception { redisManager = new RedisManager(redisInfo); redisCommands = redisManager.getCommands(); } @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { currentTimestamp = System.currentTimeMillis(); if (currentTimestamp - lastCheckTimestamp < 1000) { collect(0); } else { statusRedisValue = redisCommands.get(redisKeyOfStatus); if (Objects.equals(statusRedisValue, "1")) { status = true; collect(1); } else { lastCheckTimestamp = currentTimestamp; collect(0); } } } } @Override public void close() throws Exception { redisManager.close(); } } {code} Below's the DML: {code:java} INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS T(MARK) ON TRUE WHERE MARK = 1 {code} TASKS SNAPSHOT OF THE APPLICATION !image-2022-05-01-11-22-03-127.png! > could not trigger checkpoint when using table function > -- > > Key: FLINK-27462 > URL: https://issues.apache.org/jira/browse/FLINK-27462 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments:
[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function
[ https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27462: -- Attachment: image-2022-05-01-11-23-24-938.png > could not trigger checkpoint when using table function > -- > > Key: FLINK-27462 > URL: https://issues.apache.org/jira/browse/FLINK-27462 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-01-11-22-03-127.png, > image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png > > > I found that it could not trigger checkpoints despite I had enable it in > environment. > I think this problem may be related to I had used a table function in my DML. > When I deploy the application the task dedicated to the table function turned > to be finished immediately despite the table function had declarate the > property `isDeterministic` to false. > Below is my basic code to recur the issue: > > {code:java} > // table function > public class GetStreamingModelSinkFilter extends TableFunction { > private boolean status = false; > private String statusRedisValue; > private final String redisKeyOfStatus; > private final RedisInfo redisInfo; > private RedisManager redisManager; > private RedisCommands redisCommands; > private long lastCheckTimestamp = 0L; > private long currentTimestamp; > public GetStreamingModelSinkFilter() { > ...initial something > } > } > @Override > public void open(FunctionContext context) throws Exception { > redisManager = new RedisManager(redisInfo); > redisCommands = redisManager.getCommands(); > } > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > currentTimestamp = System.currentTimeMillis(); > if (currentTimestamp - lastCheckTimestamp < 1000) { > collect(0); > } else { > statusRedisValue = redisCommands.get(redisKeyOfStatus); > if (Objects.equals(statusRedisValue, "1")) { > status = true; > collect(1); > } else { > lastCheckTimestamp = currentTimestamp; > collect(0); > } > } > } > } > @Override > public void close() throws Exception { > redisManager.close(); > } > } {code} > Below's the DML: > > > {code:java} > INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS > T(MARK) ON TRUE WHERE MARK = 1 > {code} > TASKS SNAPSHOT OF THE APPLICATION > !image-2022-05-01-11-22-03-127.png! > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function
[ https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-27462: -- Attachment: image-2022-05-01-11-23-07-063.png > could not trigger checkpoint when using table function > -- > > Key: FLINK-27462 > URL: https://issues.apache.org/jira/browse/FLINK-27462 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > Attachments: image-2022-05-01-11-22-03-127.png, > image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png > > > I found that it could not trigger checkpoints despite I had enable it in > environment. > I think this problem may be related to I had used a table function in my DML. > When I deploy the application the task dedicated to the table function turned > to be finished immediately despite the table function had declarate the > property `isDeterministic` to false. > Below is my basic code to recur the issue: > > {code:java} > // table function > public class GetStreamingModelSinkFilter extends TableFunction { > private boolean status = false; > private String statusRedisValue; > private final String redisKeyOfStatus; > private final RedisInfo redisInfo; > private RedisManager redisManager; > private RedisCommands redisCommands; > private long lastCheckTimestamp = 0L; > private long currentTimestamp; > public GetStreamingModelSinkFilter() { > ...initial something > } > } > @Override > public void open(FunctionContext context) throws Exception { > redisManager = new RedisManager(redisInfo); > redisCommands = redisManager.getCommands(); > } > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > currentTimestamp = System.currentTimeMillis(); > if (currentTimestamp - lastCheckTimestamp < 1000) { > collect(0); > } else { > statusRedisValue = redisCommands.get(redisKeyOfStatus); > if (Objects.equals(statusRedisValue, "1")) { > status = true; > collect(1); > } else { > lastCheckTimestamp = currentTimestamp; > collect(0); > } > } > } > } > @Override > public void close() throws Exception { > redisManager.close(); > } > } {code} > Below's the DML: > > > {code:java} > INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS > T(MARK) ON TRUE WHERE MARK = 1 > {code} > TASKS SNAPSHOT OF THE APPLICATION > !image-2022-05-01-11-22-03-127.png! > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27462) could not trigger checkpoint when using table function
Spongebob created FLINK-27462: - Summary: could not trigger checkpoint when using table function Key: FLINK-27462 URL: https://issues.apache.org/jira/browse/FLINK-27462 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.3 Reporter: Spongebob Attachments: image-2022-05-01-11-22-03-127.png I found that it could not trigger checkpoints despite I had enable it in environment. I think this problem may be related to I had used a table function in my DML. When I deploy the application the task dedicated to the table function turned to be finished immediately despite the table function had declarate the property `isDeterministic` to false. Below is my basic code to recur the issue: {code:java} // table function public class GetStreamingModelSinkFilter extends TableFunction { private boolean status = false; private String statusRedisValue; private final String redisKeyOfStatus; private final RedisInfo redisInfo; private RedisManager redisManager; private RedisCommands redisCommands; private long lastCheckTimestamp = 0L; private long currentTimestamp; public GetStreamingModelSinkFilter() { ...initial something } } @Override public void open(FunctionContext context) throws Exception { redisManager = new RedisManager(redisInfo); redisCommands = redisManager.getCommands(); } @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { currentTimestamp = System.currentTimeMillis(); if (currentTimestamp - lastCheckTimestamp < 1000) { collect(0); } else { statusRedisValue = redisCommands.get(redisKeyOfStatus); if (Objects.equals(statusRedisValue, "1")) { status = true; collect(1); } else { lastCheckTimestamp = currentTimestamp; collect(0); } } } } @Override public void close() throws Exception { redisManager.close(); } } {code} Below's the DML: {code:java} INSERT INTO TEST SELECT T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS T(MARK) ON TRUE WHERE MARK = 1 {code} TASKS SNAPSHOT OF THE APPLICATION !image-2022-05-01-11-22-03-127.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for finksql
[ https://issues.apache.org/jira/browse/FLINK-27436 ] Spongebob deleted comment on FLINK-27436: --- was (Author: spongebobz): Hi [~martijnvisser] , When I deploy my flinksql application that contains multiple insertion sql that are both executed by one statementSet to standalone cluster, It did not tigger checkpoints despite some source table already got data input. > option `properties.group.id` is not effective in kafka connector for finksql > > > Key: FLINK-27436 > URL: https://issues.apache.org/jira/browse/FLINK-27436 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > option `properties.group.id` is not effective in kafka connector for finksql. > when I run this sql, I can read message from specific topic normaly. But I > could not > find the group named `test-group` in kafka server. > {code:java} > // ddl like this > "CREATE TABLE ... > "WITH ('connector' = 'kafka', > 'properties.bootstrap.servers' = '...'," + > "'topic' = '...',"+ > "'scan.startup.mode'='latest-offset'," + > "'properties.group.id' = 'test-group'," + > "'format' = 'debezium-json')"; {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for finksql
[ https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530329#comment-17530329 ] Spongebob commented on FLINK-27436: --- Hi [~martijnvisser] , When I deploy my flinksql application that contains multiple insertion sql that are both executed by one statementSet to standalone cluster, It did not tigger checkpoints despite some source table already got data input. > option `properties.group.id` is not effective in kafka connector for finksql > > > Key: FLINK-27436 > URL: https://issues.apache.org/jira/browse/FLINK-27436 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > option `properties.group.id` is not effective in kafka connector for finksql. > when I run this sql, I can read message from specific topic normaly. But I > could not > find the group named `test-group` in kafka server. > {code:java} > // ddl like this > "CREATE TABLE ... > "WITH ('connector' = 'kafka', > 'properties.bootstrap.servers' = '...'," + > "'topic' = '...',"+ > "'scan.startup.mode'='latest-offset'," + > "'properties.group.id' = 'test-group'," + > "'format' = 'debezium-json')"; {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for finksql
[ https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529278#comment-17529278 ] Spongebob commented on FLINK-27436: --- [~martijnvisser] Great I can see it now follow your advise. But the snapshot would not be made until data input into the connector. > option `properties.group.id` is not effective in kafka connector for finksql > > > Key: FLINK-27436 > URL: https://issues.apache.org/jira/browse/FLINK-27436 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > option `properties.group.id` is not effective in kafka connector for finksql. > when I run this sql, I can read message from specific topic normaly. But I > could not > find the group named `test-group` in kafka server. > {code:java} > // ddl like this > "CREATE TABLE ... > "WITH ('connector' = 'kafka', > 'properties.bootstrap.servers' = '...'," + > "'topic' = '...',"+ > "'scan.startup.mode'='latest-offset'," + > "'properties.group.id' = 'test-group'," + > "'format' = 'debezium-json')"; {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for finksql
[ https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529260#comment-17529260 ] Spongebob commented on FLINK-27436: --- [~martijnvisser] I had not. By the ways I could see this option was printed by both KafkaConsumer and AdminClient, but I don't know why it did not effect. > option `properties.group.id` is not effective in kafka connector for finksql > > > Key: FLINK-27436 > URL: https://issues.apache.org/jira/browse/FLINK-27436 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > option `properties.group.id` is not effective in kafka connector for finksql. > when I run this sql, I can read message from specific topic normaly. But I > could not > find the group named `test-group` in kafka server. > {code:java} > // ddl like this > "CREATE TABLE ... > "WITH ('connector' = 'kafka', > 'properties.bootstrap.servers' = '...'," + > "'topic' = '...',"+ > "'scan.startup.mode'='latest-offset'," + > "'properties.group.id' = 'test-group'," + > "'format' = 'debezium-json')"; {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for finksql
Spongebob created FLINK-27436: - Summary: option `properties.group.id` is not effective in kafka connector for finksql Key: FLINK-27436 URL: https://issues.apache.org/jira/browse/FLINK-27436 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.14.3 Reporter: Spongebob option `properties.group.id` is not effective in kafka connector for finksql. when I run this sql, I can read message from specific topic normaly. But I could not find the group named `test-group` in kafka server. {code:java} // ddl like this "CREATE TABLE ... "WITH ('connector' = 'kafka', 'properties.bootstrap.servers' = '...'," + "'topic' = '...',"+ "'scan.startup.mode'='latest-offset'," + "'properties.group.id' = 'test-group'," + "'format' = 'debezium-json')"; {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-26925) loss scale in union situation
[ https://issues.apache.org/jira/browse/FLINK-26925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-26925. - Resolution: Not A Problem > loss scale in union situation > - > > Key: FLINK-26925 > URL: https://issues.apache.org/jira/browse/FLINK-26925 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Spongebob >Priority: Major > > when I union two columns that datatypes are decimal(38,4) and decimal(38,2), > but got decimal(38,2) in return. This cause the problem that loss scale in > result set. I think the final datatype should be decimal(38,4) would be fine. > {code:java} > TableEnvironment tableEnvironment = > TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); > Table t1 = tableEnvironment.sqlQuery("select cast(1.23 as decimal(38,2)) as > a"); > Table t2 = tableEnvironment.sqlQuery("select cast(4.5678 as decimal(38,4)) as > a"); > tableEnvironment.createTemporaryView("t1", t1); > tableEnvironment.createTemporaryView("t2", t2); > tableEnvironment.executeSql("select a from t1 union all select a from > t2").print(); > tableEnvironment.sqlQuery("select a from t1 union all select a from > t2").printSchema(); > // output > +--+ > | a | > +--+ > | 1.23 | > | 4.57 | > +--+ > 2 rows in set > ( > `a` DECIMAL(38, 2) NOT NULL > ){code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26925) loss scale in union situation
Spongebob created FLINK-26925: - Summary: loss scale in union situation Key: FLINK-26925 URL: https://issues.apache.org/jira/browse/FLINK-26925 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Spongebob when I union two columns that datatypes are decimal(38,4) and decimal(38,2), but got decimal(38,2) in return. This cause the problem that loss scale in result set. I think the final datatype should be decimal(38,4) would be fine. {code:java} TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); Table t1 = tableEnvironment.sqlQuery("select cast(1.23 as decimal(38,2)) as a"); Table t2 = tableEnvironment.sqlQuery("select cast(4.5678 as decimal(38,4)) as a"); tableEnvironment.createTemporaryView("t1", t1); tableEnvironment.createTemporaryView("t2", t2); tableEnvironment.executeSql("select a from t1 union all select a from t2").print(); tableEnvironment.sqlQuery("select a from t1 union all select a from t2").printSchema(); // output +--+ | a | +--+ | 1.23 | | 4.57 | +--+ 2 rows in set ( `a` DECIMAL(38, 2) NOT NULL ){code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26461) Throw CannotPlanException in TableFunction
[ https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502100#comment-17502100 ] Spongebob commented on FLINK-26461: --- Hi lincoln, I found this problem just occur in the version that preceding Flink 1.14.3 such as 1.14.2 > Throw CannotPlanException in TableFunction > -- > > Key: FLINK-26461 > URL: https://issues.apache.org/jira/browse/FLINK-26461 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > I got an CannotPlanException when change the isDeterministic option to false. > For detail see this code: > {code:java} > //代码占位符 > public class GetDayTimeEtlSwitch extends TableFunction { > private boolean status = false; > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > if (System.currentTimeMillis() > 1646298908000L) { > status = true; > collect(1); > } else { > collect(0); > } > } > } > } {code} > Exception stack... > {code:java} > //代码占位符 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot > generate a valid execution plan for the given query: > FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], > fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]) > +- FlinkLogicalJoin(condition=[true], joinType=[left]) > :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, > PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME]) > : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]) > +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], > rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query > uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742) > at TestSwitch.main(TestSwitch.java:33) > Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There > are not enough rules to produce a node with desired properties: > convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, > MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], > UpdateKindTraitDef=[NONE]. > Missing conversion is FlinkLogicalTableFunctionScan[convention:
[jira] [Commented] (FLINK-26461) Throw CannotPlanException in TableFunction
[ https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500647#comment-17500647 ] Spongebob commented on FLINK-26461: --- Hi Lincoln, this is my application code: {code:java} //代码占位符 TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); tableEnvironment.createTemporaryFunction("GET_SWITCH", GetDayTimeEtlSwitch.class); String dailyScoreCdcDDL = "CREATE TABLE DAILY_SCORE_CDC(" + "STUNAME STRING," + "SUBJECT STRING," + "SCORE DECIMAL(10,0)," + "PROC_TIME AS PROCTIME()" + ") WITH (" + "'connector'='oracle-cdc'," + "..." ")"; String dml = "SELECT * FROM DAILY_SCORE_CDC LEFT JOIN LATERAL TABLE(GET_SWITCH()) AS T(STATUS) ON TRUE"; tableEnvironment.executeSql(dailyScoreCdcDDL); tableEnvironment.executeSql(dml).print(); {code} > Throw CannotPlanException in TableFunction > -- > > Key: FLINK-26461 > URL: https://issues.apache.org/jira/browse/FLINK-26461 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > I got an CannotPlanException when change the isDeterministic option to false. > For detail see this code: > {code:java} > //代码占位符 > public class GetDayTimeEtlSwitch extends TableFunction { > private boolean status = false; > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > if (System.currentTimeMillis() > 1646298908000L) { > status = true; > collect(1); > } else { > collect(0); > } > } > } > } {code} > Exception stack... > {code:java} > //代码占位符 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot > generate a valid execution plan for the given query: > FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], > fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]) > +- FlinkLogicalJoin(condition=[true], joinType=[left]) > :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, > PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME]) > : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]) > +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], > rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query > uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) > at
[jira] [Created] (FLINK-26461) Throw CannotPlanException in TableFunction
Spongebob created FLINK-26461: - Summary: Throw CannotPlanException in TableFunction Key: FLINK-26461 URL: https://issues.apache.org/jira/browse/FLINK-26461 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob I got an CannotPlanException when change the isDeterministic option to false. For detail see this code: {code:java} //代码占位符 public class GetDayTimeEtlSwitch extends TableFunction { private boolean status = false; @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { if (System.currentTimeMillis() > 1646298908000L) { status = true; collect(1); } else { collect(0); } } } } {code} Exception stack... {code:java} //代码占位符 Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]) +- FlinkLogicalJoin(condition=[true], joinType=[left]) :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME]) : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]) +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742) at TestSwitch.main(TestSwitch.java:33) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]. Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single] There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows 168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])Root: rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE] Original rel: FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]],
[jira] [Closed] (FLINK-26384) This exception indicates that the query uses an unsupported SQL feature.
[ https://issues.apache.org/jira/browse/FLINK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-26384. - Resolution: Not A Problem > This exception indicates that the query uses an unsupported SQL feature. > > > Key: FLINK-26384 > URL: https://issues.apache.org/jira/browse/FLINK-26384 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.3 >Reporter: Spongebob >Priority: Major > > Got an unsupported exception when generating a valid execution plan. > {code:java} > //代码占位符 > TableEnvironment tableEnvironment = > TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); > // HERE'S DDL AND DML SQL > CREATE TABLE IF NOT EXISTS TABLE_A (CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP > DECIMAL(20,4),TSCP DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),SCR_INCD > DECIMAL(19,0),AST_TYPE DECIMAL(3,0),SCR_TYPE DECIMAL(3,0)) WITH ('connector' > = 'jdbc'...) > CREATE TABLE IF NOT EXISTS TABLE_B (AST_TYPE DECIMAL(4,0),CIR_CAPT > DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP > DECIMAL(20,4),SCR_INCD DECIMAL(19,0),SCR_TYPE DECIMAL(8,0),TSCP > DECIMAL(20,4), PRIMARY KEY (SCR_INCD) NOT ENFORCED) WITH ('connector' = > 'jdbc'...) > INSERT INTO TABLE_B > SELECT T_SOURCE.* FROM ( > SELECT CASE WHEN B.AST_TYPE IS NULL THEN 1 ELSE B.AST_TYPE END AS AST_TYPE, > B.CIR_CAPT AS CIR_CAPT, B.FULL_MKT_CIR_CAPT AS FULL_MKT_CIR_CAPT, > B.FULL_MKT_TSCP AS FULL_MKT_TSCP, B.SCR_INCD AS SCR_INCD, CASE WHEN > B.SCR_TYPE IS NULL THEN 1 ELSE B.SCR_TYPE END AS SCR_TYPE, B.TSCP AS TSCP > FROM TABLE_A B > ) T_SOURCE INNER JOIN ( SELECT DISTINCT SCR_INCD FROM TABLE_B) T_SINK ON > T_SOURCE.SCR_INCD = T_SINK.SCR_INCD > // EXCEPTION TRACK > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752) > at >
[jira] [Created] (FLINK-26384) This exception indicates that the query uses an unsupported SQL feature.
Spongebob created FLINK-26384: - Summary: This exception indicates that the query uses an unsupported SQL feature. Key: FLINK-26384 URL: https://issues.apache.org/jira/browse/FLINK-26384 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob Got an unsupported exception when generating a valid execution plan. {code:java} //代码占位符 TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); // HERE'S DDL AND DML SQL CREATE TABLE IF NOT EXISTS TABLE_A (CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP DECIMAL(20,4),TSCP DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),SCR_INCD DECIMAL(19,0),AST_TYPE DECIMAL(3,0),SCR_TYPE DECIMAL(3,0)) WITH ('connector' = 'jdbc'...) CREATE TABLE IF NOT EXISTS TABLE_B (AST_TYPE DECIMAL(4,0),CIR_CAPT DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP DECIMAL(20,4),SCR_INCD DECIMAL(19,0),SCR_TYPE DECIMAL(8,0),TSCP DECIMAL(20,4), PRIMARY KEY (SCR_INCD) NOT ENFORCED) WITH ('connector' = 'jdbc'...) INSERT INTO TABLE_B SELECT T_SOURCE.* FROM ( SELECT CASE WHEN B.AST_TYPE IS NULL THEN 1 ELSE B.AST_TYPE END AS AST_TYPE, B.CIR_CAPT AS CIR_CAPT, B.FULL_MKT_CIR_CAPT AS FULL_MKT_CIR_CAPT, B.FULL_MKT_TSCP AS FULL_MKT_TSCP, B.SCR_INCD AS SCR_INCD, CASE WHEN B.SCR_TYPE IS NULL THEN 1 ELSE B.SCR_TYPE END AS SCR_TYPE, B.TSCP AS TSCP FROM TABLE_A B ) T_SOURCE INNER JOIN ( SELECT DISTINCT SCR_INCD FROM TABLE_B) T_SINK ON T_SOURCE.SCR_INCD = T_SINK.SCR_INCD // EXCEPTION TRACK This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752) at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124) at com.xctech.cone.etl.migrate.batch.runner.RunSingleTask.main(RunSingleTask.java:111) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversions are FlinkLogicalTableSourceScan[convention: LOGICAL ->
[jira] [Commented] (FLINK-26327) throw not a literal exception in callContext.getArgumentValue when getTypeInference
[ https://issues.apache.org/jira/browse/FLINK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497243#comment-17497243 ] Spongebob commented on FLINK-26327: --- {code:java} //代码占位符 Exception in thread "main" java.lang.AssertionError: not a literal: NVL($0, 2) at org.apache.calcite.rex.RexLiteral.findValue(RexLiteral.java:1161) at org.apache.calcite.rex.RexLiteral.value(RexLiteral.java:1133) at org.apache.calcite.rex.RexCallBinding.getOperandLiteralValue(RexCallBinding.java:89) at org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext$2.getValueAs(OperatorBindingCallContext.java:102) at org.apache.flink.table.planner.functions.inference.AbstractSqlCallContext.getLiteralValueAs(AbstractSqlCallContext.java:123) at org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext.getArgumentValue(OperatorBindingCallContext.java:98) at org.apache.flink.table.types.inference.utils.AdaptedCallContext.getArgumentValue(AdaptedCallContext.java:91) at com.xctech.cone.etl.migrate.udf.RoundX.lambda$getTypeInference$0(RoundX.java:47) at org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:152) at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:100) at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73) at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:137) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:137) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:162) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:48) at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
[jira] [Created] (FLINK-26327) throw not a literal exception in callContext.getArgumentValue when getTypeInference
Spongebob created FLINK-26327: - Summary: throw not a literal exception in callContext.getArgumentValue when getTypeInference Key: FLINK-26327 URL: https://issues.apache.org/jira/browse/FLINK-26327 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.2 Reporter: Spongebob {code:java} //代码占位符 tableEnvironment.createTemporaryFunction("ROUNDX", RoundX.class); tableEnvironment.createTemporaryFunction("NVL", Nvl.class); tableEnvironment.executeSql("select ROUNDX( CAST(1.12345 as decimal(10,3)),NVL(MAX(f0),2) ) from t1").print(); // exception Exception in thread "main" java.lang.AssertionError: not a literal: NVL($0, 2) // trace // `NVL` is a scalarFunction that likes oracle nvl function. And this exception might be thrown from this code in my `getTypeInference` function of ROUNDX scalarFunction. Optional secondValue = callContext.getArgumentValue(1, Integer.class);{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25502) eval method of Flink ScalerFunction only run one time
Spongebob created FLINK-25502: - Summary: eval method of Flink ScalerFunction only run one time Key: FLINK-25502 URL: https://issues.apache.org/jira/browse/FLINK-25502 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.2 Reporter: Spongebob assume that there is one scalerFunction named `id` which's eval method takes no arguments and return increasing int value on each calling. Now I found that when I call `id()` function in FlinkSQL that has 3 rows , the eval method only was called one time so I got the same id value for each row. The sql likes 'SELECT f0, id() FROM T'. So I decided to define one argument on `eval` method. When I execute sql 'SELECT f0, id(1) FROM T' I got the same id value still. But when I execute sql 'SELECT f0, id(f0) FROM T' then I could get the correct id value, because the eval method was called by three times now. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25373) task manager can not free memory when jobs are finished
[ https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-25373. - Resolution: Not A Problem > task manager can not free memory when jobs are finished > --- > > Key: FLINK-25373 > URL: https://issues.apache.org/jira/browse/FLINK-25373 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.0 > Environment: flink 1.14.0 >Reporter: Spongebob >Priority: Major > Attachments: image-2021-12-19-11-48-33-622.png > > > I submit my Flinksql jobs to the Flink standalone cluster and what out of my > expectation is that TaskManagers could not free memory when all jobs are > finished whether normally or not. > And I found that there were many threads named like ` > flink-taskexecutor-io-thread-x` and their states were waiting on conditions. > here's the detail of these threads: > > "flink-taskexecutor-io-thread-31" Id=5386 WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at sun.misc.Unsafe.park(Native Method) > - waiting on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > !image-2021-12-19-11-48-33-622.png! > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25373) task manager can not free memory when jobs are finished
[ https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462420#comment-17462420 ] Spongebob commented on FLINK-25373: --- Hi Xintong, Thanks for your kind explanatory, now I will close this ticket. > task manager can not free memory when jobs are finished > --- > > Key: FLINK-25373 > URL: https://issues.apache.org/jira/browse/FLINK-25373 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.0 > Environment: flink 1.14.0 >Reporter: Spongebob >Priority: Major > Attachments: image-2021-12-19-11-48-33-622.png > > > I submit my Flinksql jobs to the Flink standalone cluster and what out of my > expectation is that TaskManagers could not free memory when all jobs are > finished whether normally or not. > And I found that there were many threads named like ` > flink-taskexecutor-io-thread-x` and their states were waiting on conditions. > here's the detail of these threads: > > "flink-taskexecutor-io-thread-31" Id=5386 WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at sun.misc.Unsafe.park(Native Method) > - waiting on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > !image-2021-12-19-11-48-33-622.png! > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25373) task manager can not free memory when jobs are finished
[ https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462403#comment-17462403 ] Spongebob commented on FLINK-25373: --- Hi Xintong, # I have checked the memory usage of Task Manager Process via `top` command, it showed that the task manager process had used high memory. But I could not found any big object in `jmap` return. # And meanwhile, the report in Flink webUI showed that task manager used high heap memory. # Then I run GC command manually, the report in Flink webUI showed that the usage had turn to low, but `top` command did not. > task manager can not free memory when jobs are finished > --- > > Key: FLINK-25373 > URL: https://issues.apache.org/jira/browse/FLINK-25373 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.0 > Environment: flink 1.14.0 >Reporter: Spongebob >Priority: Major > Attachments: image-2021-12-19-11-48-33-622.png > > > I submit my Flinksql jobs to the Flink standalone cluster and what out of my > expectation is that TaskManagers could not free memory when all jobs are > finished whether normally or not. > And I found that there were many threads named like ` > flink-taskexecutor-io-thread-x` and their states were waiting on conditions. > here's the detail of these threads: > > "flink-taskexecutor-io-thread-31" Id=5386 WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at sun.misc.Unsafe.park(Native Method) > - waiting on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > !image-2021-12-19-11-48-33-622.png! > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25373) task manager can not free memory when jobs are finished
[ https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-25373: -- Description: I submit my Flinksql jobs to the Flink standalone cluster and what out of my expectation is that TaskManagers could not free memory when all jobs are finished whether normally or not. And I found that there were many threads named like ` flink-taskexecutor-io-thread-x` and their states were waiting on conditions. here's the detail of these threads: "flink-taskexecutor-io-thread-31" Id=5386 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c at sun.misc.Unsafe.park(Native Method) - waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) !image-2021-12-19-11-48-33-622.png! was: I submit my Flinksql jobs to the Flink standalone cluster and what out of my expectation is that TaskManagers could not free memory when all jobs are finished whether normally or not. And I found that there were many threads named like ` flink-taskexecutor-io-thread-x` and their states were waiting on conditions. !image-2021-12-19-11-48-33-622.png! > task manager can not free memory when jobs are finished > --- > > Key: FLINK-25373 > URL: https://issues.apache.org/jira/browse/FLINK-25373 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.0 > Environment: flink 1.14.0 >Reporter: Spongebob >Priority: Major > Attachments: image-2021-12-19-11-48-33-622.png > > > I submit my Flinksql jobs to the Flink standalone cluster and what out of my > expectation is that TaskManagers could not free memory when all jobs are > finished whether normally or not. > And I found that there were many threads named like ` > flink-taskexecutor-io-thread-x` and their states were waiting on conditions. > here's the detail of these threads: > > "flink-taskexecutor-io-thread-31" Id=5386 WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at sun.misc.Unsafe.park(Native Method) > - waiting on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > !image-2021-12-19-11-48-33-622.png! > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25373) task manager can not free memory when jobs are finished
Spongebob created FLINK-25373: - Summary: task manager can not free memory when jobs are finished Key: FLINK-25373 URL: https://issues.apache.org/jira/browse/FLINK-25373 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.0 Environment: flink 1.14.0 Reporter: Spongebob Attachments: image-2021-12-19-11-48-33-622.png I submit my Flinksql jobs to the Flink standalone cluster and what out of my expectation is that TaskManagers could not free memory when all jobs are finished whether normally or not. And I found that there were many threads named like ` flink-taskexecutor-io-thread-x` and their states were waiting on conditions. !image-2021-12-19-11-48-33-622.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24837) submit flink job failed via restapi
Spongebob created FLINK-24837: - Summary: submit flink job failed via restapi Key: FLINK-24837 URL: https://issues.apache.org/jira/browse/FLINK-24837 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.12.4 Reporter: Spongebob I tried to submit flink job via flink restapi but got exception, I used `await` function in my job so that it would submit multiple jobs. Below is the exception detail. {code:java} // "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)\n\tatjava.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tatjava.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tatjava.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)\n\tatjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tatjava.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tatjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tatjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tatjava.lang.Thread.run(Thread.java:748)\nCaused by: java.util.concurrent.CompletionException:org.apache.flink.util.FlinkRuntimeException: Could not execute application.\n\tatjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tatjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)\n\t... 7 more\nCaused by:org.apache.flink.util.FlinkRuntimeException: Could not execute application.\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t... 7 more\nCaused by:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error:org.apache.flink.table.api.TableException: Failed to wait job finish\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)\n\tatorg.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)\n\tatorg.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)\n\t...10 more\nCaused by: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to waitjob finish\n\tat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tatjava.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)\n\tatcom.xctech.cone.flink.migrate.batch.BatchCone.main(BatchCone.java:238)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tatsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tatjava.lang.reflect.Method.invoke(Method.java:498)\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)\n\t... 13 more\nCaused by:org.apache.flink.table.api.TableException: Failed to wait job finish\n\tatorg.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)\n\tatorg.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)\n\tatorg.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)\n\tatjava.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)\n\t... 3 more\nCaused by:org.apache.flink.util.FlinkRuntimeException: The Job Result cannot be fetched through the Job Client when in
[jira] [Commented] (FLINK-24473) getString value from any datatype columns
[ https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17425607#comment-17425607 ] Spongebob commented on FLINK-24473: --- Hi Jark, I am dealing with the RowData objects in RichSinkFunction and need to extract values from them, but I can not extract the values correctly as they were not string type. Maybe you can view the codes below {code:java} @Override public void invoke(RowData value, Context context) { String v = value.getString(0).toString; }{code} > getString value from any datatype columns > - > > Key: FLINK-24473 > URL: https://issues.apache.org/jira/browse/FLINK-24473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.4 >Reporter: Spongebob >Priority: Minor > Attachments: image-2021-10-07-21-17-20-192.png > > > Hope flink would support getting string value from any other datatype column, > such as from int、decimal column. At current flink would throw cast exception > when we do that. > > !image-2021-10-07-21-17-20-192.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-24473) getString value from any datatype columns
[ https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-24473: -- Attachment: image-2021-10-07-21-17-20-192.png > getString value from any datatype columns > - > > Key: FLINK-24473 > URL: https://issues.apache.org/jira/browse/FLINK-24473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.4 >Reporter: Spongebob >Priority: Minor > Attachments: image-2021-10-07-21-17-20-192.png > > > Hope flink would support getting string value from any other datatype column, > such as from int、decimal column. At current flink would throw cast exception > when we do that. > > !image-2021-10-07-21-15-05-001.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-24473) getString value from any datatype columns
[ https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-24473: -- Description: Hope flink would support getting string value from any other datatype column, such as from int、decimal column. At current flink would throw cast exception when we do that. !image-2021-10-07-21-17-20-192.png! was: Hope flink would support getting string value from any other datatype column, such as from int、decimal column. At current flink would throw cast exception when we do that. !image-2021-10-07-21-15-05-001.png! > getString value from any datatype columns > - > > Key: FLINK-24473 > URL: https://issues.apache.org/jira/browse/FLINK-24473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.4 >Reporter: Spongebob >Priority: Minor > Attachments: image-2021-10-07-21-17-20-192.png > > > Hope flink would support getting string value from any other datatype column, > such as from int、decimal column. At current flink would throw cast exception > when we do that. > > !image-2021-10-07-21-17-20-192.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24473) getString value from any datatype columns
Spongebob created FLINK-24473: - Summary: getString value from any datatype columns Key: FLINK-24473 URL: https://issues.apache.org/jira/browse/FLINK-24473 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.4 Reporter: Spongebob Hope flink would support getting string value from any other datatype column, such as from int、decimal column. At current flink would throw cast exception when we do that. !image-2021-10-07-21-15-05-001.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24338) Provide an interface to validate flinksql
[ https://issues.apache.org/jira/browse/FLINK-24338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418603#comment-17418603 ] Spongebob commented on FLINK-24338: --- Hi Patrick, I'd like to ask that how can I validate FlinkSQL with calcite? If you could give me some sample that would be great. > Provide an interface to validate flinksql > - > > Key: FLINK-24338 > URL: https://issues.apache.org/jira/browse/FLINK-24338 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Spongebob >Priority: Minor > > It would be great if there is an interface that can validate flinksql. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24338) Provide an interface to validate flinksql
[ https://issues.apache.org/jira/browse/FLINK-24338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417599#comment-17417599 ] Spongebob commented on FLINK-24338: --- Our system receives FLINK SQL from website input and then need validate the submitted SQL, if the SQL is correct then we would storage it into database. Thus we are questing some methods that can validate FLINK SQL except metadata exceptions and it is supposed to be independent with FLINK context. > Provide an interface to validate flinksql > - > > Key: FLINK-24338 > URL: https://issues.apache.org/jira/browse/FLINK-24338 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Spongebob >Priority: Minor > > It would be great if there is an interface that can validate flinksql. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24338) Provide an interface to validate flinksql
Spongebob created FLINK-24338: - Summary: Provide an interface to validate flinksql Key: FLINK-24338 URL: https://issues.apache.org/jira/browse/FLINK-24338 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: shaded-14.0 Reporter: Spongebob It would be great if there is an interface that can validate flinksql. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24289) Load data repeatedly between different jobs
[ https://issues.apache.org/jira/browse/FLINK-24289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415395#comment-17415395 ] Spongebob commented on FLINK-24289: --- So are there any ways that could make the source table to be loaded once only between two jobs? > Load data repeatedly between different jobs > --- > > Key: FLINK-24289 > URL: https://issues.apache.org/jira/browse/FLINK-24289 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.4 >Reporter: Spongebob >Priority: Major > > I had created an jdbc source table named tbl_a, and then I executed two DMLs > that both visited tbl_a in await mode, it means flink executed DML1 and then > DML2. Out of my expetation that flink had load tbl_a from underlying database > both in DML1 and DML2. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24289) Load data repeatedly between different jobs
Spongebob created FLINK-24289: - Summary: Load data repeatedly between different jobs Key: FLINK-24289 URL: https://issues.apache.org/jira/browse/FLINK-24289 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.4 Reporter: Spongebob I had created an jdbc source table named tbl_a, and then I executed two DMLs that both visited tbl_a in await mode, it means flink executed DML1 and then DML2. Out of my expetation that flink had load tbl_a from underlying database both in DML1 and DML2. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24104) incorrect validation of sink schema
Spongebob created FLINK-24104: - Summary: incorrect validation of sink schema Key: FLINK-24104 URL: https://issues.apache.org/jira/browse/FLINK-24104 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.4 Environment: Flink: 1.12.4 Reporter: Spongebob incorrect validation of sink schema, for example, I create jdbc table `user` as {code:java} //代码占位符 create table user( id int, name varchar(32), desc varchar(16) ){code} And then I connect to this jdbc table in flink, when I execute my sink sql which like {code:java} //代码占位符 insert into flink_catalog_user(desc, name, id) select ... // please mind that the order of sink columns is not consist with ddl statement.{code} , Flink application throws validation exception that is about sink schema incorrect. So it seams flink would ignore my sink column's order and it would get the sink table schema instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23177) watermarks generated in dateStream can not flow into table
[ https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371891#comment-17371891 ] Spongebob commented on FLINK-23177: --- Thanks for your reply. It works normally now that is basic on fromDataStream. And I found if I set fields in `#createTemporaryView` it would works normally also. > watermarks generated in dateStream can not flow into table > -- > > Key: FLINK-23177 > URL: https://issues.apache.org/jira/browse/FLINK-23177 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.13.1 > Environment: flink: 1.13.1 >Reporter: Spongebob >Priority: Major > > I have assigned watermark in dataStream and then use the > `createTemporaryView` method to build a table that is source from the > dataStream. Out of my expectation, the watermarks works normally in > dataStream but the watermarks of the table stay at -9223372036854775808 > forever. > {code:java} > def main(args: Array[String]): Unit = { > val streamEnv = ... > streamEnv.enableCheckpointing(1000) > streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) > val dataStream = ... > val resultStream = dataStream.map( > value => { > val data = value.split(",") > (data(0), data(1).toInt) > } > ).assignTimestampsAndWatermarks(WatermarkStrategy > .forBoundedOutOfOrderness(Duration.ZERO) > .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] > { > override def extractTimestamp(element: (String, Int), > recordTimestamp: Long): Long = element._2 * 1000 > })) > .process(new MyProcessFunc) > //resultStream.print("raw") > //streamEnv.execute("") > val streamTableEnv = buildStreamTableEnv(streamEnv, > EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) > streamTableEnv.createTemporaryView("catalog_test1", resultStream) > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23177) watermarks generated in dateStream can not flow into table
[ https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-23177: -- Description: I have assigned watermark in dataStream and then use the `createTemporaryView` method to build a table that is source from the dataStream. Out of my expectation, the watermarks works normally in dataStream but the watermarks of the table stay at -9223372036854775808 forever. {code:java} def main(args: Array[String]): Unit = { val streamEnv = ... streamEnv.enableCheckpointing(1000) streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) val dataStream = ... val resultStream = dataStream.map( value => { val data = value.split(",") (data(0), data(1).toInt) } ).assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(element: (String, Int), recordTimestamp: Long): Long = element._2 * 1000 })) .process(new MyProcessFunc) //resultStream.print("raw") //streamEnv.execute("") val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) streamTableEnv.createTemporaryView("catalog_test1", resultStream) ... } {code} was: I have assigned watermark in dataStream and then use the `createTemporaryView` method to build a table that is source from the dataStream. Out of my expectation, the watermarks works normally in dataStream but the watermarks of the table stay at -9223372036854775808 forever. {code:java} def main(args: Array[String]): Unit = { val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.enableCheckpointing(1000) streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) val dataStream = streamEnv.socketTextStream("192.168.164.105", ) val resultStream = dataStream.map( value => { val data = value.split(",") (data(0), data(1).toInt) } ).assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(element: (String, Int), recordTimestamp: Long): Long = element._2 * 1000 })) .process(new MyProcessFunc) //resultStream.print("raw") //streamEnv.execute("") val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) streamTableEnv.createTemporaryView("catalog_test1", resultStream) val catalog = buildHiveCatalog streamTableEnv.registerCatalog("hive", catalog) streamTableEnv.useCatalog("hive") streamTableEnv.executeSql("insert into test1 select _1,_2 from default_catalog.default_database.catalog_test1").print() } {code} > watermarks generated in dateStream can not flow into table > -- > > Key: FLINK-23177 > URL: https://issues.apache.org/jira/browse/FLINK-23177 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.13.1 > Environment: flink: 1.13.1 >Reporter: Spongebob >Priority: Major > > I have assigned watermark in dataStream and then use the > `createTemporaryView` method to build a table that is source from the > dataStream. Out of my expectation, the watermarks works normally in > dataStream but the watermarks of the table stay at -9223372036854775808 > forever. > {code:java} > def main(args: Array[String]): Unit = { > val streamEnv = ... > streamEnv.enableCheckpointing(1000) > streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) > val dataStream = ... > val resultStream = dataStream.map( > value => { > val data = value.split(",") > (data(0), data(1).toInt) > } > ).assignTimestampsAndWatermarks(WatermarkStrategy > .forBoundedOutOfOrderness(Duration.ZERO) > .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] > { > override def extractTimestamp(element: (String, Int), > recordTimestamp: Long): Long = element._2 * 1000 > })) > .process(new MyProcessFunc) > //resultStream.print("raw") > //streamEnv.execute("") > val streamTableEnv = buildStreamTableEnv(streamEnv, > EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) > streamTableEnv.createTemporaryView("catalog_test1", resultStream) > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23177) watermarks generated in dateStream can not flow into table
Spongebob created FLINK-23177: - Summary: watermarks generated in dateStream can not flow into table Key: FLINK-23177 URL: https://issues.apache.org/jira/browse/FLINK-23177 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.13.1 Environment: flink: 1.13.1 Reporter: Spongebob I have assigned watermark in dataStream and then use the `createTemporaryView` method to build a table that is source from the dataStream. Out of my expectation, the watermarks works normally in dataStream but the watermarks of the table stay at -9223372036854775808 forever. {code:java} def main(args: Array[String]): Unit = { val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.enableCheckpointing(1000) streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) val dataStream = streamEnv.socketTextStream("192.168.164.105", ) val resultStream = dataStream.map( value => { val data = value.split(",") (data(0), data(1).toInt) } ).assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(element: (String, Int), recordTimestamp: Long): Long = element._2 * 1000 })) .process(new MyProcessFunc) //resultStream.print("raw") //streamEnv.execute("") val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) streamTableEnv.createTemporaryView("catalog_test1", resultStream) val catalog = buildHiveCatalog streamTableEnv.registerCatalog("hive", catalog) streamTableEnv.useCatalog("hive") streamTableEnv.executeSql("insert into test1 select _1,_2 from default_catalog.default_database.catalog_test1").print() } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-22874. - Release Note: my misunderstanding of streaming partition sink cause this issue, it worked normally after I had enabled checkpointing. Resolution: Not A Problem > flink table partition trigger doesn't effect as expectation when sink into > hive table > - > > Key: FLINK-22874 > URL: https://issues.apache.org/jira/browse/FLINK-22874 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.13.1 >Reporter: Spongebob >Priority: Major > > I am trying to sink into hive partitioned table which partition commit > trigger is declared as " > partition-time", and I had assigned watermark on the dataStream. When I input > some data into dataStream it can not commit hive partition on time. Here's my > code > {code:java} > //ddl of hive table > create table test_table(username string) > partitioned by (ts bigint) > stored as orc > TBLPROPERTIES ( > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.policy.kind'='metastore,success-file' > );{code} > {code:java} > // flink application code > val streamEnv = ... > val dataStream:DataStream[(String, Long)] = ... > // assign watermark and output watermark info in processFunction > class MyProcessFunction extends ProcessFunction[(String, Long), (String, > Long, Long)] { > override def processElement(value: (String, Long), ctx: > ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: > Collector[(String, Long, Long)]): Unit = { > out.collect((value._1, value._2, ctx.timerService().currentWatermark())) > } > } > val resultStream = dataStream > .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) > .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] { > override def extractTimestamp(element: (String, Long), recordTimestamp: > Long): Long = { > element._2 * 1000 > } > })) > .process(new MyProcessFunction) > // > val streamTableEnv = buildStreamTableEnv(streamEnv, > EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) > // convert dataStream into hive catalog table and sink into hive > streamTableEnv.createTemporaryView("test_catalog_t", resultStream) > val catalog = ... > streamTableEnv.registerCatalog("hive", catalog) > streamTableEnv.useCatalog("hive") > streamTableEnv.executeSql("insert into test_table select _1,_2 from > default_catalog.default_database.test_catalog_t").print() > // flink use the default parallelism 4 > // input data > (a, 1) > (b, 2) > (c, 3) > (d, 4) > (a, 5) > ... > // result > there are much partition directories on hdfs but all they are inprogressing > files and never would be commit to hive metastore.{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358996#comment-17358996 ] Spongebob commented on FLINK-22874: --- Yes my misunderstanding of streaming partition sink cause this issue, it worked normally after I had enabled checkpointing. > flink table partition trigger doesn't effect as expectation when sink into > hive table > - > > Key: FLINK-22874 > URL: https://issues.apache.org/jira/browse/FLINK-22874 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.13.1 >Reporter: Spongebob >Priority: Major > > I am trying to sink into hive partitioned table which partition commit > trigger is declared as " > partition-time", and I had assigned watermark on the dataStream. When I input > some data into dataStream it can not commit hive partition on time. Here's my > code > {code:java} > //ddl of hive table > create table test_table(username string) > partitioned by (ts bigint) > stored as orc > TBLPROPERTIES ( > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.policy.kind'='metastore,success-file' > );{code} > {code:java} > // flink application code > val streamEnv = ... > val dataStream:DataStream[(String, Long)] = ... > // assign watermark and output watermark info in processFunction > class MyProcessFunction extends ProcessFunction[(String, Long), (String, > Long, Long)] { > override def processElement(value: (String, Long), ctx: > ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: > Collector[(String, Long, Long)]): Unit = { > out.collect((value._1, value._2, ctx.timerService().currentWatermark())) > } > } > val resultStream = dataStream > .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) > .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] { > override def extractTimestamp(element: (String, Long), recordTimestamp: > Long): Long = { > element._2 * 1000 > } > })) > .process(new MyProcessFunction) > // > val streamTableEnv = buildStreamTableEnv(streamEnv, > EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) > // convert dataStream into hive catalog table and sink into hive > streamTableEnv.createTemporaryView("test_catalog_t", resultStream) > val catalog = ... > streamTableEnv.registerCatalog("hive", catalog) > streamTableEnv.useCatalog("hive") > streamTableEnv.executeSql("insert into test_table select _1,_2 from > default_catalog.default_database.test_catalog_t").print() > // flink use the default parallelism 4 > // input data > (a, 1) > (b, 2) > (c, 3) > (d, 4) > (a, 5) > ... > // result > there are much partition directories on hdfs but all they are inprogressing > files and never would be commit to hive metastore.{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
Spongebob created FLINK-22874: - Summary: flink table partition trigger doesn't effect as expectation when sink into hive table Key: FLINK-22874 URL: https://issues.apache.org/jira/browse/FLINK-22874 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.13.1 Reporter: Spongebob I am trying to sink into hive partitioned table which partition commit trigger is declared as " partition-time", and I had assigned watermark on the dataStream. When I input some data into dataStream it can not commit hive partition on time. Here's my code {code:java} //ddl of hive table create table test_table(username string) partitioned by (ts bigint) stored as orc TBLPROPERTIES ( 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.policy.kind'='metastore,success-file' );{code} {code:java} // flink application code val streamEnv = ... val dataStream:DataStream[(String, Long)] = ... // assign watermark and output watermark info in processFunction class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] { override def processElement(value: (String, Long), ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: Collector[(String, Long, Long)]): Unit = { out.collect((value._1, value._2, ctx.timerService().currentWatermark())) } } val resultStream = dataStream .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] { override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = { element._2 * 1000 } })) .process(new MyProcessFunction) // val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) // convert dataStream into hive catalog table and sink into hive streamTableEnv.createTemporaryView("test_catalog_t", resultStream) val catalog = ... streamTableEnv.registerCatalog("hive", catalog) streamTableEnv.useCatalog("hive") streamTableEnv.executeSql("insert into test_table select _1,_2 from default_catalog.default_database.test_catalog_t").print() // flink use the default parallelism 4 // input data (a, 1) (b, 2) (c, 3) (d, 4) (a, 5) ... // result there are much partition directories on hdfs but all they are inprogressing files and never would be commit to hive metastore.{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka
[ https://issues.apache.org/jira/browse/FLINK-22190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347469#comment-17347469 ] Spongebob commented on FLINK-22190: --- my mistake lead to this issus. But I found that I must set group.id when source from kafka and enable checkpoints other else I would got an org.apache.kafka.common.errors.InvalidGroupIdException that would end the sink tasks. > no guarantee on Flink exactly_once sink to Kafka > - > > Key: FLINK-22190 > URL: https://issues.apache.org/jira/browse/FLINK-22190 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.12.2 > Environment: *flink: 1.12.2* > *kafka: 2.7.0* >Reporter: Spongebob >Priority: Major > > When I tried to test the function of flink exactly_once sink to kafka, I > found it can not run as expectation. here's the pipline of the flink > applications: raw data(flink app0)-> kafka topic1 -> flink app1 -> kafka > topic2 -> flink app2, flink tasks may met / byZeroException in random. Below > shows the codes: > {code:java} > //代码占位符 > raw data, flink app0: > class SimpleSource1 extends SourceFunction[String] { > var switch = true > val students: Array[String] = Array("Tom", "Jerry", "Gory") > override def run(sourceContext: SourceFunction.SourceContext[String]): Unit > = { > var i = 0 > while (switch) { > sourceContext.collect(s"${students(Random.nextInt(students.length))},$i") > i += 1 > Thread.sleep(5000) > } > } > override def cancel(): Unit = switch = false > } > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > val dataStream = streamEnv.addSource(new SimpleSource1) > dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", > "single-partition-topic-2", new SimpleStringSchema())) > streamEnv.execute("sink kafka") > > flink-app1: > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE) > val prop = new Properties() > prop.setProperty("bootstrap.servers", "xfy:9092") > prop.setProperty("group.id", "test") > val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String]( > "single-partition-topic-2", > new SimpleStringSchema, > prop > )) > val resultStream = dataStream.map(x => { > val data = x.split(",") > (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString() > } > ) > resultStream.print().setParallelism(1) > val propProducer = new Properties() > propProducer.setProperty("bootstrap.servers", "xfy:9092") > propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}") > resultStream.addSink(new FlinkKafkaProducer[String]( > "single-partition-topic", > new MyKafkaSerializationSchema("single-partition-topic"), > propProducer, > Semantic.EXACTLY_ONCE)) > streamEnv.execute("sink kafka") > > flink-app2: > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > val prop = new Properties() > prop.setProperty("bootstrap.servers", "xfy:9092") > prop.setProperty("group.id", "test") > prop.setProperty("isolation_level", "read_committed") > val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String]( > "single-partition-topic", > new SimpleStringSchema, > prop > )) > dataStream.print().setParallelism(1) > streamEnv.execute("consumer kafka"){code} > > flink app1 will print some duplicate numbers, and to my expectation flink > app2 will deduplicate them but the fact shows not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22458) Failed when operating empty catalog table
[ https://issues.apache.org/jira/browse/FLINK-22458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331808#comment-17331808 ] Spongebob commented on FLINK-22458: --- can you explain why this happened and how to avoid this issue? > Failed when operating empty catalog table > - > > Key: FLINK-22458 > URL: https://issues.apache.org/jira/browse/FLINK-22458 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.12.2 > Environment: Flink: 1.12.2 >Reporter: Spongebob >Priority: Major > > The pipline might like: HiveTable -> FlinkCatalogTable(might be empty) -> > HiveTable > It runs normally when the FlinkCatalogTable is not empty, But When > FlinkCatalogTable is empty, Jobmanager throws this exception: > {code:java} > java.lang.Exception: Failed to finalize execution on master at > org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1373) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:877) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1241) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1610) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1584) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:663) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_251] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_251] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_251] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_251] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > ~[flownData-1.0-jar-with-dependencies.jar:?] at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flownData-1.0-jar-with-dependencies.jar:?] at > scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > [flownData-1.0-jar-with-dependencies.jar:?] at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flownData-1.0-jar-with-dependencies.jar:?] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flownData-1.0-jar-with-dependencies.jar:?] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flownData-1.0-jar-with-dependencies.jar:?] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.actor.Actor.aroundReceive(Actor.scala:517) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.actor.Actor.aroundReceive$(Actor.scala:515) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flownData-1.0-jar-with-dependencies.jar:?] at > akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flownData-1.0-jar-with-dependencies.jar:?] at >
[jira] [Created] (FLINK-22458) Failed when operating empty catalog table
Spongebob created FLINK-22458: - Summary: Failed when operating empty catalog table Key: FLINK-22458 URL: https://issues.apache.org/jira/browse/FLINK-22458 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.2 Environment: Flink: 1.12.2 Reporter: Spongebob The pipline might like: HiveTable -> FlinkCatalogTable(might be empty) -> HiveTable It runs normally when the FlinkCatalogTable is not empty, But When FlinkCatalogTable is empty, Jobmanager throws this exception: {code:java} java.lang.Exception: Failed to finalize execution on master at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1373) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:877) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1241) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1610) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1584) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:663) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flownData-1.0-jar-with-dependencies.jar:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_251] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_251] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_251] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_251] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flownData-1.0-jar-with-dependencies.jar:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flownData-1.0-jar-with-dependencies.jar:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flownData-1.0-jar-with-dependencies.jar:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flownData-1.0-jar-with-dependencies.jar:?] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flownData-1.0-jar-with-dependencies.jar:?] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flownData-1.0-jar-with-dependencies.jar:?] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flownData-1.0-jar-with-dependencies.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flownData-1.0-jar-with-dependencies.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flownData-1.0-jar-with-dependencies.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flownData-1.0-jar-with-dependencies.jar:?] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flownData-1.0-jar-with-dependencies.jar:?] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flownData-1.0-jar-with-dependencies.jar:?] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flownData-1.0-jar-with-dependencies.jar:?] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flownData-1.0-jar-with-dependencies.jar:?] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flownData-1.0-jar-with-dependencies.jar:?] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flownData-1.0-jar-with-dependencies.jar:?] Caused by:
[jira] [Commented] (FLINK-21914) Trying to access closed classloader
[ https://issues.apache.org/jira/browse/FLINK-21914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331698#comment-17331698 ] Spongebob commented on FLINK-21914: --- anyone can help with this issue? When this issue happen, there will remain an empty staging directory on hdfs site. But it runs normally on local ide. > Trying to access closed classloader > --- > > Key: FLINK-21914 > URL: https://issues.apache.org/jira/browse/FLINK-21914 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > hadoop: 3.1.3 > hive: 3.1.2 > >Reporter: Spongebob >Priority: Critical > Labels: stale-critical > Attachments: app.log > > > I am trying to deploy flink application on yarn, but got this exception: > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check-leaked-classloader'. > > This application tested pass on my local environment. And the application > detail is read and write into hive via flink table environment. you can view > attachment for yarn log which source and sink data info was deleted > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check- > leaked-classloader'. > {code} > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check-leaked-classloader'. > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183) > at > org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780) > at > org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036) > at > org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995) > at > org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968) > at > org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848) > at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200) > at > org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812) > at > org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789) > at > org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183) > at > org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145) > at > org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65) > at > org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21914) Trying to access closed classloader
[ https://issues.apache.org/jira/browse/FLINK-21914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-21914: -- Labels: (was: stale-critical) > Trying to access closed classloader > --- > > Key: FLINK-21914 > URL: https://issues.apache.org/jira/browse/FLINK-21914 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > hadoop: 3.1.3 > hive: 3.1.2 > >Reporter: Spongebob >Priority: Critical > Attachments: app.log > > > I am trying to deploy flink application on yarn, but got this exception: > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check-leaked-classloader'. > > This application tested pass on my local environment. And the application > detail is read and write into hive via flink table environment. you can view > attachment for yarn log which source and sink data info was deleted > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check- > leaked-classloader'. > {code} > Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to > access closed classloader. Please check if you store classloaders directly or > indirectly in static fields. If the stacktrace suggests that the leak occurs > in a third party library and cannot be fixed immediately, you can disable > this check with the configuration 'classloader.check-leaked-classloader'. > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183) > at > org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780) > at > org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036) > at > org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995) > at > org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968) > at > org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848) > at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200) > at > org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812) > at > org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789) > at > org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183) > at > org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145) > at > org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65) > at > org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22422) -ynm option doesn't effect when deploying flink on yarn
[ https://issues.apache.org/jira/browse/FLINK-22422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331429#comment-17331429 ] Spongebob commented on FLINK-22422: --- Actually it works normally when using "-m yarn-cluster" only. What's the difference between "-m yarn-cluster" and "-t yarn-per-job" ? I can only see the option "-t yarn-per-job" on flink document site. > -ynm option doesn't effect when deploying flink on yarn > --- > > Key: FLINK-22422 > URL: https://issues.apache.org/jira/browse/FLINK-22422 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > hadoop: 3.1.3 >Reporter: Spongebob >Priority: Major > > -ynm option doesn't effect when deploying flink on yarn, it still appears to > be "Flink per-job cluster" on yarn webui. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22422) -ynm option doesn't effect when deploying flink on yarn
[ https://issues.apache.org/jira/browse/FLINK-22422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331382#comment-17331382 ] Spongebob commented on FLINK-22422: --- Hi Yang, here's the command: flink run -m yarn-cluster -ys 1 -yjm 2G -ytm 2G -yqu default -d -t yarn-per-job -d -ynm dim_gnd_category_activity -c dim.dim_gnd_category_activity gop-emp-roster_report/empData-3.0-jar-with-dependencies.jar > -ynm option doesn't effect when deploying flink on yarn > --- > > Key: FLINK-22422 > URL: https://issues.apache.org/jira/browse/FLINK-22422 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > hadoop: 3.1.3 >Reporter: Spongebob >Priority: Major > > -ynm option doesn't effect when deploying flink on yarn, it still appears to > be "Flink per-job cluster" on yarn webui. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22422) -ynm option doesn't effect when deploying flink on yarn
Spongebob created FLINK-22422: - Summary: -ynm option doesn't effect when deploying flink on yarn Key: FLINK-22422 URL: https://issues.apache.org/jira/browse/FLINK-22422 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.12.2 Environment: flink: 1.12.2 hadoop: 3.1.3 Reporter: Spongebob -ynm option doesn't effect when deploying flink on yarn, it still appears to be "Flink per-job cluster" on yarn webui. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22247) can not pass AddressList when connecting to rabbitmq
[ https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-22247: -- Environment: flink: 1.12.2 rabbitmq: 3.8.4 was: flink: 2.12.2 rabbitmq: 3.8.4 > can not pass AddressList when connecting to rabbitmq > > > Key: FLINK-22247 > URL: https://issues.apache.org/jira/browse/FLINK-22247 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > rabbitmq: 3.8.4 >Reporter: Spongebob >Priority: Major > > We hope to connect to rabbitmq cluster address when using rabbitmq connector, > So we override the setupConnection function to pass the rabbitmq cluster > address, but the address class is not serializable thereby flink throws > exception. > {code:java} > //代码占位符 > val rabbitmqAddresses = Array( > new Address("xxx1", 5672), > new Address("xxx2", 5672), > new Address("xxx3", 5672)) > val dataStream = streamEnv > .addSource(new RMQSource[String]( > rabbitmqConfig, // rabbitmq's connection config > "queueName", // queue name > true, // using correlation ids, assurance of exactly-once consume from > rabbitmq > new SimpleStringSchema // java deserialization > ) { > override def setupQueue(): Unit = {} > override def setupConnection(): Connection = { > rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses) > } > }).setParallelism(1) > {code} > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: > [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object > probably contains or references non serializable fields.Exception in thread > "main" org.apache.flink.api.common.InvalidProgramException: > [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object > probably contains or references non serializable fields. at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693) > at testConsumer$.main(testConsumer.scala:30) at > testConsumer.main(testConsumer.scala)Caused by: > java.io.NotSerializableException: com.rabbitmq.client.Address at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) > ... 9 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22247) can not pass AddressList when connecting to rabbitmq
Spongebob created FLINK-22247: - Summary: can not pass AddressList when connecting to rabbitmq Key: FLINK-22247 URL: https://issues.apache.org/jira/browse/FLINK-22247 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.12.2 Environment: flink: 2.12.2 rabbitmq: 3.8.4 Reporter: Spongebob We hope to connect to rabbitmq cluster address when using rabbitmq connector, So we override the setupConnection function to pass the rabbitmq cluster address, but the address class is not serializable thereby flink throws exception. {code:java} //代码占位符 val rabbitmqAddresses = Array( new Address("xxx1", 5672), new Address("xxx2", 5672), new Address("xxx3", 5672)) val dataStream = streamEnv .addSource(new RMQSource[String]( rabbitmqConfig, // rabbitmq's connection config "queueName", // queue name true, // using correlation ids, assurance of exactly-once consume from rabbitmq new SimpleStringSchema // java deserialization ) { override def setupQueue(): Unit = {} override def setupConnection(): Connection = { rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses) } }).setParallelism(1) {code} Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object probably contains or references non serializable fields.Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693) at testConsumer$.main(testConsumer.scala:30) at testConsumer.main(testConsumer.scala)Caused by: java.io.NotSerializableException: com.rabbitmq.client.Address at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ... 9 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka
Spongebob created FLINK-22190: - Summary: no guarantee on Flink exactly_once sink to Kafka Key: FLINK-22190 URL: https://issues.apache.org/jira/browse/FLINK-22190 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.12.2 Environment: *flink: 1.12.2* *kafka: 2.7.0* Reporter: Spongebob When I tried to test the function of flink exactly_once sink to kafka, I found it can not run as expectation. here's the pipline of the flink applications: raw data(flink app0)-> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2, flink tasks may met / byZeroException in random. Below shows the codes: {code:java} //代码占位符 raw data, flink app0: class SimpleSource1 extends SourceFunction[String] { var switch = true val students: Array[String] = Array("Tom", "Jerry", "Gory") override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { var i = 0 while (switch) { sourceContext.collect(s"${students(Random.nextInt(students.length))},$i") i += 1 Thread.sleep(5000) } } override def cancel(): Unit = switch = false } val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val dataStream = streamEnv.addSource(new SimpleSource1) dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", "single-partition-topic-2", new SimpleStringSchema())) streamEnv.execute("sink kafka") flink-app1: val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE) val prop = new Properties() prop.setProperty("bootstrap.servers", "xfy:9092") prop.setProperty("group.id", "test") val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String]( "single-partition-topic-2", new SimpleStringSchema, prop )) val resultStream = dataStream.map(x => { val data = x.split(",") (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString() } ) resultStream.print().setParallelism(1) val propProducer = new Properties() propProducer.setProperty("bootstrap.servers", "xfy:9092") propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}") resultStream.addSink(new FlinkKafkaProducer[String]( "single-partition-topic", new MyKafkaSerializationSchema("single-partition-topic"), propProducer, Semantic.EXACTLY_ONCE)) streamEnv.execute("sink kafka") flink-app2: val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val prop = new Properties() prop.setProperty("bootstrap.servers", "xfy:9092") prop.setProperty("group.id", "test") prop.setProperty("isolation_level", "read_committed") val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String]( "single-partition-topic", new SimpleStringSchema, prop )) dataStream.print().setParallelism(1) streamEnv.execute("consumer kafka"){code} flink app1 will print some duplicate numbers, and to my expectation flink app2 will deduplicate them but the fact shows not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv
Spongebob created FLINK-22165: - Summary: How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv Key: FLINK-22165 URL: https://issues.apache.org/jira/browse/FLINK-22165 Project: Flink Issue Type: New Feature Components: API / DataStream Affects Versions: 1.12.2 Environment: Flink 1.12.2 rabbitmq 3.8.4 Reporter: Spongebob Flink rabbitmq module provides source and sink function for rabbitmq. We can use the correlationId to deduplicate the checkpoints record, So can we set a correlationId for each message to sink into rabbitmq ? -- This message was sent by Atlassian Jira (v8.3.4#803005)