[jira] [Created] (FLINK-33855) send flag message like checkpoint barrier

2023-12-15 Thread Spongebob (Jira)
Spongebob created FLINK-33855:
-

 Summary: send flag message like checkpoint barrier
 Key: FLINK-33855
 URL: https://issues.apache.org/jira/browse/FLINK-33855
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / API
Reporter: Spongebob


I use Flink SQL to consume Kafka messages. Now I want to send one specific 
message to that Kafka topic, and I hope the sink table, which uses a 
user-defined connector, can receive this specific message. It sounds a bit 
like the barrier that triggers checkpoints. Is there any way to achieve this?
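
For illustration, a minimal sketch of one in-band approach under invented 
assumptions (the marker value and the sink class are hypothetical, not part 
of any existing connector API): the producer writes a dedicated control 
record to the topic, and the user-defined sink inspects each record for that 
marker:
{code:java}
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;

// Hypothetical sketch: treat a reserved marker value as an in-band "flag"
// record, loosely analogous to a barrier flowing with the data stream.
public class FlagAwareSink implements SinkFunction<Row> {
    private static final String FLAG = "__FLAG__"; // invented marker value

    @Override
    public void invoke(Row row, Context context) {
        if (FLAG.equals(row.getField(0))) {
            onFlag(); // react to the control record, e.g. flush buffers
        } else {
            write(row); // normal write path
        }
    }

    private void onFlag() { /* user-defined reaction */ }

    private void write(Row row) { /* user-defined write */ }
}
{code}
Note that, unlike a checkpoint barrier, such a marker is not aligned across 
parallel subtasks; with more than one Kafka partition, each sink subtask sees 
it (or not) independently.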



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32970) could not load external class using "-C" option

2023-08-31 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-32970.
-
Resolution: Won't Fix

> could not load external class using "-C" option
> ---
>
> Key: FLINK-32970
> URL: https://issues.apache.org/jira/browse/FLINK-32970
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.6
>Reporter: Spongebob
>Priority: Major
>
> Firstly, the "connectors.jar" contains "test-connector" and I put it in user 
> libs.
> Then, I started a tableEvironment in one operator function of 
> streamExecutionEnvironment.
> In the tableEvironment I declared a table using the "test-connector".
> Finally, I run the application and load the "connectors.jar" using "-C 
> connectors.jar", when the table's creation statement was executed, I got an 
> class not found exception which like below(please notice that if I put the 
> "connectors.jar" in flink lib, the application would run normally):
> {code:java}
> SLF4J: Found binding in 
> [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
>  Found binding in 
> [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.SLF4J: Actual binding is of type 
> [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException:
>  org.apache.flink.table.api.TableException: Failed to wait job finish
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
>at 
> com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58)
>at 
> com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60)
>  at 
> com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103)
>at 
> com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)  
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>  at 
> 

[jira] [Commented] (FLINK-32970) could not load external class using "-C" option

2023-08-31 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760800#comment-17760800
 ] 

Spongebob commented on FLINK-32970:
---

Hi [~martijnvisser], I think I should not create another Environment inside 
operator functions, because that would launch a local Environment.
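
For context, a hedged sketch of the pattern in question (the class and all 
names are hypothetical, invented for this example): a TableEnvironment 
created inside an operator function is built on the TaskManager and executes 
its statements in a separate, locally launched environment rather than as 
part of the submitted application:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch of the anti-pattern: each task builds its own
// TableEnvironment, so the SQL it executes runs in a local environment
// that is independent of the enclosing job (and of its "-C" classpath).
public class ExecuteSqlSink extends RichSinkFunction<String> {
    private transient TableEnvironment tEnv;

    @Override
    public void open(Configuration parameters) {
        tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
    }

    @Override
    public void invoke(String statement, Context context) {
        tEnv.executeSql(statement); // submitted outside the enclosing job
    }
}
{code}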

> could not load external class using "-C" option
> ---
>
> Key: FLINK-32970
> URL: https://issues.apache.org/jira/browse/FLINK-32970
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.6
>Reporter: Spongebob
>Priority: Major
>
> Firstly, the "connectors.jar" contains "test-connector" and I put it in user 
> libs.
> Then, I started a tableEvironment in one operator function of 
> streamExecutionEnvironment.
> In the tableEvironment I declared a table using the "test-connector".
> Finally, I run the application and load the "connectors.jar" using "-C 
> connectors.jar", when the table's creation statement was executed, I got an 
> class not found exception which like below(please notice that if I put the 
> "connectors.jar" in flink lib, the application would run normally):
> {code:java}
> SLF4J: Found binding in 
> [jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
>  Found binding in 
> [jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.SLF4J: Actual binding is of type 
> [org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException:
>  org.apache.flink.table.api.TableException: Failed to wait job finish
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
>at 
> com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58)
>at 
> com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60)
>  at 
> com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103)
>at 
> com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)  
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 

[jira] [Updated] (FLINK-32970) could not load external class using "-C" option

2023-08-28 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32970:
--
Description: 
Firstly, the "connectors.jar" contains "test-connector" and I put it in user 
libs.

Then, I started a tableEvironment in one operator function of 
streamExecutionEnvironment.

In the tableEvironment I declared a table using the "test-connector".
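
As an illustration, a hedged sketch of such a declaration (the table name and 
schema are invented; only the 'connector' option comes from this description, 
and `tableEnv` stands for the TableEnvironment from the previous step):
{code:java}
// Hypothetical DDL sketch: a table backed by the user-defined "test-connector".
tableEnv.executeSql(
        "CREATE TABLE sink_table ("
                + "  id BIGINT,"
                + "  msg STRING"
                + ") WITH ("
                + "  'connector' = 'test-connector'"
                + ")");
{code}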

Finally, I ran the application and loaded the "connectors.jar" using "-C 
connectors.jar". When the table's creation statement was executed, I got a 
ClassNotFoundException like the one below (note that if I put the 
"connectors.jar" into the Flink lib directory, the application runs normally):
{code:java}
SLF4J: Found binding in 
[jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 Found binding in 
[jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.SLF4J: Actual binding is of type 
[org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException:
 org.apache.flink.table.api.TableException: Failed to wait job finish  at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)  at 
org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
  at 
org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
   at 
com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58)
   at 
com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60)
 at 
com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103)
   at 
com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
   at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
  at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
  at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
  at 
org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35)
  at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48)
  at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
  at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
  at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)  
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)   
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)   at 

[jira] [Created] (FLINK-32970) could not load external class using "-C" option

2023-08-28 Thread Spongebob (Jira)
Spongebob created FLINK-32970:
-

 Summary: could not load external class using "-C" option
 Key: FLINK-32970
 URL: https://issues.apache.org/jira/browse/FLINK-32970
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.6
Reporter: Spongebob


Firstly, the "connectors.jar" contains "test-connector" and I put it in user 
libs.

Then, I started a tableEvironment in one operator function of 
streamExecutionEnvironment.

In the tableEvironment I declared a table using the "test-connector".

Finally, when the table's creation statement was executed, I got an class not 
found exception which like below(please notice that if I put the 
"connectors.jar" in flink lib, the application would run normally):
{code:java}
SLF4J: Found binding in 
[jar:file:/data/hadoop-3.3.5/tmpdata/nm-local-dir/usercache/root/appcache/application_1690443774859_0439/filecache/13/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 Found binding in 
[jar:file:/data/hadoop-3.3.5/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.SLF4J: Actual binding is of type 
[org.apache.logging.slf4j.Log4jLoggerFactory]java.util.concurrent.ExecutionException:
 org.apache.flink.table.api.TableException: Failed to wait job finish  at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)  at 
org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
  at 
org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
   at 
com.xctech.cone.data.sql.model.runner.ModelRunner.executeStatementSet(ModelRunner.java:58)
   at 
com.xctech.cone.data.versionedStarRocks.MicroBatchModelRunner.run(MicroBatchModelRunner.java:60)
 at 
com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:103)
   at 
com.xctech.cone.data.versionedStarRocks.ExecuteSQLFunction.invoke(ExecuteSQLFunction.java:25)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
   at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
  at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
  at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
  at 
org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35)
  at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48)
  at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
  at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
  at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)  
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)   
 at 

[jira] [Closed] (FLINK-32361) error after replace dependent jar file

2023-08-28 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-32361.
-
Resolution: Cannot Reproduce

> error after replace dependent jar file
> --
>
> Key: FLINK-32361
> URL: https://issues.apache.org/jira/browse/FLINK-32361
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> In standalone session mode, I have one dependent jar file named 'A.jar' 
> in the folder `lib1`, so I submit my app via the command `flink run -C 
> file:///lib1/A.jar -c Application ./myApp.jar`, and it runs normally. 
> I also have the same jar file named 'A.jar' in the folder `lib2`, which 
> was copied from `lib1`. Then I delete A.jar in `lib1`, copy the same jar 
> from `lib2` back to `lib1`, and re-submit the application. Finally I get a 
> ClassNotFoundException for a class that belongs to A.jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-32739.
-
Resolution: Not A Problem

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job turned to FAILED after the restart attempts
> The job was not rescheduled to taskmanager2 as I expected, even though it 
> had enough free slots.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at 
> 

[jira] [Commented] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750937#comment-17750937
 ] 

Spongebob commented on FLINK-32739:
---

Thanks [~JunRuiLi], this option helps. When one of the regions restarts, will 
it be restarted within the original job or as a new job?

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job turned to FAILED after the restart attempts
> The job was not rescheduled to taskmanager2 as I expected, even though it 
> had enough free slots.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> 

[jira] [Comment Edited] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648
 ] 

Spongebob edited comment on FLINK-32739 at 8/3/23 9:44 AM:
---

[~JunRuiLi] Hi Junrui, I have uploaded the partial JM logs related to the 
failed job, whose job id is `15ffcdad572bd98fd74ffd45848e84cb`. The error 
stack is in the issue description.

And I found some related logs on the other taskmanager:
{code:java}
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request 109179dd9cefa1f4b109b8f726d0e0f7 for job 
15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id 
.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for 109179dd9cefa1f4b109b8f726d0e0f7.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager 
akka.tcp://flink@192.168.0.11:6123/user/rpc/jobmanager_3 with leader id 
----.
2023-08-03 17:21:41,621 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://flink@192.168.0.11:6123/user/rpc/jobmanager_3 for job 
15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,626 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,627 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:2, state:ALLOCATED, resource profile: 
ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), 
taskOffHeapMemory=0 bytes, managedMemory=245.760mb (257698041 bytes), 
networkMemory=61.440mb (64424510 bytes)}, allocationId: 
109179dd9cefa1f4b109b8f726d0e0f7, jobId: 15ffcdad572bd98fd74ffd45848e84cb).
2023-08-03 17:21:41,629 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
15ffcdad572bd98fd74ffd45848e84cb from job leader monitoring.
2023-08-03 17:21:41,629 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb.{code}


was (Author: spongebobz):
[~JunRuiLi] Hi Junrui, I have uploaded the partial JM logs related to the 
failed job, whose job id is `15ffcdad572bd98fd74ffd45848e84cb`. The error 
stack is in the issue description.

And I found some related logs on the other taskmanager:
{code:java}
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request 109179dd9cefa1f4b109b8f726d0e0f7 for job 
15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id 
.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for 109179dd9cefa1f4b109b8f726d0e0f7.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 
with leader id ----.
2023-08-03 17:21:41,621 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 for job 
15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,626 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,627 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:2, state:ALLOCATED, resource profile: 
ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), 
taskOffHeapMemory=0 bytes, 

[jira] [Comment Edited] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648
 ] 

Spongebob edited comment on FLINK-32739 at 8/3/23 9:44 AM:
---

[~JunRuiLi] Hi Junrui, I have uploaded the partial JM logs related to the 
failed job, whose job id is `15ffcdad572bd98fd74ffd45848e84cb`. The error 
stack is in the issue description.

And I found some related logs on the other taskmanager:
{code:java}
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request 109179dd9cefa1f4b109b8f726d0e0f7 for job 
15ffcdad572bd98fd74ffd45848e84cb from resource manager with leader id 
.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for 109179dd9cefa1f4b109b8f726d0e0f7.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
15ffcdad572bd98fd74ffd45848e84cb for job leader monitoring.
2023-08-03 17:21:41,618 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 
with leader id ----.
2023-08-03 17:21:41,621 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://flink@10.150.9.36:6123/user/rpc/jobmanager_3 for job 
15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,625 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,626 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 15ffcdad572bd98fd74ffd45848e84cb.
2023-08-03 17:21:41,627 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:2, state:ALLOCATED, resource profile: 
ResourceProfile{cpuCores=1, taskHeapMemory=256.000mb (268435451 bytes), 
taskOffHeapMemory=0 bytes, managedMemory=245.760mb (257698041 bytes), 
networkMemory=61.440mb (64424510 bytes)}, allocationId: 
109179dd9cefa1f4b109b8f726d0e0f7, jobId: 15ffcdad572bd98fd74ffd45848e84cb).
2023-08-03 17:21:41,629 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
15ffcdad572bd98fd74ffd45848e84cb from job leader monitoring.
2023-08-03 17:21:41,629 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
JobManager connection for job 15ffcdad572bd98fd74ffd45848e84cb. {code}


was (Author: spongebobz):
[~JunRuiLi] Hi Junrui, I have uploaded the partial JM logs related to the 
failed job, whose job id is `15ffcdad572bd98fd74ffd45848e84cb`. The error 
stack is in the issue description.

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job turned to FAILED after the restart attempts
> The job was not rescheduled to taskmanager2 as I expected, even though it 
> had enough free slots.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> 

[jira] [Commented] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750648#comment-17750648
 ] 

Spongebob commented on FLINK-32739:
---

[~JunRuiLi] Hi Junrui, I have uploaded the partial JM logs related to the 
failed job, whose job id is `15ffcdad572bd98fd74ffd45848e84cb`. The error 
stack is in the issue description.

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job turned to FAILED after the restart attempts
> The job was not rescheduled to taskmanager2 as I expected, even though it 
> had enough free slots.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> 

[jira] [Updated] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32739:
--
Attachment: jobmanager.log

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job turned to FAILED after the restart attempts
> The job was not rescheduled to taskmanager2 as I expected, even though it 
> had enough free slots.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at 
> 

[jira] [Updated] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32739:
--
Description: 
In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
with the following steps:
 # submit a streaming job configured with a fixed restart strategy to the 
Flink session environment
 # the job was running on taskmanager1; then I killed taskmanager1
 # the job turned to FAILED after the restart attempts

The job was not rescheduled to taskmanager2 as I expected, even though it had 
enough free slots.
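
For reference, a minimal sketch of a fixed-delay restart strategy configured 
programmatically with the values that appear in the trace below (3 attempts, 
1 ms backoff); the actual job may configure it differently:
{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: after 3 failed restart attempts (1 ms apart), recovery is
// suppressed and the job transitions to FAILED, as in the trace below.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(1)));
{code}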

Here's the exception trace:
{code:java}
2023-08-03 15:13:56
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=1)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
    at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
    at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
    at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
    at 
org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
    at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at 
org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at 
org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
    at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
    at 

[jira] [Updated] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32739:
--
Description: 
In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
with the following steps:
 # submit a streaming job configured with a fixed restart strategy to the 
Flink session environment
 # the job was running on taskmanager1; then I killed taskmanager1
 # the job turned to FAILED after the restart attempts

The job was not rescheduled to taskmanager2 as I expected.

Here's the exception trace:
{code:java}
2023-08-03 15:13:56
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=1)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
    at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
    at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
    at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
    at 
org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
    at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at 
org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at 
org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
    at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)

[jira] [Created] (FLINK-32739) restarting does not work

2023-08-03 Thread Spongebob (Jira)
Spongebob created FLINK-32739:
-

 Summary: restarting does not work
 Key: FLINK-32739
 URL: https://issues.apache.org/jira/browse/FLINK-32739
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.5
Reporter: Spongebob


In my Flink standalone cluster, I configured 2 TaskManagers. Then I tested with 
the following steps:
 # submit a streaming job, configured with a fixed-delay restart strategy, to the 
Flink session environment
 # the job was running on taskmanager1; then I killed taskmanager1.
 # the job ended up in a failed state after its restart attempts were exhausted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-24837) submit flink job failed via restapi

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-24837.
-
Resolution: Won't Fix

> submit flink job failed via restapi
> ---
>
> Key: FLINK-24837
> URL: https://issues.apache.org/jira/browse/FLINK-24837
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.12.4
>Reporter: Spongebob
>Priority: Major
>
> I tried to submit a Flink job via the Flink REST API but got an exception. I 
> used the `await` function in my job so that it would submit multiple jobs. 
> Below is the exception detail.
> {code:java}
> //
> "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not 
> execute 
> application.\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)\n\tatjava.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tatjava.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tatjava.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)\n\tatjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tatjava.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tatjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tatjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tatjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tatjava.lang.Thread.run(Thread.java:748)\nCaused
>  by: 
> java.util.concurrent.CompletionException:org.apache.flink.util.FlinkRuntimeException:
>  Could not execute 
> application.\n\tatjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tatjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)\n\t...
>  7 more\nCaused by:org.apache.flink.util.FlinkRuntimeException: Could not 
> execute 
> application.\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)\n\tatorg.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)\n\tatjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t...
>  7 more\nCaused 
> by:org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error:org.apache.flink.table.api.TableException: Failed to 
> wait job 
> finish\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)\n\tatorg.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)\n\tatorg.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)\n\tatorg.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)\n\t...10
>  more\nCaused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to waitjob finish\n\tat 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tatjava.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)\n\tatorg.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)\n\tatcom.xctech.cone.flink.migrate.batch.BatchCone.main(BatchCone.java:238)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke0(Native
>  
> Method)\n\tatsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tatsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tatjava.lang.reflect.Method.invoke(Method.java:498)\n\tatorg.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)\n\t...
>  13 more\nCaused by:org.apache.flink.table.api.TableException: Failed to wait 
> job 
> 
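A hedged reading of the truncated trace above: the jar runs through 
DetachedApplicationRunner, i.e. detached application mode, and blocking on a 
job result there is what raises "Failed to wait job finish". A minimal sketch 
of submitting without blocking, assuming a Table API job like the one in the 
trace (table names are placeholders):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class DetachedSubmitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // DDL for SOURCE and SINK omitted.
        TableResult result = tEnv.executeSql("INSERT INTO SINK SELECT * FROM SOURCE");
        // result.await(); // blocking here is what fails under the detached runner
    }
}
{code}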

[jira] [Reopened] (FLINK-32361) error after replace dependent jar file

2023-06-18 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob reopened FLINK-32361:
---

Hi [~Adhip], yes, I think this is a bug, so I reported this issue here.

> error after replace dependent jar file
> --
>
> Key: FLINK-32361
> URL: https://issues.apache.org/jira/browse/FLINK-32361
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> In standalone session mode, I have a dependent jar file named 'A.jar' in the 
> folder `lib1`, so I submit my app via the command `flink run -C 
> file:///lib1/A.jar -c Application ./myApp.jar`, and it runs normally. 
> I also have an identical 'A.jar' in the folder `lib2`, copied from `lib1`. 
> When I delete A.jar in `lib1`, copy the same jar from `lib2` back to `lib1`, 
> and re-submit the application, I get a ClassNotFoundException for a class 
> contained in A.jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32361) error after replace dependent jar file

2023-06-18 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733980#comment-17733980
 ] 

Spongebob edited comment on FLINK-32361 at 6/19/23 1:57 AM:


Hi [~mapohl], yes, I think this is a bug, so I reported this issue here.


was (Author: spongebobz):
Hi [~Adhip], yes, I think this is a bug, so I reported this issue here.

> error after replace dependent jar file
> --
>
> Key: FLINK-32361
> URL: https://issues.apache.org/jira/browse/FLINK-32361
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> In standalone session mode, I have a dependent jar file named 'A.jar' in the 
> folder `lib1`, so I submit my app via the command `flink run -C 
> file:///lib1/A.jar -c Application ./myApp.jar`, and it runs normally. 
> I also have an identical 'A.jar' in the folder `lib2`, copied from `lib1`. 
> When I delete A.jar in `lib1`, copy the same jar from `lib2` back to `lib1`, 
> and re-submit the application, I get a ClassNotFoundException for a class 
> contained in A.jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32361) error after replace dependent jar file

2023-06-15 Thread Spongebob (Jira)
Spongebob created FLINK-32361:
-

 Summary: error after replace dependent jar file
 Key: FLINK-32361
 URL: https://issues.apache.org/jira/browse/FLINK-32361
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.5
Reporter: Spongebob


In standalone session mode, I have a dependent jar file named 'A.jar' in the 
folder `lib1`, so I submit my app via the command `flink run -C 
file:///lib1/A.jar -c Application ./myApp.jar`, and it runs normally. 

I also have an identical 'A.jar' in the folder `lib2`, copied from `lib1`. 
When I delete A.jar in `lib1`, copy the same jar from `lib2` back to `lib1`, 
and re-submit the application, I get a ClassNotFoundException for a class 
contained in A.jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32308) RestClusterClient submit job to remote cluster

2023-06-12 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32308:
--
Affects Version/s: 1.14.5

> RestClusterClient submit job to remote cluster
> --
>
> Key: FLINK-32308
> URL: https://issues.apache.org/jira/browse/FLINK-32308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Not a Priority
>
> I just used `RestClusterClient` to submit a job to a remote cluster, but, 
> unexpectedly, it was submitted to the local cluster instead. Could you help me?
> {code:java}
> String host = "x.x.x.x";
> int port = 8081;
> Configuration flinkConfiguration = new Configuration();
> flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
> flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123);
> flinkConfiguration.setInteger(RestOptions.PORT, port);
> RestClusterClient<StandaloneClusterId> clusterClient = new 
> RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance());
> String s = clusterClient.getWebInterfaceURL();
> List<URL> extraJars = new ArrayList<>();
> extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL());
> PackagedProgram packagedProgram = PackagedProgram.newBuilder()
> .setConfiguration(flinkConfiguration)
> .setJarFile(new File("F:\\data.jar"))
> .setEntryPointClassName("MyApplication")
> .setUserClassPaths(extraJars)
> .build();
> JobID jobID = JobID.generate();
> JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, 
> flinkConfiguration, 2, jobID, false);
> clusterClient.submitJob(jobGraph);
> System.out.println(jobID); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32308) RestClusterClient submit job to remote cluster

2023-06-12 Thread Spongebob (Jira)
Spongebob created FLINK-32308:
-

 Summary: RestClusterClient submit job to remote cluster
 Key: FLINK-32308
 URL: https://issues.apache.org/jira/browse/FLINK-32308
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Reporter: Spongebob


I just used `RestClusterClient` to submit a job to a remote cluster, but, 
unexpectedly, it was submitted to the local cluster instead. Could you help me?
{code:java}
String host = "x.x.x.x";
int port = 8081;
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123);
flinkConfiguration.setInteger(RestOptions.PORT, port);

RestClusterClient<StandaloneClusterId> clusterClient = new 
RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance());
String s = clusterClient.getWebInterfaceURL();
List<URL> extraJars = new ArrayList<>();
extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL());
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setConfiguration(flinkConfiguration)
.setJarFile(new File("F:\\data.jar"))
.setEntryPointClassName("MyApplication")
.setUserClassPaths(extraJars)
.build();
JobID jobID = JobID.generate();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, 
flinkConfiguration, 2, jobID, false);
clusterClient.submitJob(jobGraph);
System.out.println(jobID); {code}
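One possible explanation, offered as an assumption rather than a confirmed 
diagnosis: the snippet sets `JobManagerOptions.ADDRESS` but never 
`RestOptions.ADDRESS`, and the REST client resolves its endpoint from the 
latter, so it can fall back to the local host. A minimal sketch of the extra 
line under that assumption:
{code:java}
// Assumption: RestClusterClient takes its REST endpoint from rest.address
// (RestOptions.ADDRESS); without it the client may target localhost.
flinkConfiguration.setString(RestOptions.ADDRESS, host);
{code}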



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32274) kafka class could not be found while using application mode.

2023-06-07 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-32274.
-
Release Note: Try replacing 
`org.apache.kafka.common.serialization.StringSerializer` with 
`Class.forName("org.apache.kafka.common.serialization.StringSerializer")`.
  Resolution: Invalid
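As a minimal sketch of the release-note workaround, under the assumption that 
the serializer class is handed to a plain Kafka producer config (the bootstrap 
address is a placeholder):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerSketch {
    public static void main(String[] args) throws ClassNotFoundException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092"); // placeholder address
        // Passing the resolved Class object instead of the class-name string
        // makes the lookup happen in the caller's classloader, sidestepping
        // the "could not be found" error described below.
        props.put("key.serializer", Class.forName(
                "org.apache.kafka.common.serialization.StringSerializer"));
        props.put("value.serializer", Class.forName(
                "org.apache.kafka.common.serialization.StringSerializer"));
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
{code}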

> kafka class could not be found while using application mode.
> ---
>
> Key: FLINK-32274
> URL: https://issues.apache.org/jira/browse/FLINK-32274
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> While using yarn-application mode to submit my app, it would throw an 
> exception: `org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.StringDeserializer for configuration 
> key.deserializer: Class 
> org.apache.kafka.common.serialization.StringDeserializer could not be found.` 
> I had defined a KafkaProducer variable in my client code, and I think the 
> kafka-clients dependency was already uploaded while submitting the app.
> But I found it works fine in per-job mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32274) kafka class could not be found while using application mode.

2023-06-07 Thread Spongebob (Jira)
Spongebob created FLINK-32274:
-

 Summary: kafka class could not be found while using application 
mode.
 Key: FLINK-32274
 URL: https://issues.apache.org/jira/browse/FLINK-32274
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.14.5
Reporter: Spongebob


While using yarn-application mode to submit my app, it would throw an 
exception: `org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.kafka.common.serialization.StringDeserializer for configuration 
key.deserializer: Class 
org.apache.kafka.common.serialization.StringDeserializer could not be found.` I 
had defined a KafkaProducer variable in my client code, and I think the 
kafka-clients dependency was already uploaded while submitting the app.

But I found it works fine in per-job mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722121#comment-17722121
 ] 

Spongebob edited comment on FLINK-32065 at 5/12/23 11:15 AM:
-

Hi [~Thesharing], could it be due to incorrectly starting the standalone 
cluster, such as starting the cluster a second time before it is stopped?


was (Author: spongebobz):
Hi [~Thesharing], could it be due to incorrectly starting the standalone 
cluster, such as starting the cluster a second time before stopping it?

> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> does this directory only exist during that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722121#comment-17722121
 ] 

Spongebob commented on FLINK-32065:
---

Hi [~Thesharing], could it be due to incorrectly starting the standalone 
cluster, such as starting the cluster a second time before stopping it?

> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> does this directory only exist during that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722094#comment-17722094
 ] 

Spongebob commented on FLINK-32065:
---

Hi [~Thesharing], maybe I can provide these logs for you.

!image-2023-05-12-17-37-09-002.png!

> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> does this directory only exist during that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32065:
--
Attachment: image-2023-05-12-17-37-09-002.png

> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> does this directory only exist during that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32065:
--
Description: 
When I submit an application to a Flink standalone cluster, I get a 
NoSuchFileException. I think it failed to create the temporary channel file, 
but I am confused about the cause in this case.

I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
does this directory only exist during that step of the application?

BTW, this issue happens only occasionally.

!image-2023-05-12-14-07-45-771.png!

  was:
When I submit an application to a Flink standalone cluster, I get a 
NoSuchFileException. I think it failed to create the temporary channel file, 
but I am confused about the cause in this case. BTW, this issue happens only 
occasionally.

!image-2023-05-12-14-07-45-771.png!


> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> does this directory only exist during that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32065:
--
Attachment: image-2023-05-12-14-26-46-268.png

> Got NoSuchFileException when initializing source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the temporary channel file, 
> but I am confused about the cause in this case. BTW, this issue happens only 
> occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32065) Got NoSuchFileException when initializing source function.

2023-05-12 Thread Spongebob (Jira)
Spongebob created FLINK-32065:
-

 Summary: Got NoSuchFileException when initializing source function.
 Key: FLINK-32065
 URL: https://issues.apache.org/jira/browse/FLINK-32065
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.4
Reporter: Spongebob
 Attachments: image-2023-05-12-14-07-45-771.png

When I submit an application to a Flink standalone cluster, I get a 
NoSuchFileException. I think it failed to create the temporary channel file, 
but I am confused about the cause in this case. BTW, this issue happens only 
occasionally.

!image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30500) Got a null pointer exception when using table environment in sub-thread

2022-12-24 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-30500:
--
Description: 
I can use the `addInsertSql` function normally in the usual situation, but when 
I create the table env in the main thread and then call this function in a 
sub-thread, I get a confusing exception thrown by Calcite.

 
{code:java}
public static void main(String[] args) throws InterruptedException {
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
String ddl = "CREATE TABLE IF NOT EXISTS SOURCE(" +
"A DECIMAL(19)," +
"B DECIMAL(19)" +
") WITH (" +
"'connector' = 'jdbc'...)";
String ddl2 = "CREATE TABLE IF NOT EXISTS SINK(" +
"A DECIMAL(19)," +
"B DECIMAL(19)" +
") WITH (" +
"'connector' = 'print')";
tableEnvironment.executeSql(ddl);
tableEnvironment.executeSql(ddl2);
StatementSet statementSet = tableEnvironment.createStatementSet();
CompletableFuture.runAsync(() -> {
statementSet.addInsertSql("INSERT INTO SINK SELECT * FROM SOURCE");
try {
statementSet.execute().await();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).exceptionally((e) -> {
e.printStackTrace();
return null;
});

Thread.sleep(1);
} {code}
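Before the exception (reproduced below), a minimal sketch of a workaround, 
assuming the failure comes from planner state held in thread-locals: create 
and use the TableEnvironment entirely inside the sub-thread instead of sharing 
one across threads (`ddl` and `ddl2` are the DDL strings from the snippet 
above):
{code:java}
// Hedged workaround sketch, not a confirmed fix: keep creation and use of
// the TableEnvironment in the same thread.
CompletableFuture.runAsync(() -> {
    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.executeSql(ddl);
    tEnv.executeSql(ddl2);
    StatementSet statementSet = tEnv.createStatementSet();
    statementSet.addInsertSql("INSERT INTO SINK SELECT * FROM SOURCE");
    try {
        statementSet.execute().await();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
});
{code}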
 
{code:java}
Caused by: java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
    at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
    at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
    at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
    at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
    at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
    at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
    at 
org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
    at 
org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)

[jira] [Created] (FLINK-30500) Got a null pointer exception when using table environment in sub-thread

2022-12-24 Thread Spongebob (Jira)
Spongebob created FLINK-30500:
-

 Summary: Got a null pointer exception when using table 
environment in sub-thread
 Key: FLINK-30500
 URL: https://issues.apache.org/jira/browse/FLINK-30500
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.3
Reporter: Spongebob


I can use the `addInsertSql` function normally in the usual situation, but when 
I create the table env in the main thread and then call this function in a 
sub-thread, I get a confusing exception thrown by Calcite.

 
{code:java}
Caused by: java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
    at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
    at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
    at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
    at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
    at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
    at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
    at 
org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
    at 
org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)

[jira] [Commented] (FLINK-29428) got an ambiguous exception in flinksql

2022-09-28 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610368#comment-17610368
 ] 

Spongebob commented on FLINK-29428:
---

When I disable the multiple-input operator, it runs normally.
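A minimal sketch of that change, assuming the option meant is 
`table.optimizer.multiple-input-enabled` (the comment does not name it, and 
`tableEnvironment` stands for the user's TableEnvironment):
{code:java}
// Assumed option name; disables merging operators into multiple-input
// operators in the batch planner.
tableEnvironment.getConfig().getConfiguration()
        .setBoolean("table.optimizer.multiple-input-enabled", false);
{code}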

> got an ambiguous exception in flinksql
> --
>
> Key: FLINK-29428
> URL: https://issues.apache.org/jira/browse/FLINK-29428
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> When I execute my lengthy FlinkSQL that contains 5 joins, I get this 
> exception:
> I have no idea how to solve this issue and am asking for your help.
> {code:java}
> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of 
> recursions, without reducing partitions enough to be memory resident. 
> Probably cause: Too many duplicate keys.
>         at 
> org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443)
>         at 
> org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403)
>         at 
> org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265)
>         at 
> org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176)
>         at 
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124)
>         at 
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93)
>         at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:100)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>         at java.lang.Thread.run(Thread.java:748) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29428) got an ambiguous exception in flinksql

2022-09-27 Thread Spongebob (Jira)
Spongebob created FLINK-29428:
-

 Summary: got an ambiguous exception in flinksql
 Key: FLINK-29428
 URL: https://issues.apache.org/jira/browse/FLINK-29428
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob


When I execute my lengthy FlinkSQL that contains 5 joins, I get this exception:

I have no idea how to solve this issue and am asking for your help.
{code:java}
Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of 
recursions, without reducing partitions enough to be memory resident. Probably 
cause: Too many duplicate keys.
        at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443)
        at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403)
        at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265)
        at 
org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176)
        at 
org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124)
        at 
org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56)
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:100)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
        at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.lang.Thread.run(Thread.java:748) {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29216) rows belonging to update rowkind are transferred into delete and insert

2022-09-16 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob resolved FLINK-29216.
---
Resolution: Not A Problem

> rows belonging to update rowkind are transferred into delete and insert
> ---
>
> Key: FLINK-29216
> URL: https://issues.apache.org/jira/browse/FLINK-29216
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> When I declared a primary key on the sink table, all UPDATE rows were 
> transferred into DELETE and INSERT rowkinds, even though the primary key 
> values were not changed at all. If this is the expected behavior, how can I 
> change it?
>  
> following is the test code:
> {code:java}
> CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH 
> ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json')
> CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY 
> KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print')
> insert into SINK select * from SOURCE 
> when I produced these message to kafka:
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> flink output these rows:
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29216) rows belonging to update rowkind are transferred into delete and insert

2022-09-07 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601167#comment-17601167
 ] 

Spongebob commented on FLINK-29216:
---

Thanks [~lincoln.86xy], it appears normal when I set this option to NONE, since 
this option is used to handle out-of-order data in a distributed system. But if 
I run my application with single parallelism, would it be OK?
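A minimal sketch of that setting, assuming the option discussed is 
`table.exec.sink.upsert-materialize` (the comment does not name it, and 
`tableEnvironment` stands for the user's TableEnvironment):
{code:java}
// Assumed option name; NONE switches off the upsert materialization that
// rewrites updates into DELETE + INSERT pairs for sinks with a primary key.
tableEnvironment.getConfig().getConfiguration()
        .setString("table.exec.sink.upsert-materialize", "NONE");
{code}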

> rows belonging to update rowkind are transferred into delete and insert
> ---
>
> Key: FLINK-29216
> URL: https://issues.apache.org/jira/browse/FLINK-29216
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> When I declared a primary key on the sink table, all UPDATE rows were 
> transferred into DELETE and INSERT rowkinds, even though the primary key 
> values were not changed at all. If this is the expected behavior, how can I 
> change it?
>  
> following is the test code:
> {code:java}
> CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH 
> ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json')
> CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY 
> KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print')
> insert into SINK select * from SOURCE 
> when I produced these message to kafka:
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> flink output these rows:
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29216) rows belonging to update rowkind are transferred into delete and insert

2022-09-06 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-29216:
--
Description: 
When I declared a primary key on the sink table, all UPDATE rows were 
transferred into DELETE and INSERT rowkinds, even though the primary key values 
were not changed at all. If this is the expected behavior, how can I change it?

 

following is the test code:
{code:java}
CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH 
('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json')

CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY 
KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print')

insert into SINK select * from SOURCE 



when I produced these message to kafka:
{"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
{"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}


flink output these rows:
+I[BOB, MATH, 10]
-D[BOB, MATH, 10]
+I[BOB, MATH, 16]{code}

  was:When I declared a primary key on the sink table, all UPDATE rows were 
transferred into DELETE and INSERT rowkinds, even though the primary key values 
were not changed at all. If this is the expected behavior, how can I change it?


> rows belonging to update rowkind are transferred into delete and insert
> ---
>
> Key: FLINK-29216
> URL: https://issues.apache.org/jira/browse/FLINK-29216
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> When I declared a primary key on the sink table, all UPDATE rows were 
> transferred into DELETE and INSERT rowkinds, even though the primary key 
> values were not changed at all. If this is the expected behavior, how can I 
> change it?
>  
> following is the test code:
> {code:java}
> CREATE TABLE SOURCE(STUNAME STRING, SUBJECT STRING, SCORE INT) WITH 
> ('connector'='kafka','properties.bootstrap.servers'='node01:9092','topic'='jolinTest','properties.group.id'='test','scan.startup.mode'='latest-offset','format'='debezium-json')
> CREATE TABLE SINK(STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY 
> KEY(STUNAME) NOT ENFORCED) WITH ('connector'='print')
> insert into SINK select * from SOURCE 
> when I produced these message to kafka:
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> flink output these rows:
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29216) rows belonging to update rowkind are transferred into delete and insert

2022-09-06 Thread Spongebob (Jira)
Spongebob created FLINK-29216:
-

 Summary: rows belonging to update rowkind are transferred into delete 
and insert
 Key: FLINK-29216
 URL: https://issues.apache.org/jira/browse/FLINK-29216
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob


When I declared a primary key on the sink table, all UPDATE rows were 
transferred into DELETE and INSERT rowkinds, even though the primary key values 
were not changed at all. If this is the expected behavior, how can I change it?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27969) StreamPhysicalOverAggregate doesn't support consuming update and delete changes

2022-06-09 Thread Spongebob (Jira)
Spongebob created FLINK-27969:
-

 Summary: StreamPhysicalOverAggregate doesn't support consuming 
update and delete changes
 Key: FLINK-27969
 URL: https://issues.apache.org/jira/browse/FLINK-27969
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob


Exception trace:
{code:java}
// exception
StreamPhysicalOverAggregate doesn't support consuming update and delete changes 
which is produced by node Join(joinType=[LeftOuterJoin], where=[(COL2 = COL4)], 
select=[...], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) {code}
The FlinkSQL, scheduled as a streaming task, looks like this:
{code:java}
// dml
SELECT RANK() OVER (PARTITION BY A.COL1 ORDER BY A.COL2) AS ODER_ONUM
FROM A
INNER JOIN B ON A.COL1 = B.COL1
LEFT JOIN C ON C.COL3 = 1 AND CAST(A.COL2 AS STRING) = C.COL4{code}
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Reopened] (FLINK-27638) failed to join with table function

2022-05-23 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob reopened FLINK-27638:
---

> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-40-23-179.png
>
>
> # register two table functions named `FUNC_A` and `FUNC_B`
>  # left join with FUNC_A
>  # inner join with FUNC_B
>  # schedule the DML
> after these steps, I found the task of FUNC_A kept running but the task of 
> FUNC_B finished within several seconds. And I am not sure whether the 
> abnormal task leads to the empty output of the DML.
> !image-2022-05-16-19-40-23-179.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27638) failed to join with table function

2022-05-23 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540880#comment-17540880
 ] 

Spongebob commented on FLINK-27638:
---

[~fsk119] I had not set any finish condition in the table function. By the way, 
it runs normally when I don't join with any other table function in the graph.

> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-40-23-179.png
>
>
> # register two table functions named `FUNC_A` and `FUNC_B`
>  # left join with FUNC_A
>  # inner join with FUNC_B
>  # schedule the DML
> after these steps, I found the task of FUNC_A kept running but the task of 
> FUNC_B finished within several seconds. And I am not sure whether the 
> abnormal task leads to the empty output of the DML.
> !image-2022-05-16-19-40-23-179.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27638) failed to join with table function

2022-05-16 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27638:
--
Attachment: (was: image-2022-05-16-19-08-25-477.png)

> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-40-23-179.png
>
>
> # register two table functions named `FUNC_A` and `FUNC_B`
>  # left join with FUNC_A
>  # inner join with FUNC_B
>  # schedule the DML
> after these steps, I found the task of FUNC_A kept running but the task of 
> FUNC_B finished within several seconds. And I am not sure whether the 
> abnormal task leads to the empty output of the DML.
> !image-2022-05-16-19-40-23-179.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27638) failed to join with table function

2022-05-16 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27638:
--
 Attachment: image-2022-05-16-19-40-23-179.png
Description: 
# register two table functions named `FUNC_A` and `FUNC_B`
 # left join with FUNC_A
 # inner join with FUNC_B
 # schedule the DML

after these steps, I found the task of FUNC_A kept running but the task of 
FUNC_B finished within several seconds. And I am not sure whether the abnormal 
task leads to the empty output of the DML.

!image-2022-05-16-19-40-23-179.png!

  was:
# register one table function named `GET_STREAMING_MODEL_SINK_FILTER`
 # create two complex DMLs that both inner join with the table function; they 
are all streaming tasks.
 # schedule the two DMLs in one submission based on a StatementSet.

After these steps I found that the table function ran on one exclusive task and 
finished within several seconds. And the two DMLs had no output after the inner 
join with that table function.

Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this 
situation, and appendix `image-2022-05-16-19-08-50-286.png` shows the expected 
graph when using a table function.

!image-2022-05-16-19-08-25-477.png!

 

!image-2022-05-16-19-08-50-286.png!


> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-40-23-179.png
>
>
> # register two table functions named `FUNC_A` and `FUNC_B`
>  # left join with FUNC_A
>  # inner join with FUNC_B
>  # schedule the DML
> after these steps, I found the task of FUNC_A kept running but the task of 
> FUNC_B finished within several seconds. And I am not sure whether the 
> abnormal task leads to the empty output of the DML.
> !image-2022-05-16-19-40-23-179.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27638) failed to join with table function

2022-05-16 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27638:
--
Attachment: (was: image-2022-05-16-19-08-50-286.png)

> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-40-23-179.png
>
>
> # register two table functions named `FUNC_A` and `FUNC_B`
>  # left join with FUNC_A
>  # inner join with FUNC_B
>  # schedule the DML
> after these steps, I found that the task of FUNC_A kept running while the task 
> of FUNC_B finished within several seconds, and I am not sure whether the 
> abnormal task led to the empty output of the DML.
> !image-2022-05-16-19-40-23-179.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27638) failed to join with table function

2022-05-16 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27638:
--
Description: 
# register one table function named `GET_STREAMING_MODEL_SINK_FILTER`
 # create two complex DML that both inner join with the table function; they are 
all streaming tasks.
 # schedule the two DML in one submission based on a statementSet.

after these steps I found that the table function ran on one exclusive task and 
finished within several seconds, and the two DML produced no output after the 
inner join with that table function.

Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this 
situation, and appendix `image-2022-05-16-19-08-50-286.png` shows the expected 
graph when using a table function.

!image-2022-05-16-19-08-25-477.png!

 

!image-2022-05-16-19-08-50-286.png!

  was:
# register one table function named `GET_STREAMING_MODEL_SINK_FILTER`
 # create two complex flinksql DML that both inner join with the table 
function.
 # schedule the two DML in one submission based on a statementSet.

after these steps I found that the table function ran on one exclusive task and 
finished within several seconds, and the two DML produced no output after the 
inner join with that table function.

Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this 
situation, and appendix `image-2022-05-16-19-08-50-286.png` shows the expected 
graph when using a table function.

!image-2022-05-16-19-08-25-477.png!

 

!image-2022-05-16-19-08-50-286.png!


> failed to join with table function
> --
>
> Key: FLINK-27638
> URL: https://issues.apache.org/jira/browse/FLINK-27638
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-16-19-08-25-477.png, 
> image-2022-05-16-19-08-50-286.png
>
>
> # register one table function named `GET_STREAMING_MODEL_SINK_FILTER`
>  # create two complex DML that both inner join with the table function; they 
> are all streaming tasks.
>  # schedule the two DML in one submission based on a statementSet.
> after these steps I found that the table function ran on one exclusive task 
> and finished within several seconds, and the two DML produced no output after 
> the inner join with that table function.
> Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this 
> situation, and appendix `image-2022-05-16-19-08-50-286.png` shows the expected 
> graph when using a table function.
> !image-2022-05-16-19-08-25-477.png!
>  
> !image-2022-05-16-19-08-50-286.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27638) failed to join with table function

2022-05-16 Thread Spongebob (Jira)
Spongebob created FLINK-27638:
-

 Summary: failed to join with table function
 Key: FLINK-27638
 URL: https://issues.apache.org/jira/browse/FLINK-27638
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob
 Attachments: image-2022-05-16-19-08-25-477.png, 
image-2022-05-16-19-08-50-286.png

# register one table function named `GET_STREAMING_MODEL_SINK_FILTER`
 # create two complex flinksql DML that both inner join with the table 
function.
 # schedule the two DML in one submission based on a statementSet.

after these steps I found that the table function ran on one exclusive task and 
finished within several seconds, and the two DML produced no output after the 
inner join with that table function.

Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this 
situation, and appendix `image-2022-05-16-19-08-50-286.png` shows the expected 
graph when using a table function.

!image-2022-05-16-19-08-25-477.png!

 

!image-2022-05-16-19-08-50-286.png!
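A minimal sketch of the submission shape described above (assuming a streaming TableEnvironment named `tableEnvironment`; the SRC/SINK table names and statements are assumptions, not the original DML):
{code:java}
// Hedged sketch: two DML, both inner-joining the same table function,
// submitted together through one StatementSet.
StatementSet statementSet = tableEnvironment.createStatementSet();
statementSet.addInsertSql(
        "INSERT INTO SINK_A SELECT S.* FROM SRC_A S " +
        "JOIN LATERAL TABLE(GET_STREAMING_MODEL_SINK_FILTER()) AS T(MARK) ON TRUE " +
        "WHERE MARK = 1");
statementSet.addInsertSql(
        "INSERT INTO SINK_B SELECT S.* FROM SRC_B S " +
        "JOIN LATERAL TABLE(GET_STREAMING_MODEL_SINK_FILTER()) AS T(MARK) ON TRUE " +
        "WHERE MARK = 1");
statementSet.execute(); // one submission covering both DML
{code}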



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27462) could not trigger checkpoint when using table function

2022-05-01 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-27462.
-
Release Note: 
Flink stops checkpointing once some subtasks have finished. This behaviour can 
be switched off with 
`config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);`
  Resolution: Not A Problem
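A minimal sketch of that switch (assuming Flink 1.14 and the DataStream entry 
point; the option class is 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: keep checkpointing alive even after some subtasks
// (e.g. a short-lived table-function task) reach FINISHED.
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(60_000L); // checkpoint every 60 s
{code}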

> could not trigger checkpoint when using table function
> --
>
> Key: FLINK-27462
> URL: https://issues.apache.org/jira/browse/FLINK-27462
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-01-11-22-03-127.png, 
> image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png
>
>
> I found that checkpoints could not be triggered despite my having enabled 
> them in the environment. 
> I think this problem may be related to my use of a table function in the DML. 
> When I deploy the application, the task dedicated to the table function 
> finishes immediately even though the table function declares the property 
> `isDeterministic` as false.
> Below is my basic code to reproduce the issue:
>  
> {code:java}
> // table function
> public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
> private boolean status = false;
> private String statusRedisValue;
> private final String redisKeyOfStatus;
> private final RedisInfo redisInfo;
> private RedisManager redisManager;
> private RedisCommands<String, String> redisCommands;
> private long lastCheckTimestamp = 0L;
> private long currentTimestamp;
> public GetStreamingModelSinkFilter() {
> // ... initialization omitted
> }
> @Override
> public void open(FunctionContext context) throws Exception {
> redisManager = new RedisManager(redisInfo);
> redisCommands = redisManager.getCommands();
> }
> @Override
> public boolean isDeterministic() {
> return false;
> }
> public void eval() {
> if (status) {
> collect(1);
> } else {
> currentTimestamp = System.currentTimeMillis();
> if (currentTimestamp - lastCheckTimestamp < 1000) {
> collect(0);
> } else {
> statusRedisValue = redisCommands.get(redisKeyOfStatus);
> if (Objects.equals(statusRedisValue, "1")) {
> status = true;
> collect(1);
> } else {
> lastCheckTimestamp = currentTimestamp;
> collect(0);
> }
> }
> }
> }
> @Override
> public void close() throws Exception {
> redisManager.close();
> }
> } {code}
> Below is the DML:
> {code:java}
> INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
> T(MARK) ON TRUE WHERE MARK = 1
> {code}
> TASKS SNAPSHOT OF THE APPLICATION
> !image-2022-05-01-11-22-03-127.png!
> !image-2022-05-01-11-23-07-063.png!
> !image-2022-05-01-11-23-24-938.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function

2022-04-30 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27462:
--
Description: 
I found that checkpoints could not be triggered despite my having enabled them 
in the environment. 

I think this problem may be related to my use of a table function in the DML. 
When I deploy the application, the task dedicated to the table function finishes 
immediately even though the table function declares the property 
`isDeterministic` as false.

Below is my basic code to reproduce the issue:

 
{code:java}
// table function
public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
private boolean status = false;
private String statusRedisValue;
private final String redisKeyOfStatus;
private final RedisInfo redisInfo;
private RedisManager redisManager;
private RedisCommands<String, String> redisCommands;
private long lastCheckTimestamp = 0L;
private long currentTimestamp;

public GetStreamingModelSinkFilter() {
// ... initialization omitted
}

@Override
public void open(FunctionContext context) throws Exception {
redisManager = new RedisManager(redisInfo);
redisCommands = redisManager.getCommands();
}

@Override
public boolean isDeterministic() {
return false;
}

public void eval() {
if (status) {
collect(1);
} else {
currentTimestamp = System.currentTimeMillis();
if (currentTimestamp - lastCheckTimestamp < 1000) {
collect(0);
} else {
statusRedisValue = redisCommands.get(redisKeyOfStatus);
if (Objects.equals(statusRedisValue, "1")) {
status = true;
collect(1);
} else {
lastCheckTimestamp = currentTimestamp;
collect(0);
}
}

}
}

@Override
public void close() throws Exception {
redisManager.close();
}
} {code}
Below is the DML:
{code:java}
INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
T(MARK) ON TRUE WHERE MARK = 1
{code}
TASKS SNAPSHOT OF THE APPLICATION

!image-2022-05-01-11-22-03-127.png!

!image-2022-05-01-11-23-07-063.png!

!image-2022-05-01-11-23-24-938.png!

 

  was:
I found that checkpoints could not be triggered despite my having enabled them 
in the environment. 

I think this problem may be related to my use of a table function in the DML. 
When I deploy the application, the task dedicated to the table function finishes 
immediately even though the table function declares the property 
`isDeterministic` as false.

Below is my basic code to reproduce the issue:

 
{code:java}
// table function
public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
private boolean status = false;
private String statusRedisValue;
private final String redisKeyOfStatus;
private final RedisInfo redisInfo;
private RedisManager redisManager;
private RedisCommands<String, String> redisCommands;
private long lastCheckTimestamp = 0L;
private long currentTimestamp;

public GetStreamingModelSinkFilter() {
// ... initialization omitted
}

@Override
public void open(FunctionContext context) throws Exception {
redisManager = new RedisManager(redisInfo);
redisCommands = redisManager.getCommands();
}

@Override
public boolean isDeterministic() {
return false;
}

public void eval() {
if (status) {
collect(1);
} else {
currentTimestamp = System.currentTimeMillis();
if (currentTimestamp - lastCheckTimestamp < 1000) {
collect(0);
} else {
statusRedisValue = redisCommands.get(redisKeyOfStatus);
if (Objects.equals(statusRedisValue, "1")) {
status = true;
collect(1);
} else {
lastCheckTimestamp = currentTimestamp;
collect(0);
}
}

}
}

@Override
public void close() throws Exception {
redisManager.close();
}
} {code}
Below is the DML:
{code:java}
INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
T(MARK) ON TRUE WHERE MARK = 1
{code}
TASKS SNAPSHOT OF THE APPLICATION

!image-2022-05-01-11-22-03-127.png!

 


> could not trigger checkpoint when using table function
> --
>
> Key: FLINK-27462
> URL: https://issues.apache.org/jira/browse/FLINK-27462
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: 

[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function

2022-04-30 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27462:
--
Attachment: image-2022-05-01-11-23-24-938.png

> could not trigger checkpoint when using table function
> --
>
> Key: FLINK-27462
> URL: https://issues.apache.org/jira/browse/FLINK-27462
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-01-11-22-03-127.png, 
> image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png
>
>
> I found that checkpoints could not be triggered despite my having enabled 
> them in the environment. 
> I think this problem may be related to my use of a table function in the DML. 
> When I deploy the application, the task dedicated to the table function 
> finishes immediately even though the table function declares the property 
> `isDeterministic` as false.
> Below is my basic code to reproduce the issue:
>  
> {code:java}
> // table function
> public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
> private boolean status = false;
> private String statusRedisValue;
> private final String redisKeyOfStatus;
> private final RedisInfo redisInfo;
> private RedisManager redisManager;
> private RedisCommands<String, String> redisCommands;
> private long lastCheckTimestamp = 0L;
> private long currentTimestamp;
> public GetStreamingModelSinkFilter() {
> // ... initialization omitted
> }
> @Override
> public void open(FunctionContext context) throws Exception {
> redisManager = new RedisManager(redisInfo);
> redisCommands = redisManager.getCommands();
> }
> @Override
> public boolean isDeterministic() {
> return false;
> }
> public void eval() {
> if (status) {
> collect(1);
> } else {
> currentTimestamp = System.currentTimeMillis();
> if (currentTimestamp - lastCheckTimestamp < 1000) {
> collect(0);
> } else {
> statusRedisValue = redisCommands.get(redisKeyOfStatus);
> if (Objects.equals(statusRedisValue, "1")) {
> status = true;
> collect(1);
> } else {
> lastCheckTimestamp = currentTimestamp;
> collect(0);
> }
> }
> }
> }
> @Override
> public void close() throws Exception {
> redisManager.close();
> }
> } {code}
> Below is the DML:
> {code:java}
> INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
> T(MARK) ON TRUE WHERE MARK = 1
> {code}
> TASKS SNAPSHOT OF THE APPLICATION
> !image-2022-05-01-11-22-03-127.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27462) could not trigger checkpoint when using table function

2022-04-30 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27462:
--
Attachment: image-2022-05-01-11-23-07-063.png

> could not trigger checkpoint when using table function
> --
>
> Key: FLINK-27462
> URL: https://issues.apache.org/jira/browse/FLINK-27462
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2022-05-01-11-22-03-127.png, 
> image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png
>
>
> I found that checkpoints could not be triggered despite my having enabled 
> them in the environment. 
> I think this problem may be related to my use of a table function in the DML. 
> When I deploy the application, the task dedicated to the table function 
> finishes immediately even though the table function declares the property 
> `isDeterministic` as false.
> Below is my basic code to reproduce the issue:
>  
> {code:java}
> // table function
> public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
> private boolean status = false;
> private String statusRedisValue;
> private final String redisKeyOfStatus;
> private final RedisInfo redisInfo;
> private RedisManager redisManager;
> private RedisCommands<String, String> redisCommands;
> private long lastCheckTimestamp = 0L;
> private long currentTimestamp;
> public GetStreamingModelSinkFilter() {
> // ... initialization omitted
> }
> @Override
> public void open(FunctionContext context) throws Exception {
> redisManager = new RedisManager(redisInfo);
> redisCommands = redisManager.getCommands();
> }
> @Override
> public boolean isDeterministic() {
> return false;
> }
> public void eval() {
> if (status) {
> collect(1);
> } else {
> currentTimestamp = System.currentTimeMillis();
> if (currentTimestamp - lastCheckTimestamp < 1000) {
> collect(0);
> } else {
> statusRedisValue = redisCommands.get(redisKeyOfStatus);
> if (Objects.equals(statusRedisValue, "1")) {
> status = true;
> collect(1);
> } else {
> lastCheckTimestamp = currentTimestamp;
> collect(0);
> }
> }
> }
> }
> @Override
> public void close() throws Exception {
> redisManager.close();
> }
> } {code}
> Below is the DML:
> {code:java}
> INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
> T(MARK) ON TRUE WHERE MARK = 1
> {code}
> TASKS SNAPSHOT OF THE APPLICATION
> !image-2022-05-01-11-22-03-127.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27462) could not trigger checkpoint when using table function

2022-04-30 Thread Spongebob (Jira)
Spongebob created FLINK-27462:
-

 Summary: could not trigger checkpoint when using table function
 Key: FLINK-27462
 URL: https://issues.apache.org/jira/browse/FLINK-27462
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.3
Reporter: Spongebob
 Attachments: image-2022-05-01-11-22-03-127.png

I found that checkpoints could not be triggered despite my having enabled them 
in the environment. 

I think this problem may be related to my use of a table function in the DML. 
When I deploy the application, the task dedicated to the table function finishes 
immediately even though the table function declares the property 
`isDeterministic` as false.

Below is my basic code to reproduce the issue:

 
{code:java}
// table function
public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
private boolean status = false;
private String statusRedisValue;
private final String redisKeyOfStatus;
private final RedisInfo redisInfo;
private RedisManager redisManager;
private RedisCommands<String, String> redisCommands;
private long lastCheckTimestamp = 0L;
private long currentTimestamp;

public GetStreamingModelSinkFilter() {
// ... initialization omitted
}

@Override
public void open(FunctionContext context) throws Exception {
redisManager = new RedisManager(redisInfo);
redisCommands = redisManager.getCommands();
}

@Override
public boolean isDeterministic() {
return false;
}

public void eval() {
if (status) {
collect(1);
} else {
currentTimestamp = System.currentTimeMillis();
if (currentTimestamp - lastCheckTimestamp < 1000) {
collect(0);
} else {
statusRedisValue = redisCommands.get(redisKeyOfStatus);
if (Objects.equals(statusRedisValue, "1")) {
status = true;
collect(1);
} else {
lastCheckTimestamp = currentTimestamp;
collect(0);
}
}

}
}

@Override
public void close() throws Exception {
redisManager.close();
}
} {code}
Below is the DML:
{code:java}
INSERT INTO TEST SELECT * FROM T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
T(MARK) ON TRUE WHERE MARK = 1
{code}
TASKS SNAPSHOT OF THE APPLICATION

!image-2022-05-01-11-22-03-127.png!

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for flinksql

2022-04-30 Thread Spongebob (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27436 ]


Spongebob deleted comment on FLINK-27436:
---

was (Author: spongebobz):
Hi [~martijnvisser] , When I deploy my flinksql application that contains 
multiple insertion sql that are both executed by one statementSet to standalone 
cluster,

It did not tigger checkpoints despite some source table already got data input.

> option `properties.group.id` is not effective in kafka connector for flinksql
> 
>
> Key: FLINK-27436
> URL: https://issues.apache.org/jira/browse/FLINK-27436
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> option `properties.group.id` is not effective in kafka connector for flinksql.
> when I run this sql, I can read messages from the specific topic normally, but 
> I could not find the group named `test-group` on the kafka server. 
> {code:java}
> // ddl like this
> String ddl = "CREATE TABLE ... " +
>         "WITH ('connector' = 'kafka'," +
>         " 'properties.bootstrap.servers' = '...'," +
>         " 'topic' = '...'," +
>         " 'scan.startup.mode' = 'latest-offset'," +
>         " 'properties.group.id' = 'test-group'," +
>         " 'format' = 'debezium-json')"; {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for flinksql

2022-04-29 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530329#comment-17530329
 ] 

Spongebob commented on FLINK-27436:
---

Hi [~martijnvisser], when I deploy my flinksql application, which contains 
multiple insertion SQL statements all executed via one statementSet, to a 
standalone cluster, it does not trigger checkpoints despite some source tables 
already getting data input.

> option `properties.group.id` is not effective in kafka connector for flinksql
> 
>
> Key: FLINK-27436
> URL: https://issues.apache.org/jira/browse/FLINK-27436
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> option `properties.group.id` is not effective in kafka connector for flinksql.
> when I run this sql, I can read messages from the specific topic normally, but 
> I could not find the group named `test-group` on the kafka server. 
> {code:java}
> // ddl like this
> String ddl = "CREATE TABLE ... " +
>         "WITH ('connector' = 'kafka'," +
>         " 'properties.bootstrap.servers' = '...'," +
>         " 'topic' = '...'," +
>         " 'scan.startup.mode' = 'latest-offset'," +
>         " 'properties.group.id' = 'test-group'," +
>         " 'format' = 'debezium-json')"; {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for flinksql

2022-04-28 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529278#comment-17529278
 ] 

Spongebob commented on FLINK-27436:
---

[~martijnvisser] Great, I can see it now following your advice. But the snapshot 
is not made until data is input into the connector.

> option `properties.group.id` is not effective in kafka connector for flinksql
> 
>
> Key: FLINK-27436
> URL: https://issues.apache.org/jira/browse/FLINK-27436
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> option `properties.group.id` is not effective in kafka connector for flinksql.
> when I run this sql, I can read messages from the specific topic normally, but 
> I could not find the group named `test-group` on the kafka server. 
> {code:java}
> // ddl like this
> String ddl = "CREATE TABLE ... " +
>         "WITH ('connector' = 'kafka'," +
>         " 'properties.bootstrap.servers' = '...'," +
>         " 'topic' = '...'," +
>         " 'scan.startup.mode' = 'latest-offset'," +
>         " 'properties.group.id' = 'test-group'," +
>         " 'format' = 'debezium-json')"; {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for flinksql

2022-04-28 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529260#comment-17529260
 ] 

Spongebob commented on FLINK-27436:
---

[~martijnvisser] I had not. By the way, I could see this option printed by both 
KafkaConsumer and AdminClient, but I don't know why it did not take effect.

> option `properties.group.id` is not effective in kafka connector for flinksql
> 
>
> Key: FLINK-27436
> URL: https://issues.apache.org/jira/browse/FLINK-27436
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> option `properties.group.id` is not effective in kafka connector for flinksql.
> when I run this sql, I can read messages from the specific topic normally, but 
> I could not find the group named `test-group` on the kafka server. 
> {code:java}
> // ddl like this
> String ddl = "CREATE TABLE ... " +
>         "WITH ('connector' = 'kafka'," +
>         " 'properties.bootstrap.servers' = '...'," +
>         " 'topic' = '...'," +
>         " 'scan.startup.mode' = 'latest-offset'," +
>         " 'properties.group.id' = 'test-group'," +
>         " 'format' = 'debezium-json')"; {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27436) option `properties.group.id` is not effective in kafka connector for flinksql

2022-04-28 Thread Spongebob (Jira)
Spongebob created FLINK-27436:
-

 Summary: option `properties.group.id` is not effective in kafka 
connector for flinksql
 Key: FLINK-27436
 URL: https://issues.apache.org/jira/browse/FLINK-27436
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.14.3
Reporter: Spongebob


option `properties.group.id` is not effective in kafka connector for flinksql.

when I run this sql, I can read messages from the specific topic normally, but I 
could not find the group named `test-group` on the kafka server. 
{code:java}
// ddl like this
"CREATE TABLE ...
"WITH ('connector' = 'kafka',
 'properties.bootstrap.servers' = '...'," +
"'topic' = '...',"+
 "'scan.startup.mode'='latest-offset'," +
"'properties.group.id' = 'test-group'," +
 "'format' = 'debezium-json')"; {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-26925) loss scale in union situation

2022-03-30 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-26925.
-
Resolution: Not A Problem

> loss scale in union situation
> -
>
> Key: FLINK-26925
> URL: https://issues.apache.org/jira/browse/FLINK-26925
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Spongebob
>Priority: Major
>
> when I union two columns whose datatypes are decimal(38,4) and decimal(38,2), 
> I get decimal(38,2) in return. This causes a loss of scale in the result set. 
> I think the final datatype should be decimal(38,4).
> {code:java}
> TableEnvironment tableEnvironment = 
> TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
> Table t1 = tableEnvironment.sqlQuery("select cast(1.23 as decimal(38,2)) as 
> a");
> Table t2 = tableEnvironment.sqlQuery("select cast(4.5678 as decimal(38,4)) as 
> a");
> tableEnvironment.createTemporaryView("t1", t1);
> tableEnvironment.createTemporaryView("t2", t2);
> tableEnvironment.executeSql("select a from t1 union all select a from 
> t2").print();
> tableEnvironment.sqlQuery("select a from t1 union all select a from 
> t2").printSchema(); 
> // output
> +------------------------------------------+
> |                                        a |
> +------------------------------------------+
> |                                     1.23 |
> |                                     4.57 |
> +------------------------------------------+
> 2 rows in set
> (
>   `a` DECIMAL(38, 2) NOT NULL
> ){code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26925) loss scale in union situation

2022-03-30 Thread Spongebob (Jira)
Spongebob created FLINK-26925:
-

 Summary: loss scale in union situation
 Key: FLINK-26925
 URL: https://issues.apache.org/jira/browse/FLINK-26925
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Spongebob


when I union two columns whose datatypes are decimal(38,4) and decimal(38,2), 
I get decimal(38,2) in return. This causes a loss of scale in the result set. 
I think the final datatype should be decimal(38,4).
{code:java}
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

Table t1 = tableEnvironment.sqlQuery("select cast(1.23 as decimal(38,2)) as a");
Table t2 = tableEnvironment.sqlQuery("select cast(4.5678 as decimal(38,4)) as 
a");

tableEnvironment.createTemporaryView("t1", t1);
tableEnvironment.createTemporaryView("t2", t2);

tableEnvironment.executeSql("select a from t1 union all select a from 
t2").print();
tableEnvironment.sqlQuery("select a from t1 union all select a from 
t2").printSchema(); 


// output
+------------------------------------------+
|                                        a |
+------------------------------------------+
|                                     1.23 |
|                                     4.57 |
+------------------------------------------+
2 rows in set
(
  `a` DECIMAL(38, 2) NOT NULL
){code}
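A possible workaround sketch (not from the report): cast both branches to the 
wider scale explicitly so no fractional digits are dropped:
{code:java}
// Hedged sketch: align both union branches on DECIMAL(38,4) before the union.
tableEnvironment.executeSql(
        "select cast(a as decimal(38,4)) as a from t1 " +
        "union all " +
        "select cast(a as decimal(38,4)) as a from t2").print();
{code}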



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26461) Throw CannotPlanException in TableFunction

2022-03-06 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502100#comment-17502100
 ] 

Spongebob commented on FLINK-26461:
---

Hi Lincoln, I found this problem only occurs in versions preceding Flink 
1.14.3, such as 1.14.2.

> Throw CannotPlanException in TableFunction
> --
>
> Key: FLINK-26461
> URL: https://issues.apache.org/jira/browse/FLINK-26461
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> I got a CannotPlanException when changing the isDeterministic option to false. 
> For details, see this code:
> {code:java}
> // code placeholder
> public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
> private boolean status = false;
> @Override
> public boolean isDeterministic() {
> return false;
> }
> public void eval() {
> if (status) {
> collect(1);
> } else {
> if (System.currentTimeMillis() > 1646298908000L) {
> status = true;
> collect(1);
> } else {
> collect(0);
> }
> }
> }
> } {code}
> Exception stack...
> {code:java}
> // code placeholder
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query: 
> FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
>  fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
> +- FlinkLogicalJoin(condition=[true], joinType=[left])
>    :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, 
> PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
>    :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
> default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
>    +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], 
> rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query 
> uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
>     at TestSwitch.main(TestSwitch.java:33)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There 
> are not enough rules to produce a node with desired properties: 
> convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, 
> MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], 
> UpdateKindTraitDef=[NONE].
> Missing conversion is FlinkLogicalTableFunctionScan[convention: 

[jira] [Commented] (FLINK-26461) Throw CannotPlanException in TableFunction

2022-03-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500647#comment-17500647
 ] 

Spongebob commented on FLINK-26461:
---

Hi Lincoln, this is my application code:
{code:java}
// code placeholder
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnvironment.createTemporaryFunction("GET_SWITCH", 
GetDayTimeEtlSwitch.class);

String dailyScoreCdcDDL = "CREATE TABLE DAILY_SCORE_CDC(" +
"STUNAME STRING," +
"SUBJECT STRING," +
"SCORE DECIMAL(10,0)," +
"PROC_TIME AS PROCTIME()" +
") WITH (" +
"'connector'='oracle-cdc'," +
"..."
")";

String dml = "SELECT * FROM DAILY_SCORE_CDC LEFT JOIN LATERAL 
TABLE(GET_SWITCH()) AS T(STATUS) ON TRUE";

tableEnvironment.executeSql(dailyScoreCdcDDL);
tableEnvironment.executeSql(dml).print(); {code}

> Throw CannotPlanException in TableFunction
> --
>
> Key: FLINK-26461
> URL: https://issues.apache.org/jira/browse/FLINK-26461
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> I got a CannotPlanException when changing the isDeterministic option to false. 
> For details, see this code:
> {code:java}
> // code placeholder
> public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
> private boolean status = false;
> @Override
> public boolean isDeterministic() {
> return false;
> }
> public void eval() {
> if (status) {
> collect(1);
> } else {
> if (System.currentTimeMillis() > 1646298908000L) {
> status = true;
> collect(1);
> } else {
> collect(0);
> }
> }
> }
> } {code}
> Exception stack...
> {code:java}
> // code placeholder
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query: 
> FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
>  fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
> +- FlinkLogicalJoin(condition=[true], joinType=[left])
>    :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, 
> PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
>    :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
> default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
>    +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], 
> rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query 
> uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at 

[jira] [Created] (FLINK-26461) Throw CannotPlanException in TableFunction

2022-03-03 Thread Spongebob (Jira)
Spongebob created FLINK-26461:
-

 Summary: Throw CannotPlanException in TableFunction
 Key: FLINK-26461
 URL: https://issues.apache.org/jira/browse/FLINK-26461
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob


I got a CannotPlanException when changing the isDeterministic option to false. 
For details, see this code:
{code:java}
// code placeholder
public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
private boolean status = false;


@Override
public boolean isDeterministic() {
return false;
}

public void eval() {
if (status) {
collect(1);
} else {
if (System.currentTimeMillis() > 1646298908000L) {
status = true;
collect(1);
} else {
collect(0);
}

}

}
} {code}
Exception stack...
{code:java}
// code placeholder
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 
FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
+- FlinkLogicalJoin(condition=[true], joinType=[left])
   :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, 
PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
   +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], 
rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query 
uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
    at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
    at TestSwitch.main(TestSwitch.java:33)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, 
MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], 
UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> 
STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single]
There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 
0.[NONE].[NONE], the relevant part of the original plan is as follows
168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], 
rowType=[RecordType(INTEGER EXPR$0)])Root: 
rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 
0.[NONE].[NONE]], 

[jira] [Closed] (FLINK-26384) This exception indicates that the query uses an unsupported SQL feature.

2022-02-27 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-26384.
-
Resolution: Not A Problem

> This exception indicates that the query uses an unsupported SQL feature.
> 
>
> Key: FLINK-26384
> URL: https://issues.apache.org/jira/browse/FLINK-26384
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.3
>Reporter: Spongebob
>Priority: Major
>
> Got an "unsupported SQL feature" exception when generating an execution plan.
> {code:java}
> // code placeholder
> TableEnvironment tableEnvironment = 
> TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
> // HERE'S DDL AND DML SQL
> CREATE TABLE IF NOT EXISTS TABLE_A (CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP 
> DECIMAL(20,4),TSCP DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),SCR_INCD 
> DECIMAL(19,0),AST_TYPE DECIMAL(3,0),SCR_TYPE DECIMAL(3,0)) WITH ('connector' 
> = 'jdbc'...)
> CREATE TABLE IF NOT EXISTS TABLE_B (AST_TYPE DECIMAL(4,0),CIR_CAPT 
> DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP 
> DECIMAL(20,4),SCR_INCD DECIMAL(19,0),SCR_TYPE DECIMAL(8,0),TSCP 
> DECIMAL(20,4),  PRIMARY KEY (SCR_INCD) NOT ENFORCED)  WITH ('connector' = 
> 'jdbc'...)
> INSERT INTO TABLE_B 
> SELECT T_SOURCE.* FROM (
> SELECT CASE WHEN B.AST_TYPE IS NULL THEN 1 ELSE B.AST_TYPE END AS AST_TYPE, 
> B.CIR_CAPT AS CIR_CAPT, B.FULL_MKT_CIR_CAPT AS FULL_MKT_CIR_CAPT, 
> B.FULL_MKT_TSCP AS FULL_MKT_TSCP, B.SCR_INCD AS SCR_INCD, CASE WHEN 
> B.SCR_TYPE IS NULL THEN 1 ELSE B.SCR_TYPE END AS SCR_TYPE, B.TSCP AS TSCP 
> FROM TABLE_A B
> ) T_SOURCE INNER JOIN ( SELECT DISTINCT SCR_INCD FROM TABLE_B) T_SINK ON 
> T_SOURCE.SCR_INCD = T_SINK.SCR_INCD    
> // EXCEPTION TRACK
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
>     at 
> 

[jira] [Created] (FLINK-26384) This exception indicates that the query uses an unsupported SQL feature.

2022-02-27 Thread Spongebob (Jira)
Spongebob created FLINK-26384:
-

 Summary: This exception indicates that the query uses an 
unsupported SQL feature.
 Key: FLINK-26384
 URL: https://issues.apache.org/jira/browse/FLINK-26384
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.3
Reporter: Spongebob


Got an "unsupported SQL feature" exception when generating an execution plan.
{code:java}
// code placeholder
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

// HERE'S DDL AND DML SQL
CREATE TABLE IF NOT EXISTS TABLE_A (CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP 
DECIMAL(20,4),TSCP DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),SCR_INCD 
DECIMAL(19,0),AST_TYPE DECIMAL(3,0),SCR_TYPE DECIMAL(3,0)) WITH ('connector' = 
'jdbc'...)

CREATE TABLE IF NOT EXISTS TABLE_B (AST_TYPE DECIMAL(4,0),CIR_CAPT 
DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP 
DECIMAL(20,4),SCR_INCD DECIMAL(19,0),SCR_TYPE DECIMAL(8,0),TSCP DECIMAL(20,4),  
PRIMARY KEY (SCR_INCD) NOT ENFORCED)  WITH ('connector' = 'jdbc'...)

INSERT INTO TABLE_B 
SELECT T_SOURCE.* FROM (
SELECT CASE WHEN B.AST_TYPE IS NULL THEN 1 ELSE B.AST_TYPE END AS AST_TYPE, 
B.CIR_CAPT AS CIR_CAPT, B.FULL_MKT_CIR_CAPT AS FULL_MKT_CIR_CAPT, 
B.FULL_MKT_TSCP AS FULL_MKT_TSCP, B.SCR_INCD AS SCR_INCD, CASE WHEN B.SCR_TYPE 
IS NULL THEN 1 ELSE B.SCR_TYPE END AS SCR_TYPE, B.TSCP AS TSCP FROM TABLE_A B
) T_SOURCE INNER JOIN ( SELECT DISTINCT SCR_INCD FROM TABLE_B) T_SINK ON 
T_SOURCE.SCR_INCD = T_SINK.SCR_INCD    

// EXCEPTION TRACK
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
    at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
    at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
    at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
    at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
    at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
    at 
com.xctech.cone.etl.migrate.batch.runner.RunSingleTask.main(RunSingleTask.java:111)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalTableSourceScan[convention: LOGICAL -> 

[jira] [Commented] (FLINK-26327) throw not a literal exception in callContext.getArgumentValue when getTypeInference

2022-02-24 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497243#comment-17497243
 ] 

Spongebob commented on FLINK-26327:
---

{code:java}
// code placeholder
Exception in thread "main" java.lang.AssertionError: not a literal: NVL($0, 2)
    at org.apache.calcite.rex.RexLiteral.findValue(RexLiteral.java:1161)
    at org.apache.calcite.rex.RexLiteral.value(RexLiteral.java:1133)
    at 
org.apache.calcite.rex.RexCallBinding.getOperandLiteralValue(RexCallBinding.java:89)
    at 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext$2.getValueAs(OperatorBindingCallContext.java:102)
    at 
org.apache.flink.table.planner.functions.inference.AbstractSqlCallContext.getLiteralValueAs(AbstractSqlCallContext.java:123)
    at 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext.getArgumentValue(OperatorBindingCallContext.java:98)
    at 
org.apache.flink.table.types.inference.utils.AdaptedCallContext.getArgumentValue(AdaptedCallContext.java:91)
    at 
com.xctech.cone.etl.migrate.udf.RoundX.lambda$getTypeInference$0(RoundX.java:47)
    at 
org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:152)
    at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:100)
    at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
    at 
org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
    at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
    at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
    at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157)
    at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:137)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:137)
    at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:162)
    at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:48)
    at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
    at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at 
org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    

[jira] [Created] (FLINK-26327) throw not a literal exception in callContext.getArgumentValue when getTypeInference

2022-02-23 Thread Spongebob (Jira)
Spongebob created FLINK-26327:
-

 Summary: throw not a literal exception in 
callContext.getArgumentValue when getTypeInference
 Key: FLINK-26327
 URL: https://issues.apache.org/jira/browse/FLINK-26327
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.2
Reporter: Spongebob


 
{code:java}
// code placeholder
tableEnvironment.createTemporaryFunction("ROUNDX", RoundX.class);
tableEnvironment.createTemporaryFunction("NVL", Nvl.class);
tableEnvironment.executeSql("select ROUNDX( CAST(1.12345 as 
decimal(10,3)),NVL(MAX(f0),2) ) from t1").print();

// exception
Exception in thread "main" java.lang.AssertionError: not a literal: NVL($0, 2)

// trace
// `NVL` is a scalar function similar to Oracle's NVL function. This exception
// might be thrown from the following line in the `getTypeInference` method of
// the ROUNDX scalar function:
Optional<Integer> secondValue = callContext.getArgumentValue(1, 
Integer.class);{code}
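
A possible defensive pattern (a minimal sketch; `isArgumentLiteral` and `isArgumentNull` are part of Flink's `CallContext` API, while the fallback scale of 2 and the DECIMAL output type are assumptions for illustration) is to ask for the argument value only when the planner can actually supply a literal:
{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Optional;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

public class RoundX extends ScalarFunction {

    public BigDecimal eval(BigDecimal value, Integer scale) {
        return value.setScale(scale, RoundingMode.HALF_UP);
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(callContext -> {
                    int scale = 2; // assumed fallback for non-literal arguments
                    // NVL(MAX(f0), 2) is a call, not a literal, so guard the
                    // lookup instead of letting the planner assert.
                    if (callContext.isArgumentLiteral(1)
                            && !callContext.isArgumentNull(1)) {
                        scale = callContext
                                .getArgumentValue(1, Integer.class)
                                .orElse(scale);
                    }
                    return Optional.of(DataTypes.DECIMAL(10, scale));
                })
                .build();
    }
}
{code}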
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25502) eval method of Flink ScalerFunction only run one time

2022-01-02 Thread Spongebob (Jira)
Spongebob created FLINK-25502:
-

 Summary: eval method of Flink ScalerFunction only run one time
 Key: FLINK-25502
 URL: https://issues.apache.org/jira/browse/FLINK-25502
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.2
Reporter: Spongebob


Assume there is a scalar function named `id` whose eval method takes no 
arguments and returns an increasing int value on each call. I found that when 
I call the `id()` function in a FlinkSQL query over a table with 3 rows, the 
eval method is only called once, so I get the same id value for every row. The 
SQL looks like 'SELECT f0, id() FROM T'.

So I decided to add one argument to the `eval` method. When I execute the SQL 
'SELECT f0, id(1) FROM T' I still get the same id value, but when I execute 
'SELECT f0, id(f0) FROM T' I get the expected id values, because the eval 
method is now called three times.
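
For reference, this matches plan-time constant folding: a call whose arguments are all constant can be evaluated once by the planner. A minimal sketch of a counter UDF (the `Id` class and its counter are hypothetical) that opts out by declaring itself non-deterministic:
{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class Id extends ScalarFunction {

    private transient AtomicInteger counter;

    @Override
    public void open(FunctionContext context) {
        counter = new AtomicInteger();
    }

    public int eval() {
        return counter.incrementAndGet();
    }

    // Tells the planner the result may differ per call, so id() is
    // evaluated per row instead of being folded into one constant.
    @Override
    public boolean isDeterministic() {
        return false;
    }
}
{code}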



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25373) task manager can not free memory when jobs are finished

2021-12-19 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-25373.
-
Resolution: Not A Problem

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png
>
>
> I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
> the TaskManagers cannot free memory after all jobs have finished, whether 
> normally or not.
> And I found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25373) task manager can not free memory when jobs are finished

2021-12-19 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462420#comment-17462420
 ] 

Spongebob commented on FLINK-25373:
---

Hi Xintong,

Thanks for your kind explanation; I will close this ticket now.

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png
>
>
> I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
> the TaskManagers cannot free memory after all jobs have finished, whether 
> normally or not.
> And I found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25373) task manager can not free memory when jobs are finished

2021-12-19 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462403#comment-17462403
 ] 

Spongebob commented on FLINK-25373:
---

Hi Xintong,
 # I checked the memory usage of the TaskManager process via the `top` command; 
it showed that the process was using a lot of memory, but I could not find any 
big objects in the `jmap` output.
 # Meanwhile, the report in the Flink web UI showed that the TaskManager was 
using a lot of heap memory.
 # Then I ran a GC manually; the report in the Flink web UI showed the usage 
drop to a low level, but the `top` command did not.

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png
>
>
> I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
> the TaskManagers cannot free memory after all jobs have finished, whether 
> normally or not.
> And I found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25373) task manager can not free memory when jobs are finished

2021-12-18 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-25373:
--
Description: 
I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
the TaskManagers cannot free memory after all jobs have finished, whether 
normally or not.

And I found that there were many threads named like 
`flink-taskexecutor-io-thread-x` whose states were waiting on conditions.

Here is the detail of these threads:

 
"flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
at sun.misc.Unsafe.park(Native Method)
- waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

!image-2021-12-19-11-48-33-622.png!
 

  was:
I submit my Flinksql jobs to the Flink standalone cluster and what  out of my 
expectation is that TaskManagers could not free memory when all jobs are 
finished whether normally or not.

And I found that there were many threads named like `
flink-taskexecutor-io-thread-x` and their states were waiting on conditions.
!image-2021-12-19-11-48-33-622.png!
 


> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png
>
>
> I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
> the TaskManagers cannot free memory after all jobs have finished, whether 
> normally or not.
> And I found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25373) task manager can not free memory when jobs are finished

2021-12-18 Thread Spongebob (Jira)
Spongebob created FLINK-25373:
-

 Summary: task manager can not free memory when jobs are finished
 Key: FLINK-25373
 URL: https://issues.apache.org/jira/browse/FLINK-25373
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.0
 Environment: flink 1.14.0
Reporter: Spongebob
 Attachments: image-2021-12-19-11-48-33-622.png

I submit my FlinkSQL jobs to the Flink standalone cluster and, unexpectedly, 
the TaskManagers cannot free memory after all jobs have finished, whether 
normally or not.

And I found that there were many threads named like 
`flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
!image-2021-12-19-11-48-33-622.png!
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24837) submit flink job failed via restapi

2021-11-08 Thread Spongebob (Jira)
Spongebob created FLINK-24837:
-

 Summary: submit flink job failed via restapi
 Key: FLINK-24837
 URL: https://issues.apache.org/jira/browse/FLINK-24837
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.12.4
Reporter: Spongebob


I tried to submit a Flink job via the Flink REST API but got an exception. I 
used the `await` function in my job so that it would submit multiple jobs. 
Below is the exception detail.
{code:java}
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
	... 10 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
	at com.xctech.cone.flink.migrate.batch.BatchCone.main(BatchCone.java:238)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
	... 13 more
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: The Job Result cannot be fetched through the Job Client when in 
{code}
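
The last cause is the key detail: the jar-run REST endpoint executes the main method in detached mode, where job results cannot be fetched, so blocking on `TableResult.await()` fails. A sketch of guarding the blocking call (the `app.detached` system property and the table names are hypothetical placeholders):
{code:java}
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public final class SubmitGuard {

    // Block on the result only when running attached; a job submitted
    // detached via the REST API cannot fetch its own result.
    public static void run(TableEnvironment tableEnv) throws Exception {
        TableResult result =
                tableEnv.executeSql("INSERT INTO sink_t SELECT * FROM src_t");
        if (!Boolean.getBoolean("app.detached")) { // hypothetical flag
            result.await();
        }
    }
}
{code}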

[jira] [Commented] (FLINK-24473) getString value from any datatype columns

2021-10-07 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17425607#comment-17425607
 ] 

Spongebob commented on FLINK-24473:
---

Hi Jark, I am dealing with RowData objects in a RichSinkFunction and need to 
extract values from them, but I cannot extract the values correctly when they 
are not of string type. Maybe you can view the code below:
{code:java}
@Override
public void invoke(RowData value, Context context) {
    String v = value.getString(0).toString();
}{code}
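
A type-aware way to read a column regardless of its datatype (a minimal sketch; `RowData.createFieldGetter` is Flink's accessor for this, while the `IntType` of column 0 and the class name are assumptions for illustration):
{code:java}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;

public class StringifySink extends RichSinkFunction<RowData> {

    // Resolve a getter for the column's actual LogicalType once; it returns
    // the plain Java object behind the field, whatever the datatype is.
    private final RowData.FieldGetter getter =
            RowData.createFieldGetter(new IntType(), 0);

    @Override
    public void invoke(RowData value, Context context) {
        Object field = getter.getFieldOrNull(value); // null-safe read
        String v = String.valueOf(field);
        System.out.println(v);
    }
}
{code}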
 

> getString value from any datatype columns
> -
>
> Key: FLINK-24473
> URL: https://issues.apache.org/jira/browse/FLINK-24473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.4
>Reporter: Spongebob
>Priority: Minor
> Attachments: image-2021-10-07-21-17-20-192.png
>
>
> Hope Flink would support getting a string value from a column of any other 
> datatype, such as an int or decimal column. Currently, Flink throws a cast 
> exception when we do that.
>  
> !image-2021-10-07-21-17-20-192.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24473) getString value from any datatype columns

2021-10-07 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-24473:
--
Attachment: image-2021-10-07-21-17-20-192.png

> getString value from any datatype columns
> -
>
> Key: FLINK-24473
> URL: https://issues.apache.org/jira/browse/FLINK-24473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.4
>Reporter: Spongebob
>Priority: Minor
> Attachments: image-2021-10-07-21-17-20-192.png
>
>
> Hope Flink would support getting a string value from a column of any other 
> datatype, such as an int or decimal column. Currently, Flink throws a cast 
> exception when we do that.
>  
> !image-2021-10-07-21-15-05-001.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24473) getString value from any datatype columns

2021-10-07 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-24473:
--
Description: 
Hope Flink would support getting a string value from a column of any other 
datatype, such as an int or decimal column. Currently, Flink throws a cast 
exception when we do that.

 

!image-2021-10-07-21-17-20-192.png!

  was:
Hope flink would support getting string value from any other datatype column, 
such as from int、decimal column. At current flink would throw cast exception 
when we do that.

 

!image-2021-10-07-21-15-05-001.png!


> getString value from any datatype columns
> -
>
> Key: FLINK-24473
> URL: https://issues.apache.org/jira/browse/FLINK-24473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.4
>Reporter: Spongebob
>Priority: Minor
> Attachments: image-2021-10-07-21-17-20-192.png
>
>
> Hope Flink would support getting a string value from a column of any other 
> datatype, such as an int or decimal column. Currently, Flink throws a cast 
> exception when we do that.
>  
> !image-2021-10-07-21-17-20-192.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24473) getString value from any datatype columns

2021-10-07 Thread Spongebob (Jira)
Spongebob created FLINK-24473:
-

 Summary: getString value from any datatype columns
 Key: FLINK-24473
 URL: https://issues.apache.org/jira/browse/FLINK-24473
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.4
Reporter: Spongebob


Hope Flink would support getting a string value from a column of any other 
datatype, such as an int or decimal column. Currently, Flink throws a cast 
exception when we do that.

 

!image-2021-10-07-21-15-05-001.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24338) Provide an interface to validate flinksql

2021-09-22 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418603#comment-17418603
 ] 

Spongebob commented on FLINK-24338:
---

Hi Patrick, I'd like to ask how I can validate FlinkSQL with Calcite. If you 
could give me a sample, that would be great.
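
For reference, a syntax-only check with vanilla Calcite could look like the sketch below; it catches grammar errors but deliberately not metadata problems, and Flink's own dialect (its DDL, for instance) would additionally need Flink's parser factory configured on the `SqlParser.Config`:
{code:java}
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public final class SqlSyntaxCheck {

    // True when the statement parses; missing tables/columns are out of
    // scope for a pure parse and are not reported here.
    public static boolean isSyntacticallyValid(String sql) {
        try {
            SqlParser.create(sql).parseStmt();
            return true;
        } catch (SqlParseException e) {
            return false;
        }
    }
}
{code}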

> Provide an interface to validate flinksql
> -
>
> Key: FLINK-24338
> URL: https://issues.apache.org/jira/browse/FLINK-24338
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Spongebob
>Priority: Minor
>
> It would be great if there is an interface that can validate flinksql. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24338) Provide an interface to validate flinksql

2021-09-20 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417599#comment-17417599
 ] 

Spongebob commented on FLINK-24338:
---

Our system receives Flink SQL from a website input and then needs to validate 
the submitted SQL; if the SQL is correct, we store it in a database. Thus we 
are looking for a method that can validate Flink SQL, apart from metadata 
exceptions, and it should be independent of the Flink context.

> Provide an interface to validate flinksql
> -
>
> Key: FLINK-24338
> URL: https://issues.apache.org/jira/browse/FLINK-24338
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Spongebob
>Priority: Minor
>
> It would be great if there is an interface that can validate flinksql. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24338) Provide an interface to validate flinksql

2021-09-19 Thread Spongebob (Jira)
Spongebob created FLINK-24338:
-

 Summary: Provide an interface to validate flinksql
 Key: FLINK-24338
 URL: https://issues.apache.org/jira/browse/FLINK-24338
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: shaded-14.0
Reporter: Spongebob


It would be great if there is an interface that can validate flinksql. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24289) Load data repeatedly between different jobs

2021-09-15 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415395#comment-17415395
 ] 

Spongebob commented on FLINK-24289:
---

So is there any way to make the source table be loaded only once across the 
two jobs?

> Load data repeatedly between different jobs
> ---
>
> Key: FLINK-24289
> URL: https://issues.apache.org/jira/browse/FLINK-24289
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.12.4
>Reporter: Spongebob
>Priority: Major
>
> I created a JDBC source table named tbl_a and then executed two DMLs that 
> both read tbl_a in await mode, meaning Flink executed DML1 and then DML2. 
> Unexpectedly, Flink loaded tbl_a from the underlying database in both DML1 
> and DML2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24289) Load data repeatedly between different jobs

2021-09-15 Thread Spongebob (Jira)
Spongebob created FLINK-24289:
-

 Summary: Load data repeatedly between different jobs
 Key: FLINK-24289
 URL: https://issues.apache.org/jira/browse/FLINK-24289
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.4
Reporter: Spongebob


I created a JDBC source table named tbl_a and then executed two DMLs that both 
read tbl_a in await mode, meaning Flink executed DML1 and then DML2. 
Unexpectedly, Flink loaded tbl_a from the underlying database in both DML1 and 
DML2.
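
One approach that may help (a sketch; `sink1` and `sink2` are hypothetical sink tables) is to group both DMLs into a single `StatementSet`, so they are submitted as one job and the planner has a chance to share the `tbl_a` scan instead of running two independent jobs:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public final class SingleJobDml {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // ... register tbl_a and the hypothetical sink1/sink2 here ...
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink1 SELECT * FROM tbl_a");
        set.addInsertSql("INSERT INTO sink2 SELECT * FROM tbl_a");
        set.execute(); // one job; both DMLs share one plan
    }
}
{code}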



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24104) incorrect validation of sink schema

2021-09-01 Thread Spongebob (Jira)
Spongebob created FLINK-24104:
-

 Summary: incorrect validation of sink schema
 Key: FLINK-24104
 URL: https://issues.apache.org/jira/browse/FLINK-24104
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.4
 Environment: Flink: 1.12.4
Reporter: Spongebob


Incorrect validation of the sink schema. For example, I create a JDBC table 
`user` as
{code:java}
// code placeholder
create table user(
id int,
name varchar(32),
desc varchar(16)
){code}
Then I connect to this JDBC table in Flink. When I execute my sink SQL, which 
looks like
{code:java}
// code placeholder
insert into flink_catalog_user(desc, name, id) select ...  // note that the
// order of the sink columns does not match the DDL statement{code}
the Flink application throws a validation exception about an incorrect sink 
schema. So it seems that Flink ignores the order of my sink columns and uses 
the declared schema of the sink table instead.
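
A workaround sketch under that reading (positional mapping): project the columns in the order declared by the sink DDL. Here `source_t` is a hypothetical source table and `tableEnv` stands in for the report's table environment:
{code:java}
import org.apache.flink.table.api.TableEnvironment;

public final class OrderedInsert {

    public static void insert(TableEnvironment tableEnv) {
        // Columns in DDL order (id, name, desc); `desc` is a reserved
        // keyword in Flink SQL, hence the backticks.
        tableEnv.executeSql(
                "INSERT INTO flink_catalog_user "
                        + "SELECT id, name, `desc` FROM source_t");
    }
}
{code}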



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23177) watermarks generated in dateStream can not flow into table

2021-06-30 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371891#comment-17371891
 ] 

Spongebob commented on FLINK-23177:
---

Thanks for your reply. It works normally now based on `fromDataStream`. And I 
found that if I specify the fields in `#createTemporaryView`, it also works 
normally.
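
For reference, a minimal sketch of the `fromDataStream`-based approach on 1.13 (`dataStream` stands in for the stream built in the report): expose the stream's record timestamp as a rowtime column and propagate the DataStream watermarks via `SOURCE_WATERMARK()`:
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public final class RowtimeView {

    public static void register(StreamTableEnvironment tableEnv,
                                DataStream<Row> dataStream) {
        // The rowtime column is read from the StreamRecord timestamp, and
        // SOURCE_WATERMARK() reuses the watermarks assigned on the stream.
        Table table = tableEnv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());
        tableEnv.createTemporaryView("catalog_test1", table);
    }
}
{code}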

> watermarks generated in dateStream can not flow into table
> --
>
> Key: FLINK-23177
> URL: https://issues.apache.org/jira/browse/FLINK-23177
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.13.1
> Environment: flink: 1.13.1
>Reporter: Spongebob
>Priority: Major
>
> I assigned watermarks in the dataStream and then used the 
> `createTemporaryView` method to build a table sourced from the dataStream. 
> Unexpectedly, the watermarks work normally in the dataStream, but the 
> watermark of the table stays at -9223372036854775808 (Long.MIN_VALUE) 
> forever.
> {code:java}
> def main(args: Array[String]): Unit = {
> val streamEnv = ...
> streamEnv.enableCheckpointing(1000)
> streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
> val dataStream = ...
> val resultStream = dataStream.map(
>   value => {
> val data = value.split(",")
> (data(0), data(1).toInt)
>   }
> ).assignTimestampsAndWatermarks(WatermarkStrategy
>   .forBoundedOutOfOrderness(Duration.ZERO)
>   .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] 
> {
> override def extractTimestamp(element: (String, Int), 
> recordTimestamp: Long): Long = element._2 * 1000
>   }))
>   .process(new MyProcessFunc)
> //resultStream.print("raw")
> //streamEnv.execute("")
> val streamTableEnv = buildStreamTableEnv(streamEnv, 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
> streamTableEnv.createTemporaryView("catalog_test1", resultStream)
> ...
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23177) watermarks generated in dateStream can not flow into table

2021-06-29 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-23177:
--
Description: 
I assigned watermarks in the dataStream and then used the `createTemporaryView` 
method to build a table sourced from the dataStream. Unexpectedly, the 
watermarks work normally in the dataStream, but the watermark of the table 
stays at -9223372036854775808 (Long.MIN_VALUE) forever.
{code:java}
def main(args: Array[String]): Unit = {
val streamEnv = ...
streamEnv.enableCheckpointing(1000)
streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
val dataStream = ...

val resultStream = dataStream.map(
  value => {
val data = value.split(",")
(data(0), data(1).toInt)
  }
).assignTimestampsAndWatermarks(WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
override def extractTimestamp(element: (String, Int), recordTimestamp: 
Long): Long = element._2 * 1000
  }))
  .process(new MyProcessFunc)

//resultStream.print("raw")
//streamEnv.execute("")

val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
streamTableEnv.createTemporaryView("catalog_test1", resultStream)
...
  }
{code}

  was:
I have assigned watermark in dataStream and then use the `createTemporaryView` 
method to build a table that is source from the dataStream. Out of my 
expectation, the watermarks works normally in dataStream but the watermarks of 
the table stay at -9223372036854775808 forever.
{code:java}
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000)
streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
val dataStream = streamEnv.socketTextStream("192.168.164.105", )

val resultStream = dataStream.map(
  value => {
val data = value.split(",")
(data(0), data(1).toInt)
  }
).assignTimestampsAndWatermarks(WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
override def extractTimestamp(element: (String, Int), recordTimestamp: 
Long): Long = element._2 * 1000
  }))
  .process(new MyProcessFunc)

//resultStream.print("raw")
//streamEnv.execute("")

val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
streamTableEnv.createTemporaryView("catalog_test1", resultStream)
val catalog = buildHiveCatalog
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test1 select _1,_2 from 
default_catalog.default_database.catalog_test1").print()
  }
{code}


> watermarks generated in dateStream can not flow into table
> --
>
> Key: FLINK-23177
> URL: https://issues.apache.org/jira/browse/FLINK-23177
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.13.1
> Environment: flink: 1.13.1
>Reporter: Spongebob
>Priority: Major
>
> I assigned watermarks in the dataStream and then used the 
> `createTemporaryView` method to build a table sourced from the dataStream. 
> Unexpectedly, the watermarks work normally in the dataStream, but the 
> watermark of the table stays at -9223372036854775808 (Long.MIN_VALUE) 
> forever.
> {code:java}
> def main(args: Array[String]): Unit = {
> val streamEnv = ...
> streamEnv.enableCheckpointing(1000)
> streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
> val dataStream = ...
> val resultStream = dataStream.map(
>   value => {
> val data = value.split(",")
> (data(0), data(1).toInt)
>   }
> ).assignTimestampsAndWatermarks(WatermarkStrategy
>   .forBoundedOutOfOrderness(Duration.ZERO)
>   .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] 
> {
> override def extractTimestamp(element: (String, Int), 
> recordTimestamp: Long): Long = element._2 * 1000
>   }))
>   .process(new MyProcessFunc)
> //resultStream.print("raw")
> //streamEnv.execute("")
> val streamTableEnv = buildStreamTableEnv(streamEnv, 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
> streamTableEnv.createTemporaryView("catalog_test1", resultStream)
> ...
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23177) watermarks generated in dateStream can not flow into table

2021-06-29 Thread Spongebob (Jira)
Spongebob created FLINK-23177:
-

 Summary: watermarks generated in dateStream can not flow into table
 Key: FLINK-23177
 URL: https://issues.apache.org/jira/browse/FLINK-23177
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.13.1
 Environment: flink: 1.13.1
Reporter: Spongebob


I assigned watermarks in the dataStream and then used the `createTemporaryView` 
method to build a table sourced from the dataStream. Unexpectedly, the 
watermarks work normally in the dataStream, but the watermark of the table 
stays at -9223372036854775808 (Long.MIN_VALUE) forever.
{code:java}
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000)
streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
val dataStream = streamEnv.socketTextStream("192.168.164.105", )

val resultStream = dataStream.map(
  value => {
val data = value.split(",")
(data(0), data(1).toInt)
  }
).assignTimestampsAndWatermarks(WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
override def extractTimestamp(element: (String, Int), recordTimestamp: 
Long): Long = element._2 * 1000
  }))
  .process(new MyProcessFunc)

//resultStream.print("raw")
//streamEnv.execute("")

val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
streamTableEnv.createTemporaryView("catalog_test1", resultStream)
val catalog = buildHiveCatalog
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test1 select _1,_2 from 
default_catalog.default_database.catalog_test1").print()
  }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table

2021-06-07 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-22874.
-
Release Note: My misunderstanding of the streaming partition sink caused this 
issue; it worked normally after I enabled checkpointing.
  Resolution: Not A Problem
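
For reference, a minimal sketch of the fix described in the release note: the streaming file/Hive sink moves files from in-progress to committed only when a checkpoint completes, so checkpointing must be enabled (the interval here is an arbitrary example):
{code:java}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class EnableCheckpointing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Without this, partition files stay in-progress forever and are
        // never committed to the Hive metastore.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // ... build the pipeline from the report, then env.execute(...)
    }
}
{code}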

> flink table partition trigger doesn't effect as expectation when sink into 
> hive table
> -
>
> Key: FLINK-22874
> URL: https://issues.apache.org/jira/browse/FLINK-22874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: Spongebob
>Priority: Major
>
> I am trying to sink into a Hive partitioned table whose partition commit 
> trigger is declared as "partition-time", and I have assigned watermarks on 
> the dataStream. When I input some data into the dataStream, it does not 
> commit the Hive partitions on time. Here's my 
> code
> {code:java}
> //ddl of hive table 
> create table test_table(username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );{code}
> {code:java}
> // flink application code
> val streamEnv = ...
> val dataStream:DataStream[(String, Long)] = ...
> // assign watermark and output watermark info in processFunction
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, 
> Long, Long)] {
>   override def processElement(value: (String, Long), ctx: 
> ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: 
> Collector[(String, Long, Long)]): Unit = {
> out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
> val resultStream = dataStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
>   .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
> override def extractTimestamp(element: (String, Long), recordTimestamp: 
> Long): Long = {
>   element._2 * 1000
> }
>   }))
> .process(new MyProcessFunction)
> //
> val streamTableEnv = buildStreamTableEnv(streamEnv, 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
> // convert dataStream into hive catalog table and sink into hive
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv.executeSql("insert into test_table select _1,_2 from 
> default_catalog.default_database.test_catalog_t").print()
> // flink use the default parallelism 4
> // input data
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
>  ...
> // result
> there are much partition directories on hdfs but all they are inprogressing 
> files and never would be commit to hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table

2021-06-07 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358996#comment-17358996
 ] 

Spongebob commented on FLINK-22874:
---

Yes, my misunderstanding of the streaming partition sink caused this issue; it 
worked normally after I enabled checkpointing.

> flink table partition trigger doesn't effect as expectation when sink into 
> hive table
> -
>
> Key: FLINK-22874
> URL: https://issues.apache.org/jira/browse/FLINK-22874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: Spongebob
>Priority: Major
>
> I am trying to sink into a Hive partitioned table whose partition commit 
> trigger is declared as "partition-time", and I have assigned watermarks on 
> the dataStream. When I input some data into the dataStream, it does not 
> commit the Hive partitions on time. Here's my 
> code
> {code:java}
> //ddl of hive table 
> create table test_table(username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );{code}
> {code:java}
> // flink application code
> val streamEnv = ...
> val dataStream:DataStream[(String, Long)] = ...
> // assign watermark and output watermark info in processFunction
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, 
> Long, Long)] {
>   override def processElement(value: (String, Long), ctx: 
> ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: 
> Collector[(String, Long, Long)]): Unit = {
> out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
> val resultStream = dataStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
>   .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
> override def extractTimestamp(element: (String, Long), recordTimestamp: 
> Long): Long = {
>   element._2 * 1000
> }
>   }))
> .process(new MyProcessFunction)
> //
> val streamTableEnv = buildStreamTableEnv(streamEnv, 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
> // convert dataStream into hive catalog table and sink into hive
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv.executeSql("insert into test_table select _1,_2 from 
> default_catalog.default_database.test_catalog_t").print()
> // flink use the default parallelism 4
> // input data
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
>  ...
> // result
> there are much partition directories on hdfs but all they are inprogressing 
> files and never would be commit to hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table

2021-06-04 Thread Spongebob (Jira)
Spongebob created FLINK-22874:
-

 Summary: flink table partition trigger doesn't effect as 
expectation when sink into hive table
 Key: FLINK-22874
 URL: https://issues.apache.org/jira/browse/FLINK-22874
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: Spongebob


I am trying to sink into a Hive partitioned table whose partition commit 
trigger is declared as "partition-time", and I have assigned watermarks on the 
dataStream. When I input some data into the dataStream, it does not commit the 
Hive partitions on time. Here's my code:
{code:java}
//ddl of hive table 
create table test_table(username string)
partitioned by (ts bigint)
stored as orc
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);{code}
{code:java}
// flink application code

val streamEnv = ...
val dataStream:DataStream[(String, Long)] = ...


// assign watermark and output watermark info in processFunction
class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, 
Long)] {
  override def processElement(value: (String, Long), ctx: 
ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: 
Collector[(String, Long, Long)]): Unit = {
out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
  }
}

val resultStream = dataStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(element: (String, Long), recordTimestamp: 
Long): Long = {
  element._2 * 1000
}
  }))
.process(new MyProcessFunction)

//
val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())

// convert dataStream into hive catalog table and sink into hive
streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
val catalog = ...
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test_table select _1,_2 from 
default_catalog.default_database.test_catalog_t").print()


// flink use the default parallelism 4
// input data
(a, 1)
(b, 2)
(c, 3)
(d, 4)
(a, 5)
 ...

// result
there are much partition directories on hdfs but all they are inprogressing 
files and never would be commit to hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka

2021-05-19 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347469#comment-17347469
 ] 

Spongebob commented on FLINK-22190:
---

My mistake led to this issue. But I found that I must set group.id when 
sourcing from Kafka and enable checkpointing, otherwise I get an 
org.apache.kafka.common.errors.InvalidGroupIdException that ends the sink 
tasks.
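
As a side note on the configuration quoted below: the consumer in flink-app2 sets `isolation_level`, but Kafka's property name is `isolation.level`, so the consumer would silently fall back to read_uncommitted and still see the duplicates. A corrected sketch (broker address and group id taken from the report):
{code:java}
import java.util.Properties;

public final class ReadCommittedProps {

    public static Properties consumerProps() {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "xfy:9092");
        prop.setProperty("group.id", "test");
        // The dotted key is the one Kafka recognizes; unknown keys such as
        // "isolation_level" are ignored by the consumer config.
        prop.setProperty("isolation.level", "read_committed");
        return prop;
    }
}
{code}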

> no guarantee on Flink exactly_once sink to Kafka 
> -
>
> Key: FLINK-22190
> URL: https://issues.apache.org/jira/browse/FLINK-22190
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: *flink: 1.12.2*
> *kafka: 2.7.0*
>Reporter: Spongebob
>Priority: Major
>
> When I tried to test Flink's exactly_once sink to Kafka, I found it did not 
> run as expected. Here's the pipeline of the Flink applications: raw data 
> (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2; 
> the flink tasks may randomly hit a `/ by zero` exception. Below are the 
> codes:
> {code:java}
> // code placeholder
> raw data, flink app0:
> class SimpleSource1 extends SourceFunction[String] {
>  var switch = true
>  val students: Array[String] = Array("Tom", "Jerry", "Gory")
>  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit 
> = {
>  var i = 0
>  while (switch) {
>  sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
>  i += 1
>  Thread.sleep(5000)
>  }
>  }
>  override def cancel(): Unit = switch = false
> }
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val dataStream = streamEnv.addSource(new SimpleSource1)
> dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", 
> "single-partition-topic-2", new SimpleStringSchema()))
> streamEnv.execute("sink kafka")
>  
> flink-app1:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic-2",
>  new SimpleStringSchema,
>  prop
> ))
> val resultStream = dataStream.map(x => {
>  val data = x.split(",")
>  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
> }
> )
> resultStream.print().setParallelism(1)
> val propProducer = new Properties()
> propProducer.setProperty("bootstrap.servers", "xfy:9092")
> propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
> resultStream.addSink(new FlinkKafkaProducer[String](
>  "single-partition-topic",
>  new MyKafkaSerializationSchema("single-partition-topic"),
>  propProducer,
>  Semantic.EXACTLY_ONCE))
> streamEnv.execute("sink kafka")
>  
> flink-app2:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> prop.setProperty("isolation_level", "read_committed")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic",
>  new SimpleStringSchema,
>  prop
> ))
> dataStream.print().setParallelism(1)
> streamEnv.execute("consumer kafka"){code}
>  
> flink app1 will print some duplicate numbers; I expected flink app2 to 
> deduplicate them, but in fact it does not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22458) Failed when operating empty catalog table

2021-04-26 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331808#comment-17331808
 ] 

Spongebob commented on FLINK-22458:
---

Can you explain why this happened and how to avoid this issue?

> Failed when operating empty catalog table
> -
>
> Key: FLINK-22458
> URL: https://issues.apache.org/jira/browse/FLINK-22458
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.2
> Environment: Flink: 1.12.2
>Reporter: Spongebob
>Priority: Major
>
> The pipeline looks like: HiveTable -> FlinkCatalogTable (might be empty) -> 
> HiveTable.
> It runs normally when the FlinkCatalogTable is not empty, but when the 
> FlinkCatalogTable is empty, the JobManager throws this exception:
> {code:java}
> java.lang.Exception: Failed to finalize execution on master at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1373)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:877)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1241)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1610)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1584)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:663)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_251] at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_251] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_251] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_251] at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  ~[flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flownData-1.0-jar-with-dependencies.jar:?] at 
> 

[jira] [Created] (FLINK-22458) Failed when operating empty catalog table

2021-04-25 Thread Spongebob (Jira)
Spongebob created FLINK-22458:
-

 Summary: Failed when operating empty catalog table
 Key: FLINK-22458
 URL: https://issues.apache.org/jira/browse/FLINK-22458
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.2
 Environment: Flink: 1.12.2
Reporter: Spongebob


The pipeline looks like: HiveTable -> FlinkCatalogTable (might be empty) -> 
HiveTable.

It runs normally when the FlinkCatalogTable is not empty, but when the 
FlinkCatalogTable is empty, the JobManager throws this exception:
{code:java}
java.lang.Exception: Failed to finalize execution on master at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1373)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:877)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1241)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1610)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1584)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:663)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_251] at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_251] at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_251] at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_251] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flownData-1.0-jar-with-dependencies.jar:?] at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flownData-1.0-jar-with-dependencies.jar:?] Caused by: 

[jira] [Commented] (FLINK-21914) Trying to access closed classloader

2021-04-25 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331698#comment-17331698
 ] 

Spongebob commented on FLINK-21914:
---

Can anyone help with this issue? When it happens, an empty staging directory 
is left behind on the HDFS side, but the job runs normally in the local IDE.
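
As the exception message itself points out, the check can be disabled as a 
temporary workaround (it hides the leak check rather than fixing the leak) by 
setting this in flink-conf.yaml:
{code}
classloader.check-leaked-classloader: false
{code}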

> Trying to access closed classloader
> ---
>
> Key: FLINK-21914
> URL: https://issues.apache.org/jira/browse/FLINK-21914
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> hadoop: 3.1.3
> hive: 3.1.2
>  
>Reporter: Spongebob
>Priority: Critical
>  Labels: stale-critical
> Attachments: app.log
>
>
> I am trying to deploy a Flink application on YARN but got this exception: 
> Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
> access closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
>  
> The application passes its tests in my local environment; it reads from and 
> writes to Hive via the Flink table environment. See the attached YARN log 
> (source and sink data details were removed).
> {code}
> Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
> access closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
>   at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
>   at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
>   at 
> org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
>   at 
> org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
>   at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
>   at 
> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
>   at 
> org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
>   at 
> org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
>   at 
> org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21914) Trying to access closed classloader

2021-04-25 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-21914:
--
Labels:   (was: stale-critical)

> Trying to access closed classloader
> ---
>
> Key: FLINK-21914
> URL: https://issues.apache.org/jira/browse/FLINK-21914
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> hadoop: 3.1.3
> hive: 3.1.2
>  
>Reporter: Spongebob
>Priority: Critical
> Attachments: app.log
>
>
> I am trying to deploy a Flink application on YARN but got this exception: 
> Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
> access closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
>  
> The application passes its tests in my local environment; it reads from and 
> writes to Hive via the Flink table environment. See the attached YARN log 
> (source and sink data details were removed).
> {code}
> Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
> access closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
>   at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
>   at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
>   at 
> org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
>   at 
> org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
>   at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
>   at 
> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
>   at 
> org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
>   at 
> org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
>   at 
> org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
>   at 
> org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22422) -ynm option doesn't take effect when deploying Flink on YARN

2021-04-25 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331429#comment-17331429
 ] 

Spongebob commented on FLINK-22422:
---

Actually, it works normally when using "-m yarn-cluster" alone.

What is the difference between "-m yarn-cluster" and "-t yarn-per-job"? I can 
only find the "-t yarn-per-job" option on the Flink documentation site.
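
From the docs, my understanding is that -ynm is only honored by the legacy 
"-m yarn-cluster" code path; with the newer "-t yarn-per-job" executor, the 
YARN-specific shortcut flags are replaced by generic -D config options. A 
sketch of the equivalent command (reusing the jar and class from the command 
in this thread, and assuming the yarn.application.name option):
{code}
flink run -t yarn-per-job \
  -Dyarn.application.name=dim_gnd_category_activity \
  -c dim.dim_gnd_category_activity \
  gop-emp-roster_report/empData-3.0-jar-with-dependencies.jar
{code}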

> -ynm option doesn't take effect when deploying Flink on YARN
> ---
>
> Key: FLINK-22422
> URL: https://issues.apache.org/jira/browse/FLINK-22422
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> hadoop: 3.1.3
>Reporter: Spongebob
>Priority: Major
>
> The -ynm option doesn't take effect when deploying Flink on YARN; the 
> application still appears as "Flink per-job cluster" in the YARN web UI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22422) -ynm option doesn't take effect when deploying Flink on YARN

2021-04-24 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331382#comment-17331382
 ] 

Spongebob commented on FLINK-22422:
---

Hi Yang, here's the command: flink run -m yarn-cluster -ys 1 -yjm 2G -ytm 2G 
-yqu default -d -t yarn-per-job -d -ynm dim_gnd_category_activity -c 
dim.dim_gnd_category_activity 
gop-emp-roster_report/empData-3.0-jar-with-dependencies.jar

> -ynm option doesn't take effect when deploying Flink on YARN
> ---
>
> Key: FLINK-22422
> URL: https://issues.apache.org/jira/browse/FLINK-22422
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> hadoop: 3.1.3
>Reporter: Spongebob
>Priority: Major
>
> The -ynm option doesn't take effect when deploying Flink on YARN; the 
> application still appears as "Flink per-job cluster" in the YARN web UI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22422) -ynm option doesn't take effect when deploying Flink on YARN

2021-04-23 Thread Spongebob (Jira)
Spongebob created FLINK-22422:
-

 Summary: -ynm option doesn't take effect when deploying Flink on YARN
 Key: FLINK-22422
 URL: https://issues.apache.org/jira/browse/FLINK-22422
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.2
 Environment: flink: 1.12.2

hadoop: 3.1.3
Reporter: Spongebob


The -ynm option doesn't take effect when deploying Flink on YARN; the 
application still appears as "Flink per-job cluster" in the YARN web UI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22247) can not pass AddressList when connecting to rabbitmq

2021-04-13 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-22247:
--
Environment: 
flink: 1.12.2

rabbitmq: 3.8.4

  was:
flink: 2.12.2

rabbitmq: 3.8.4


> can not pass AddressList when connecting to rabbitmq
> 
>
> Key: FLINK-22247
> URL: https://issues.apache.org/jira/browse/FLINK-22247
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> rabbitmq: 3.8.4
>Reporter: Spongebob
>Priority: Major
>
> We want to connect to a RabbitMQ cluster address list when using the RabbitMQ 
> connector, so we override the setupConnection function to pass the cluster 
> addresses, but the Address class is not serializable, so Flink throws an 
> exception.
> {code:java}
> // code placeholder
> val rabbitmqAddresses = Array(
>   new Address("xxx1", 5672),
>   new Address("xxx2", 5672),
>   new Address("xxx3", 5672))
>
> val dataStream = streamEnv
>   .addSource(new RMQSource[String](
>     rabbitmqConfig,        // RabbitMQ connection config
>     "queueName",           // queue name
>     true,                  // use correlation IDs for exactly-once consumption from RabbitMQ
>     new SimpleStringSchema // plain string deserialization
>   ) {
>     override def setupQueue(): Unit = {}
>
>     override def setupConnection(): Connection = {
>       rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
>     }
>   }).setParallelism(1)
> {code}
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
> [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object 
> probably contains or references non serializable fields. at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
>  at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
>  at testConsumer$.main(testConsumer.scala:30) at 
> testConsumer.main(testConsumer.scala)Caused by: 
> java.io.NotSerializableException: com.rabbitmq.client.Address at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at 
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) 
> ... 9 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22247) can not pass AddressList when connecting to rabbitmq

2021-04-12 Thread Spongebob (Jira)
Spongebob created FLINK-22247:
-

 Summary: can not pass AddressList when connecting to rabbitmq
 Key: FLINK-22247
 URL: https://issues.apache.org/jira/browse/FLINK-22247
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.12.2
 Environment: flink: 2.12.2

rabbitmq: 3.8.4
Reporter: Spongebob


We want to connect to a RabbitMQ cluster address list when using the RabbitMQ 
connector, so we override the setupConnection function to pass the cluster 
addresses, but the Address class is not serializable, so Flink throws an 
exception.
{code:java}
// code placeholder
val rabbitmqAddresses = Array(
  new Address("xxx1", 5672),
  new Address("xxx2", 5672),
  new Address("xxx3", 5672))

val dataStream = streamEnv
  .addSource(new RMQSource[String](
    rabbitmqConfig,        // RabbitMQ connection config
    "queueName",           // queue name
    true,                  // use correlation IDs for exactly-once consumption from RabbitMQ
    new SimpleStringSchema // plain string deserialization
  ) {
    override def setupQueue(): Unit = {}

    override def setupConnection(): Connection = {
      rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
    }
  }).setParallelism(1)
{code}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
[Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object 
probably contains or references non serializable fields. at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
 at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
 at testConsumer$.main(testConsumer.scala:30) at 
testConsumer.main(testConsumer.scala)Caused by: 
java.io.NotSerializableException: com.rabbitmq.client.Address at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at 
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
 at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ... 
9 more
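
A workaround sketch (my suggestion, not an official API): keep only plain 
host/port pairs in the closure, which are serializable, and build the Address 
objects inside setupConnection, which runs on the task manager after 
deserialization. This reuses rabbitmqConfig and the imports from the snippet 
above:
{code:java}
// Only (String, Int) pairs are captured and serialized; Address is built on the worker
val hostPorts: Array[(String, Int)] =
  Array(("xxx1", 5672), ("xxx2", 5672), ("xxx3", 5672))

val dataStream = streamEnv
  .addSource(new RMQSource[String](
    rabbitmqConfig, "queueName", true, new SimpleStringSchema
  ) {
    override def setupQueue(): Unit = {}

    override def setupConnection(): Connection = {
      // Address instances are created here, so they never need to be serialized
      val addresses = hostPorts.map { case (host, port) => new Address(host, port) }
      rabbitmqConfig.getConnectionFactory.newConnection(addresses)
    }
  }).setParallelism(1)
{code}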



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka

2021-04-10 Thread Spongebob (Jira)
Spongebob created FLINK-22190:
-

 Summary: no guarantee on Flink exactly_once sink to Kafka 
 Key: FLINK-22190
 URL: https://issues.apache.org/jira/browse/FLINK-22190
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.12.2
 Environment: *flink: 1.12.2*

*kafka: 2.7.0*
Reporter: Spongebob


When I tried to test Flink's exactly_once sink to Kafka, I found it did not 
behave as expected. Here is the pipeline of the Flink applications: raw data 
(flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2. The 
Flink tasks may randomly fail with a "/ by zero" exception (from the division 
in the map function below). The code is shown below:
{code:java}
// code placeholder

// raw data, flink app0:
class SimpleSource1 extends SourceFunction[String] {
  var switch = true
  val students: Array[String] = Array("Tom", "Jerry", "Gory")

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var i = 0
    while (switch) {
      sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
      i += 1
      Thread.sleep(5000)
    }
  }

  override def cancel(): Unit = switch = false
}

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamEnv.addSource(new SimpleSource1)

dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092",
  "single-partition-topic-2", new SimpleStringSchema()))

streamEnv.execute("sink kafka")

// flink-app1:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")

val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic-2",
  new SimpleStringSchema,
  prop
))

val resultStream = dataStream.map(x => {
  val data = x.split(",")
  // Random.nextInt(5) can return 0, so this division fails at random
  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
})

resultStream.print().setParallelism(1)

val propProducer = new Properties()
propProducer.setProperty("bootstrap.servers", "xfy:9092")
propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")

resultStream.addSink(new FlinkKafkaProducer[String](
  "single-partition-topic",
  new MyKafkaSerializationSchema("single-partition-topic"),
  propProducer,
  Semantic.EXACTLY_ONCE))

streamEnv.execute("sink kafka")

// flink-app2:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
// NOTE: the Kafka consumer key is "isolation.level" (dots); "isolation_level"
// is not a recognized key, so this consumer stays at the default read_uncommitted
prop.setProperty("isolation_level", "read_committed")

val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic",
  new SimpleStringSchema,
  prop
))

dataStream.print().setParallelism(1)
streamEnv.execute("consumer kafka")
{code}
 

flink app1 prints some duplicate numbers (as expected after task restarts); I 
expected flink app2 to deduplicate them, but in fact it does not.
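
A hedged observation and fix sketch (my reading, not a confirmed diagnosis): 
with Semantic.EXACTLY_ONCE the producer writes into Kafka transactions that 
are only committed when a checkpoint completes, so downstream deduplication 
relies on the consumer reading with read_committed. The Kafka consumer key is 
spelled "isolation.level", so the underscore variant above never takes effect:
{code:java}
// flink-app2, corrected consumer properties (only the key spelling changes)
val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
prop.setProperty("isolation.level", "read_committed") // dots, not underscores
{code}
With read_committed in place, flink app2 should only see records from 
committed transactions; flink app1's own print() still shows duplicates after 
restarts because it observes records before the transaction commits.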



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv

2021-04-08 Thread Spongebob (Jira)
Spongebob created FLINK-22165:
-

 Summary: How to set rabbitmq correlationId when using rabbitmq 
sink in dataStreamEnv
 Key: FLINK-22165
 URL: https://issues.apache.org/jira/browse/FLINK-22165
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Affects Versions: 1.12.2
 Environment: Flink 1.12.2

rabbitmq 3.8.4
Reporter: Spongebob


The Flink RabbitMQ module provides a source and a sink for RabbitMQ. On the 
source side, the correlationId can be used to deduplicate records across 
checkpoints, so can we also set a correlationId for each message sunk into 
RabbitMQ?
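
For the sink side, a sketch of what this could look like (illustrative only; 
it assumes the RMQSinkPublishOptions hook in flink-connector-rabbitmq, which 
I have not verified against 1.12, and the queue name is hypothetical):
{code:java}
import com.rabbitmq.client.AMQP
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.rabbitmq.{RMQSink, RMQSinkPublishOptions}

// Compute per-record AMQP properties, including a correlation ID, at publish time
class CorrelatedOptions extends RMQSinkPublishOptions[String] {
  override def computeRoutingKey(msg: String): String = "queueName" // hypothetical queue
  override def computeExchange(msg: String): String = ""            // default exchange
  override def computeProperties(msg: String): AMQP.BasicProperties =
    new AMQP.BasicProperties.Builder()
      .correlationId(msg.hashCode.toString) // any stable per-message key would do
      .build()
}

// rmqConnectionConfig is the usual RMQConnectionConfig (construction not shown)
dataStream.addSink(new RMQSink[String](
  rmqConnectionConfig, new SimpleStringSchema, new CorrelatedOptions))
{code}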



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

