[jira] [Commented] (FLINK-29427) LookupJoinITCase failed with classloader problem

2022-11-21 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637065#comment-17637065
 ] 

Matthias Pohl commented on FLINK-29427:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43221=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=22265
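As a side note for anyone reproducing this locally: the check quoted in the stack trace below is the one controlled by the 'classloader.check-leaked-classloader' option named in the error message. A minimal, hypothetical sketch of disabling it for a local test environment follows (only the option key comes from the error message; the surrounding setup is illustrative, and turning the check off merely hides the symptom instead of fixing the leak):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableLeakCheckExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Option key taken verbatim from the error message below; disabling the
        // check hides the symptom only, the leaked classloader still needs fixing.
        conf.setString("classloader.check-leaked-classloader", "false");

        // Hypothetical local test setup; any environment that accepts a
        // Configuration can be configured the same way.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        // ... build and execute the test job against 'env' as usual.
    }
}
{code}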

> LookupJoinITCase failed with classloader problem
> 
>
> Key: FLINK-29427
> URL: https://issues.apache.org/jira/browse/FLINK-29427
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Alexander Smirnov
>Priority: Blocker
>  Labels: test-stability
>
> {code:java}
> 2022-09-27T02:49:20.9501313Z Sep 27 02:49:20 Caused by: 
> org.codehaus.janino.InternalCompilerException: Compiling 
> "KeyProjection$108341": Trying to access closed classloader. Please check if 
> you store classloaders directly or indirectly in static fields. If the 
> stacktrace suggests that the leak occurs in a third party library and cannot 
> be fixed immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9502654Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
> 2022-09-27T02:49:20.9503366Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> 2022-09-27T02:49:20.9504044Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> 2022-09-27T02:49:20.9504704Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> 2022-09-27T02:49:20.9505341Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> 2022-09-27T02:49:20.9505965Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> 2022-09-27T02:49:20.9506584Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> 2022-09-27T02:49:20.9507261Z Sep 27 02:49:20  at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> 2022-09-27T02:49:20.9507883Z Sep 27 02:49:20  ... 30 more
> 2022-09-27T02:49:20.9509266Z Sep 27 02:49:20 Caused by: 
> java.lang.IllegalStateException: Trying to access closed classloader. Please 
> check if you store classloaders directly or indirectly in static fields. If 
> the stacktrace suggests that the leak occurs in a third party library and 
> cannot be fixed immediately, you can disable this check with the 
> configuration 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9510835Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
> 2022-09-27T02:49:20.9511760Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
> 2022-09-27T02:49:20.9512456Z Sep 27 02:49:20  at 
> java.lang.Class.forName0(Native Method)
> 2022-09-27T02:49:20.9513014Z Sep 27 02:49:20  at 
> java.lang.Class.forName(Class.java:348)
> 2022-09-27T02:49:20.9513649Z Sep 27 02:49:20  at 
> org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:89)
> 2022-09-27T02:49:20.9514339Z Sep 27 02:49:20  at 
> org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:312)
> 2022-09-27T02:49:20.9514990Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8556)
> 2022-09-27T02:49:20.9515659Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6749)
> 2022-09-27T02:49:20.9516337Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
> 2022-09-27T02:49:20.9516989Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
> 2022-09-27T02:49:20.9517632Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
> 2022-09-27T02:49:20.9518319Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
> 2022-09-27T02:49:20.9519018Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9519680Z Sep 27 02:49:20  at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
> 2022-09-27T02:49:20.9520386Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9521042Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
> 2022-09-27T02:49:20.9521677Z Sep 27 02:49:20  at 
> 

[jira] [Created] (FLINK-30136) ElasticsearchSinkITCase.testElasticsearchSink failed on azure due to ES service unavailable

2022-11-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-30136:
--

 Summary: ElasticsearchSinkITCase.testElasticsearchSink failed on 
azure due to ES service unavailable
 Key: FLINK-30136
 URL: https://issues.apache.org/jira/browse/FLINK-30136
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.16.0
Reporter: Leonard Xu



{noformat}

Nov 22 02:24:55 at 
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
Nov 22 02:24:55 at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
Nov 22 02:24:55 at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.responseReceived(HttpAsyncRequestExecutor.java:309)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:255)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
Nov 22 02:24:55 at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
Nov 22 02:24:55 ... 1 more
{noformat}


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43369=logs=dbe51908-4958-5c8c-9557-e10952d4259d=55d11a16-067d-538d-76a3-4c096a3a8e24





[jira] [Closed] (FLINK-30137) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory failed due to NoSuchFileException

2022-11-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl closed FLINK-30137.
-
Resolution: Duplicate

> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory 
> failed due to NoSuchFileException
> 
>
> Key: FLINK-30137
> URL: https://issues.apache.org/jira/browse/FLINK-30137
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
> Attachments: 
> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory.log
>
>
> {{LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory}}
>  failed in [this 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43366=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=10718]
>  due to a {{NoSuchFileException}} during cleanup.
> {code:java}
> java.io.IOException: Failed to delete temp directory 
> /tmp/junit2010448393472419340. The following paths could not be deleted (see 
> suppressed exceptions for details): 
> tm_taskManager_2/localState/aid_21c128b018cc61989c323cda6e36b0b1/jid_e5dbf7bc4ebb72baf20387e555083439/vtx_bc764cd8ddf7a0cff126f51c16239658_sti_1
>         at 
> org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.createIOExceptionWithAttachedFailures(TempDirectory.java:280)
>         at 
> org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.close(TempDirectory.java:188)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.jupiter.engine.execution.ExtensionValuesStore.lambda$closeAllStoredCloseableValues$3(ExtensionValuesStore.java:68)
>         at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>         at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.ArrayList.forEach(ArrayList.java:1259)
>         at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
>         at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
>         at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>         at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>         at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>         at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>         at 
> org.junit.jupiter.engine.execution.ExtensionValuesStore.closeAllStoredCloseableValues(ExtensionValuesStore.java:68)
>         at 
> org.junit.jupiter.engine.descriptor.AbstractExtensionContext.close(AbstractExtensionContext.java:77)
>         at 
> org.junit.jupiter.engine.execution.JupiterEngineExecutionContext.close(JupiterEngineExecutionContext.java:53)
>         at 
> org.junit.jupiter.engine.descriptor.JupiterTestDescriptor.cleanUp(JupiterTestDescriptor.java:222)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$cleanUp$1(TestMethodTestDescriptor.java:152)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:152)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:66)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$cleanUp$10(NodeTestTask.java:167)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.cleanUp(NodeTestTask.java:167)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:98)
>         at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
>         at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>         at 
> 

[jira] [Updated] (FLINK-30137) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory failed due to NoSuchFileException

2022-11-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-30137:
--
Attachment: 
LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory.log

> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory 
> failed due to NoSuchFileException
> 
>
> Key: FLINK-30137
> URL: https://issues.apache.org/jira/browse/FLINK-30137
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
> Attachments: 
> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory.log
>
>
> {{LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory}}
>  failed in [this 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43366=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=10718]
>  due to a {{NoSuchFileException}} during cleanup.
> {code:java}
> java.io.IOException: Failed to delete temp directory 
> /tmp/junit2010448393472419340. The following paths could not be deleted (see 
> suppressed exceptions for details): 
> tm_taskManager_2/localState/aid_21c128b018cc61989c323cda6e36b0b1/jid_e5dbf7bc4ebb72baf20387e555083439/vtx_bc764cd8ddf7a0cff126f51c16239658_sti_1
>         at 
> org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.createIOExceptionWithAttachedFailures(TempDirectory.java:280)
>         at 
> org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.close(TempDirectory.java:188)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.jupiter.engine.execution.ExtensionValuesStore.lambda$closeAllStoredCloseableValues$3(ExtensionValuesStore.java:68)
>         at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>         at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.ArrayList.forEach(ArrayList.java:1259)
>         at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
>         at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
>         at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>         at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>         at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>         at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>         at 
> org.junit.jupiter.engine.execution.ExtensionValuesStore.closeAllStoredCloseableValues(ExtensionValuesStore.java:68)
>         at 
> org.junit.jupiter.engine.descriptor.AbstractExtensionContext.close(AbstractExtensionContext.java:77)
>         at 
> org.junit.jupiter.engine.execution.JupiterEngineExecutionContext.close(JupiterEngineExecutionContext.java:53)
>         at 
> org.junit.jupiter.engine.descriptor.JupiterTestDescriptor.cleanUp(JupiterTestDescriptor.java:222)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$cleanUp$1(TestMethodTestDescriptor.java:152)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:152)
>         at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:66)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$cleanUp$10(NodeTestTask.java:167)
>         at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.cleanUp(NodeTestTask.java:167)
>         at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:98)
>         at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
>         at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
>         at 
> 

[jira] [Created] (FLINK-30137) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory failed due to NoSuchFileException

2022-11-21 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30137:
-

 Summary: 
LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory 
failed due to NoSuchFileException
 Key: FLINK-30137
 URL: https://issues.apache.org/jira/browse/FLINK-30137
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure, Tests
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Matthias Pohl
 Attachments: 
LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory.log

{{LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory}} 
failed in [this 
build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43366=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=10718]
 due to a {{NoSuchFileException}} during cleanup.
{code:java}
java.io.IOException: Failed to delete temp directory 
/tmp/junit2010448393472419340. The following paths could not be deleted (see 
suppressed exceptions for details): 
tm_taskManager_2/localState/aid_21c128b018cc61989c323cda6e36b0b1/jid_e5dbf7bc4ebb72baf20387e555083439/vtx_bc764cd8ddf7a0cff126f51c16239658_sti_1
        at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.createIOExceptionWithAttachedFailures(TempDirectory.java:280)
        at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.close(TempDirectory.java:188)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.jupiter.engine.execution.ExtensionValuesStore.lambda$closeAllStoredCloseableValues$3(ExtensionValuesStore.java:68)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
        at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at 
org.junit.jupiter.engine.execution.ExtensionValuesStore.closeAllStoredCloseableValues(ExtensionValuesStore.java:68)
        at 
org.junit.jupiter.engine.descriptor.AbstractExtensionContext.close(AbstractExtensionContext.java:77)
        at 
org.junit.jupiter.engine.execution.JupiterEngineExecutionContext.close(JupiterEngineExecutionContext.java:53)
        at 
org.junit.jupiter.engine.descriptor.JupiterTestDescriptor.cleanUp(JupiterTestDescriptor.java:222)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$cleanUp$1(TestMethodTestDescriptor.java:152)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:152)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.cleanUp(TestMethodTestDescriptor.java:66)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$cleanUp$10(NodeTestTask.java:167)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.cleanUp(NodeTestTask.java:167)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:98)
        at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
        at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637056#comment-17637056
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43369=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Comment Edited] (FLINK-29755) PulsarSourceUnorderedE2ECase.testSavepoint failed because of missing TaskManagers

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636424#comment-17636424
 ] 

Leonard Xu edited comment on FLINK-29755 at 11/22/22 7:48 AM:
--

[~syhily] I also found some shading-related error logs and Pulsar-internal error logs; 
these really make the troubleshooting harder. Could you take a look?


{noformat}
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT: 2022-11-19 03:05:19,977 ERROR 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.rejectedExecution
 [] - Failed to submit a listener notification task. Event loop shut down?
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT: java.lang.NoClassDefFoundError: 
org/apache/pulsar/shade/io/netty/util/concurrent/GlobalEventExecutor$2
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:223)
 
~[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.GlobalEventExecutor.execute0(GlobalEventExecutor.java:211)
 
~[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:205)
 
~[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1057)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 
[blob_p-fb94d82f266979b2959919c77d8d46821bf01b74-6789386b595a9ff48b74a062fd69a96e:2.10.2]
03:05:19,978 [docker-java-stream-848840694] INFO  
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment [] 
- [JobManager] STDOUT:at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 

[jira] [Commented] (FLINK-29755) PulsarSourceUnorderedE2ECase.testSavepoint failed because of missing TaskManagers

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637054#comment-17637054
 ] 

Leonard Xu commented on FLINK-29755:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43368=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678

> PulsarSourceUnorderedE2ECase.testSavepoint failed because of missing 
> TaskManagers
> -
>
> Key: FLINK-29755
> URL: https://issues.apache.org/jira/browse/FLINK-29755
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
> Attachments: PulsarSourceUnorderedE2ECase.testSavepoint.log
>
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42325=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=13932]
>  failed (not exclusively) due to a problem with 
> {{PulsarSourceUnorderedE2ECase.testSavepoint}}. It seems like there were no 
> TaskManagers spun up which resulted in the test job failing with a 
> {{NoResourceAvailableException}}.
> {code}
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - 
> Could not acquire the minimum required resources, failing slot requests. 
> Acquired: []. Current slot pool status: Registered TMs: 0, registered slots: 
> 0 free slots: 0
> {code}
> I didn't raise this one to critical because it looks like a test environment 
> issue where no TaskManagers were available. I attached the e2e-test-specific 
> logs to the Jira issue.





[jira] [Commented] (FLINK-28766) UnalignedCheckpointStressITCase.runStressTest failed with NoSuchFileException

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637052#comment-17637052
 ] 

Leonard Xu commented on FLINK-28766:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a

> UnalignedCheckpointStressITCase.runStressTest failed with NoSuchFileException
> -
>
> Key: FLINK-28766
> URL: https://issues.apache.org/jira/browse/FLINK-28766
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Anton Kalashnikov
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-08-01T01:36:16.0563880Z Aug 01 01:36:16 [ERROR] 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runStressTest
>   Time elapsed: 12.579 s  <<< ERROR!
> 2022-08-01T01:36:16.0565407Z Aug 01 01:36:16 java.io.UncheckedIOException: 
> java.nio.file.NoSuchFileException: 
> /tmp/junit1058240190382532303/f0f99754a53d2c4633fed75011da58dd/chk-7/61092e4a-5b9a-4f56-83f7-d9960c53ed3e
> 2022-08-01T01:36:16.0566296Z Aug 01 01:36:16  at 
> java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
> 2022-08-01T01:36:16.0566972Z Aug 01 01:36:16  at 
> java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
> 2022-08-01T01:36:16.0567600Z Aug 01 01:36:16  at 
> java.util.Iterator.forEachRemaining(Iterator.java:115)
> 2022-08-01T01:36:16.0568290Z Aug 01 01:36:16  at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> 2022-08-01T01:36:16.0569172Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 2022-08-01T01:36:16.0569877Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 2022-08-01T01:36:16.0570554Z Aug 01 01:36:16  at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> 2022-08-01T01:36:16.0571371Z Aug 01 01:36:16  at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 2022-08-01T01:36:16.0572417Z Aug 01 01:36:16  at 
> java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:546)
> 2022-08-01T01:36:16.0573618Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.discoverRetainedCheckpoint(UnalignedCheckpointStressITCase.java:289)
> 2022-08-01T01:36:16.0575187Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runAndTakeExternalCheckpoint(UnalignedCheckpointStressITCase.java:262)
> 2022-08-01T01:36:16.0576540Z Aug 01 01:36:16  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runStressTest(UnalignedCheckpointStressITCase.java:158)
> 2022-08-01T01:36:16.0577684Z Aug 01 01:36:16  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-08-01T01:36:16.0578546Z Aug 01 01:36:16  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-08-01T01:36:16.0579374Z Aug 01 01:36:16  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-08-01T01:36:16.0580298Z Aug 01 01:36:16  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-08-01T01:36:16.0581243Z Aug 01 01:36:16  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-08-01T01:36:16.0582029Z Aug 01 01:36:16  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-08-01T01:36:16.0582766Z Aug 01 01:36:16  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-08-01T01:36:16.0583488Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-08-01T01:36:16.0584203Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-08-01T01:36:16.0585087Z Aug 01 01:36:16  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-08-01T01:36:16.0585778Z Aug 01 01:36:16  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-08-01T01:36:16.0586482Z Aug 01 01:36:16  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-08-01T01:36:16.0587155Z Aug 01 01:36:16  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-08-01T01:36:16.0587809Z Aug 01 01:36:16  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-08-01T01:36:16.0588434Z Aug 01 01:36:16  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 

[jira] [Created] (FLINK-30135) RMQSourceITCase failed on azure with exit code 143

2022-11-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-30135:
--

 Summary: RMQSourceITCase failed on azure with exit code 143
 Key: FLINK-30135
 URL: https://issues.apache.org/jira/browse/FLINK-30135
 Project: Flink
  Issue Type: Bug
  Components: Connectors / RabbitMQ
Affects Versions: 1.16.0
Reporter: Leonard Xu



{noformat}
 Nov 22 02:38:20 JNI global refs: 29, weak refs: 0
Nov 22 02:38:20 
Nov 22 02:38:20 Killing process with pid=985 and all descendants
/__w/1/s/tools/ci/watchdog.sh: line 113:   985 Terminated  $cmd
Nov 22 02:38:21 Process exited with EXIT CODE: 143.
Nov 22 02:38:21 Trying to KILL watchdog (981).
Nov 22 02:38:21 Searching for .dump, .dumpstream and related files in '/__w/1/s'
Nov 22 02:38:26 Moving 
'/__w/1/s/flink-connectors/flink-connector-rabbitmq/target/surefire-reports/2022-11-22T02-07-07_451-jvmRun4.dumpstream'
 to target directory ('/__w/_temp/debug_files')
Nov 22 02:38:26 Moving 
'/__w/1/s/flink-connectors/flink-connector-rabbitmq/target/surefire-reports/2022-11-22T02-07-07_451-jvmRun4.dump'
 to target directory ('/__w/_temp/debug_files')
The STDIO streams did not close within 10 seconds of the exit event from 
process '/bin/bash'. This may indicate a child process inherited the STDIO 
streams and has not yet exited.
##[error]Bash exited with code '143'.
Finishing: Test - connect_2
{noformat}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=fa307d6d-91b1-5ab6-d460-ef50f552b1fe=21eae189-b04c-5c04-662b-17dc80ffc83a






[jira] [Commented] (FLINK-25587) HiveCatalogITCase crashed on Azure with exit code 239

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637047#comment-17637047
 ] 

Leonard Xu commented on FLINK-25587:


HiveModuleTest crashed on Azure with exit code 239
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199

> HiveCatalogITCase crashed on Azure with exit code 239
> -
>
> Key: FLINK-25587
> URL: https://issues.apache.org/jira/browse/FLINK-25587
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Connectors / Hive
>Reporter: Yun Gao
>Assignee: luoyuxia
>Priority: Major
>  Labels: test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> Jan 10 05:56:47 [ERROR] Please refer to 
> /__w/1/s/flink-connectors/flink-connector-hive/target/surefire-reports for 
> the individual test results.
> Jan 10 05:56:47 [ERROR] Please refer to dump files (if any exist) 
> [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> Jan 10 05:56:47 [ERROR] ExecutionException The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> Jan 10 05:56:47 [ERROR] Command was /bin/sh -c cd 
> /__w/1/s/flink-connectors/flink-connector-hive/target && 
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /__w/1/s/flink-connectors/flink-connector-hive/target/surefire/surefirebooter4268791554037437993.jar
>  /__w/1/s/flink-connectors/flink-connector-hive/target/surefire 
> 2022-01-10T05-33-55_476-jvmRun2 surefire485701932125419585tmp 
> surefire_501275439895774136096tmp
> Jan 10 05:56:47 [ERROR] Error occurred in starting fork, check output in log
> Jan 10 05:56:47 [ERROR] Process Exit Code: 239
> Jan 10 05:56:47 [ERROR] Crashed tests:
> Jan 10 05:56:47 [ERROR] org.apache.flink.table.catalog.hive.HiveCatalogITCase
> Jan 10 05:56:47 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> Jan 10 05:56:47 [ERROR] Command was /bin/sh -c cd 
> /__w/1/s/flink-connectors/flink-connector-hive/target && 
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
> /__w/1/s/flink-connectors/flink-connector-hive/target/surefire/surefirebooter4268791554037437993.jar
>  /__w/1/s/flink-connectors/flink-connector-hive/target/surefire 
> 2022-01-10T05-33-55_476-jvmRun2 surefire485701932125419585tmp 
> surefire_501275439895774136096tmp
> Jan 10 05:56:47 [ERROR] Error occurred in starting fork, check output in log
> Jan 10 05:56:47 [ERROR] Process Exit Code: 239
> Jan 10 05:56:47 [ERROR] Crashed tests:
> Jan 10 05:56:47 [ERROR] org.apache.flink.table.catalog.hive.HiveCatalogITCase
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1314)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1159)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:932)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> Jan 10 05:56:47 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> 

[jira] [Commented] (FLINK-30133) HadoopModuleFactory creates error if the security module cannot be loaded

2022-11-21 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637045#comment-17637045
 ] 

Matthias Pohl commented on FLINK-30133:
---

[~gsomogyi] I'm curious what you think of this. Do you agree that warning the 
user through a log message is good enough, or should we fail fatally if the 
Hadoop security module fails to load? A user who misses the log warning might 
assume that certain security measures are in place.

> HadoopModuleFactory creates error if the security module cannot be loaded
> -
>
> Key: FLINK-30133
> URL: https://issues.apache.org/jira/browse/FLINK-30133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: starter
>
> [HadoopModuleFactory|https://github.com/apache/flink/blob/26aa543b3bbe2b606bbc6d332a2ef7c5b46d25eb/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java#L51]
>  tries to load the {{HadoopModule}}. If it fails to load the module, it 
> will log an error and return {{null}}, which is handled properly. 
> The resulting error log is, therefore, confusing. We might want to lower the 
> log level to warning since the error doesn't affect the Flink cluster in a 
> fatal way.
> We might want to make the cluster fail fatally if we consider this a severe 
> usability problem.





[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637044#comment-17637044
 ] 

Leonard Xu commented on FLINK-18356:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be

> flink-table-planner Exit code 137 returned from process
> ---
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: 1234.jpg, app-profiling_4.gif
>
>
> {noformat}
> = test session starts 
> ==
> platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1
> cachedir: .tox/py37-cython/.pytest_cache
> rootdir: /__w/3/s/flink-python
> collected 568 items
> pyflink/common/tests/test_configuration.py ..[  
> 1%]
> pyflink/common/tests/test_execution_config.py ...[  
> 5%]
> pyflink/dataset/tests/test_execution_environment.py .
> ##[error]Exit code 137 returned from process: file name '/bin/docker', 
> arguments 'exec -i -u 1002 
> 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb 
> /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
> Finishing: Test - python
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3





[jira] [Created] (FLINK-30134) Annotate all the public classes for the delegation token framework

2022-11-21 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-30134:
-

 Summary: Annotate all the public classes for the delegation token 
framework
 Key: FLINK-30134
 URL: https://issues.apache.org/jira/browse/FLINK-30134
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Gabor Somogyi
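
As background, Flink marks the stability of its public API with the annotations in org.apache.flink.annotation (@Public, @PublicEvolving, @Experimental, @Internal). A minimal, hypothetical example of such an annotation on an interface follows; the interface itself is made up for illustration and is not the actual delegation token API:

{code:java}
import org.apache.flink.annotation.Experimental;

/**
 * Hypothetical interface, used only to show where the stability annotation goes;
 * the real delegation token interfaces and the stability level chosen for them
 * may differ.
 */
@Experimental
public interface ExampleDelegationTokenProvider {

    /** Name under which this provider would be registered. */
    String serviceName();

    /** Obtain new tokens; the return type is simplified for this example. */
    byte[] obtainDelegationTokens() throws Exception;
}
{code}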








[jira] [Assigned] (FLINK-30134) Annotate all the public classes for the delegation token framework

2022-11-21 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi reassigned FLINK-30134:
-

Assignee: Gabor Somogyi

> Annotate all the public classes for the delegation token framework
> --
>
> Key: FLINK-30134
> URL: https://issues.apache.org/jira/browse/FLINK-30134
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.17.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>






[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637042#comment-17637042
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Created] (FLINK-30133) HadoopModuleFactory creates error if the security module cannot be loaded

2022-11-21 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30133:
-

 Summary: HadoopModuleFactory creates error if the security module 
cannot be loaded
 Key: FLINK-30133
 URL: https://issues.apache.org/jira/browse/FLINK-30133
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Matthias Pohl


[HadoopModuleFactory|https://github.com/apache/flink/blob/26aa543b3bbe2b606bbc6d332a2ef7c5b46d25eb/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java#L51]
 tries to load the {{HadoopModule}}. If it fails to load the module, it 
will log an error and return {{null}}, which is handled properly. The 
resulting error log is, therefore, confusing. We might want to lower the log 
level to warning since the error doesn't affect the Flink cluster in a fatal 
way.

We might want to make the cluster fail fatally if we consider this a severe 
usability problem.
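
To make the two options above concrete, here is a simplified, hypothetical sketch of a module factory with both behaviours side by side (class and method names are illustrative; this is not the actual HadoopModuleFactory code, and the return type is simplified):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: contrasts the two behaviours discussed above.
public class ExampleSecurityModuleFactory {

    private static final Logger LOG =
            LoggerFactory.getLogger(ExampleSecurityModuleFactory.class);

    // Option 1 (current behaviour, but with the log level lowered to WARN):
    // swallow the failure, warn the user, and let the caller handle the null result.
    public Object createModuleOrWarn() {
        try {
            return loadHadoopModule();
        } catch (Exception e) {
            LOG.warn("Hadoop security module could not be loaded; "
                    + "Hadoop-based security will not be installed.", e);
            return null;
        }
    }

    // Option 2 (fail-fast alternative): treat the failure as fatal so that a user
    // cannot silently run without the security measures they expect.
    public Object createModuleOrFail() {
        try {
            return loadHadoopModule();
        } catch (Exception e) {
            throw new IllegalStateException("Hadoop security module could not be loaded.", e);
        }
    }

    private Object loadHadoopModule() throws Exception {
        // Placeholder for the reflective loading that may fail, e.g. when the
        // Hadoop classes are not on the classpath.
        throw new ClassNotFoundException("org.apache.hadoop.security.UserGroupInformation");
    }
}
{code}

Option 1 keeps the cluster running without Hadoop-based security; option 2 surfaces the misconfiguration immediately.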





[jira] [Created] (FLINK-30132) Test LocalRecoveryITCase#testRecoverLocallyFromProcessCrashWithWorkingDirectory failed on azure due to File not exists

2022-11-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-30132:
--

 Summary: Test 
LocalRecoveryITCase#testRecoverLocallyFromProcessCrashWithWorkingDirectory 
failed on azure due to File not exists
 Key: FLINK-30132
 URL: https://issues.apache.org/jira/browse/FLINK-30132
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.16.0
Reporter: Leonard Xu



{noformat}

at 
sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.nio.file.Files.readAttributes(Files.java:1737)
at 
java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
at java.nio.file.Files.walkFileTree(Files.java:2706)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.deleteAllFilesAndDirectories(TempDirectory.java:199)
at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath.close(TempDirectory.java:186)
... 51 more
Suppressed: java.nio.file.NoSuchFileException: 
/tmp/junit2010448393472419340/tm_taskManager_2/localState/aid_21c128b018cc61989c323cda6e36b0b1/jid_e5dbf7bc4ebb72baf20387e555083439/vtx_bc764cd8ddf7a0cff126f51c16239658_sti_1
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
at 
sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath$1.resetPermissionsAndTryToDeleteAgain(TempDirectory.java:250)
at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath$1.visitFileFailed(TempDirectory.java:212)
at 
org.junit.jupiter.engine.extension.TempDirectory$CloseablePath$1.visitFileFailed(TempDirectory.java:199)
at java.nio.file.Files.walkFileTree(Files.java:2672)
... 54 more

Nov 21 19:52:57 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 15.971 s <<< FAILURE! - in 
org.apache.flink.test.recovery.LocalRecoveryITCase
Nov 21 19:52:57 [ERROR] 
org.apache.flink.test.recovery.LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory
  Time elapsed: 15.942 s  <<< ERROR!

{noformat}


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43366=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs

2022-11-21 Thread GitBox


snuyanzin commented on PR #17649:
URL: https://github.com/apache/flink/pull/17649#issuecomment-1323217046

   Rebased, since the latest failure is related to the removed Elasticsearch module and 
this PR was created before that removal happened.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21363: [FLINK-29155][python] Change some configs to make grpc server stable in PyFlink Process Mode

2022-11-21 Thread GitBox


flinkbot commented on PR #21363:
URL: https://github.com/apache/flink/pull/21363#issuecomment-1323216318

   
   ## CI report:
   
   * a5c4a83cb46e9d76b6cc49d2647ae97343410408 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-21 Thread GitBox


flinkbot commented on PR #21362:
URL: https://github.com/apache/flink/pull/21362#issuecomment-1323211848

   
   ## CI report:
   
   * 3465fa39bbfa6e701adbe7c153680bf2768c8954 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29155) Improve default config of grpcServer in Process Mode

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29155:
---
Labels: pull-request-available  (was: )

> Improve default config of grpcServer in Process Mode
> 
>
> Key: FLINK-29155
> URL: https://issues.apache.org/jira/browse/FLINK-29155
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.14.5, 1.16.0, 1.15.3
>Reporter: Xingbo Huang
>Assignee: Xingbo Huang
>Priority: Major
>  Labels: pull-request-available
>
> The existing grpcServer configuration may cause channel disconnection when 
> there is a large amount of data. Some PyFlink users are very troubled by this 
> problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] HuangXingBo opened a new pull request, #21363: [FLINK-29155][python] Change some configs to make grpc server stable in PyFlink Process Mode

2022-11-21 Thread GitBox


HuangXingBo opened a new pull request, #21363:
URL: https://github.com/apache/flink/pull/21363

   ## What is the purpose of the change
   
   *This pull request changes some configs to make the gRPC server stable in 
PyFlink Process Mode.*
   
   
   ## Brief change log
   
 - *Port Beam `ServerFactory` class to flink-python module*
 - *Change the default `KEEP_ALIVE_TIME_SEC` to 19. The BDP ping period is 
locally decided and the keep-alive time on the client side is 20 seconds, so we 
configure the server to allow pings every 19 seconds (see the sketch after this 
list).*
 - *Adjust the default values of the bundle size and the Arrow batch size. The 
current default bundle size is 10, which is unreasonable in most scenarios and 
easily causes instability of the gRPC server, so we adjust the default value to 
1000.*
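   
   A hedged, standalone illustration of the keep-alive reasoning above using plain 
grpc-java (the actual change ports Beam's `ServerFactory`; the class name and port 
below are made up): the client pings roughly every 20 seconds, so the server must 
permit pings at least that often.
   
   ```java
   import io.grpc.Server;
   import io.grpc.netty.NettyServerBuilder;
   
   import java.io.IOException;
   import java.util.concurrent.TimeUnit;
   
   public class KeepAliveServerSketch {
       public static void main(String[] args) throws IOException, InterruptedException {
           Server server =
                   NettyServerBuilder.forPort(50051)
                           // allow client keep-alive pings as frequently as every 19 seconds
                           .permitKeepAliveTime(19, TimeUnit.SECONDS)
                           // tolerate pings even when no RPC is in flight
                           .permitKeepAliveWithoutCalls(true)
                           .build()
                           .start();
           server.awaitTermination();
       }
   }
   ```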
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Original tests*
 - *Manual test a job which takes 1min in a udf*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29430) Sanity check in InternalKeyContextImpl#setCurrentKeyGroupIndex

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29430:
---
Labels: pull-request-available  (was: )

> Sanity check in InternalKeyContextImpl#setCurrentKeyGroupIndex
> --
>
> Key: FLINK-29430
> URL: https://issues.apache.org/jira/browse/FLINK-29430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.3
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently the HeapStateBackend checks whether the current key group index is a 
> valid one while the RocksDBStateBackend does not. When using the 
> HeapStateBackend, if the user uses a non-deterministic shuffle key, an 
> exception is thrown as follows:
>  
> {code:java}
> java.lang.IllegalArgumentException: Key group 84 is not in 
> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}. Unless you're directly using 
> low level state access APIs, this is most likely caused by non-deterministic 
> shuffle key (hashCode and equals implementation).
>     at 
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
>     at 
> org.apache.flink.runtime.state.heap.StateTable.getMapForKeyGroup(StateTable.java:305)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:261)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
>     at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
>     at 
> com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:169)
>     at 
> com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:160)
>     at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:526)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:811)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:760)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}
> However, the RocksDBStateBackend will run without an exception. The wrong key 
> group index will cause a state correctness problem, so it is better to do a 
> check in {{InternalKeyContextImpl#setCurrentKeyGroupIndex}} and 
> throw an exception immediately.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Zakelly opened a new pull request, #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-21 Thread GitBox


Zakelly opened a new pull request, #21362:
URL: https://github.com/apache/flink/pull/21362

   ## What is the purpose of the change
   
   Currently the HeapStateBackend checks whether the current key group index is 
a valid one while the RocksDBStateBackend does not.
   This PR adds this check in InternalKeyContextImpl#setCurrentKeyGroupIndex, to 
expose the data correctness problem immediately when using RocksDB as well as 
heap.
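   
   A hedged, self-contained sketch of the kind of check described above (the real 
`InternalKeyContextImpl` in flink-runtime differs in detail): reject key group 
indices that fall outside the backend's assigned range.
   
   ```java
   public class KeyGroupSanityCheckSketch {
       private final int startKeyGroup;
       private final int endKeyGroup; // inclusive
       private int currentKeyGroupIndex;
   
       public KeyGroupSanityCheckSketch(int startKeyGroup, int endKeyGroup) {
           this.startKeyGroup = startKeyGroup;
           this.endKeyGroup = endKeyGroup;
       }
   
       public void setCurrentKeyGroupIndex(int keyGroupIndex) {
           if (keyGroupIndex < startKeyGroup || keyGroupIndex > endKeyGroup) {
               // fail fast, like the heap backend already does, instead of silently
               // writing state into a key group this backend does not own
               throw new IllegalArgumentException(
                       "Key group " + keyGroupIndex + " is not in KeyGroupRange{startKeyGroup="
                               + startKeyGroup + ", endKeyGroup=" + endKeyGroup
                               + "}. This is most likely caused by a non-deterministic shuffle key.");
           }
           this.currentKeyGroupIndex = keyGroupIndex;
       }
   }
   ```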
   
   ## Brief change log
   
   * Add the checking logic in InternalKeyContextImpl#setCurrentKeyGroupIndex.
   * Add a test for the new logic.
   * Change some test code in CoBroadcastWithKeyedOperatorTest, since it fails 
after adding the check.
   
   ## Verifying this change
   
   This change added a new test class InternalKeyContextImplTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): **yes**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30131) flink iterate will suspend when record is a bit large

2022-11-21 Thread Lu (Jira)
Lu created FLINK-30131:
--

 Summary: flink iterate will suspend when record is a bit large
 Key: FLINK-30131
 URL: https://issues.apache.org/jira/browse/FLINK-30131
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.15.2
Reporter: Lu
 Attachments: image-2022-11-22-14-59-08-272.png

 
{code:java}
// code placeholder
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 1000);
configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 1000);
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
List<Integer> list = new ArrayList<>(10);
for (int i = 1; i < 1; i++) {
    list.add(i);
}
DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
DataStream<byte[]> map = integerDataStreamSource
        .map(i -> new byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
IterativeStream<byte[]> iterate = map.iterate();
DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
    @Override
    public void processElement(byte[] value, Context ctx, Collector<byte[]> out) throws Exception {
        out.collect(value);
    }
}).name("multi collect");
DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
iterate.closeWith(filter);
map1.map(bytes -> bytes.length).name("map to length").print();
env.execute(); {code}
My code is above.

 

When I use iterate with a somewhat large record, the iteration suspends at a 
random place. When I looked at the stack, it showed a suspicious thread:

!image-2022-11-22-14-59-08-272.png|width=751,height=328!

It seems like a network-related problem, so I increased the network buffer memory 
and the number of buffers, but that only delays the point where it suspends; it 
still suspends after a few more iterations than before.

I want to know whether this is a bug or whether there is an error in my code or 
configuration.

Looking forward to your reply. Thanks in advance.

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs

2022-11-21 Thread GitBox


snuyanzin commented on PR #17649:
URL: https://github.com/apache/flink/pull/17649#issuecomment-1323205144

   @flinkbot run azure
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30130) Table.select lose watermark

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30130:


 Summary: Table.select lose watermark
 Key: FLINK-30130
 URL: https://issues.apache.org/jira/browse/FLINK-30130
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.1
Reporter: Yunfeng Zhou


Trying to execute the following program
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Long> stream = env.fromSequence(0, 1000);

Schema schema = Schema.newBuilder()
.column("f0", DataTypes.BIGINT())
.columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)")
.watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
.build();
Table table = tEnv.fromDataStream(stream, schema);

table.printSchema();

table = table.select($("*"));

table.printSchema();{code}

Would get the following result


{code:java}
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3),
  WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND
)
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME*
)
{code}

This result shows that the watermark property of a Table is lost during the select 
operation.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-21 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1028927993


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import com.mongodb.internal.HexUtils;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonDocument;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.EncoderContext;
+import org.bson.json.JsonWriter;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Tool class used to convert from {@link BsonValue} to {@link RowData}. * */
+@Internal
+public class BsonToRowDataConverters {
+
+    // --------------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL
+     * internal data structures.
+     */
+    @FunctionalInterface
+    public interface BsonToRowDataConverter extends Serializable {
+        Object convert(BsonValue bsonValue);
+    }
+
+    // --------------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for
+    // sql-connector uber jars.
+    // --------------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    public static BsonToRowDataConverter createNullableConverter(LogicalType type) {
+        return wrapIntoNullableInternalConverter(createConverter(type));
+    }
+
+    private static BsonToRowDataConverter wrapIntoNullableInternalConverter(
+            BsonToRowDataConverter bsonToRowDataConverter) {
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (bsonValue == null || bsonValue.isNull() || bsonValue instanceof BsonUndefined) {
+                    return null;
+                }
+                if (bsonValue.isDecimal128() && bsonValue.asDecimal128().getValue().isNaN()) {
+                    return null;
+                }
+                return bsonToRowDataConverter.convert(bsonValue);
+            }
+        };
+    }
+
+/** Creates a runtime converter which assuming input object is not null. */
+private 

[jira] [Created] (FLINK-30129) Push projection through ChangelogNormalize

2022-11-21 Thread Jark Wu (Jira)
Jark Wu created FLINK-30129:
---

 Summary: Push projection through ChangelogNormalize
 Key: FLINK-30129
 URL: https://issues.apache.org/jira/browse/FLINK-30129
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently, the ChangelogNormalize node is generated during the physical 
optimization phase. That means the projection is not pushed through 
ChangelogNormalize if the {{TableSource}} doesn't support 
{{SupportsProjectionPushDown}}. We can implement such an optimization to reduce 
the state size (fewer columns in the state value) and achieve better throughput 
(only changes on the selected columns will be emitted). 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuchenECNU commented on pull request #21269: [FLINK-29322] Expose savepoint format on Web UI

2022-11-21 Thread GitBox


yuchenECNU commented on PR #21269:
URL: https://github.com/apache/flink/pull/21269#issuecomment-1323155334

   @zentol Hi, do you have any other comments for this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] XComp commented on a diff in pull request #581: Add 1.15.3 release

2022-11-21 Thread GitBox


XComp commented on code in PR #581:
URL: https://github.com/apache/flink-web/pull/581#discussion_r1028894470


##
_posts/2022-11-10-release-1.15.3.md:
##
@@ -0,0 +1,185 @@
+---
+layout: post
+title:  "Apache Flink 1.15.3 Release Announcement"
+date: 2022-11-10T22:00:00.000Z
+categories: news
+authors:
+- fapaul:
+  name: "Fabian Paul"
+
+excerpt: The Apache Flink Community is pleased to announce a bug fix release 
for Flink 1.15.
+
+---
+
+The Apache Flink Community is pleased to announce the second bug fix release 
of the Flink 1.15 series.
+
+This release includes 59 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.15.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:
+[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352210).
+
+We highly recommend all users upgrade to Flink 1.15.3.
+
+# Release Artifacts
+
+## Maven Dependencies
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.15.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>1.15.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>1.15.3</version>
+</dependency>
+```
+
+## Binaries
+
+You can find the binaries on the updated [Downloads page]({{ site.baseurl 
}}/downloads.html).
+
+## Docker Images
+
+* [library/flink](https://hub.docker.com/_/flink?tab=tags=1=1.15.3) 
(official images)

Review Comment:
   ```suggestion
   * [library/flink](https://hub.docker.com/_/flink/tags?page=1=1.15.3) 
(official images)
   ```
   Did you copy this from a template or just from the 1.15.2 announcement? We 
might want to update the template as well if it exists. But I couldn't find 
anything. :thinking: 



##
_posts/2022-11-10-release-1.15.3.md:
##
@@ -0,0 +1,185 @@
+---
+layout: post
+title:  "Apache Flink 1.15.3 Release Announcement"
+date: 2022-11-10T22:00:00.000Z
+categories: news
+authors:
+- fapaul:
+  name: "Fabian Paul"
+
+excerpt: The Apache Flink Community is pleased to announce a bug fix release 
for Flink 1.15.
+
+---
+
+The Apache Flink Community is pleased to announce the second bug fix release 
of the Flink 1.15 series.
+
+This release includes 59 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.15.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:
+[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352210).
+
+We highly recommend all users upgrade to Flink 1.15.3.
+
+# Release Artifacts
+
+## Maven Dependencies
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.15.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>1.15.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>1.15.3</version>
+</dependency>
+```
+
+## Binaries
+
+You can find the binaries on the updated [Downloads page]({{ site.baseurl 
}}/downloads.html).
+
+## Docker Images
+
+* [library/flink](https://hub.docker.com/_/flink?tab=tags=1=1.15.3) 
(official images)
+* 
[apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1=1.15.3) 
(ASF repository)
+
+## PyPi
+
+* [apache-flink==1.15.3](https://pypi.org/project/apache-flink/1.15.3/)
+
+# Release Notes
+
+Bug
+
+
+[FLINK-25554] - 
DistributedCacheDfsTest.testSubmittingJobViaRestClusterClient fails on AZP
+
+[FLINK-26726] - 
Remove the unregistered  task from readersAwaitingSplit
+
+[FLINK-26890] - 
DynamoDB consumer error consuming partitions close to retention
+
+[FLINK-27384] - 
In the Hive dimension table, when the data is changed on the original 
partition, the create_time configuration does not take effect
+
+[FLINK-27400] - 
Pulsar connector subscribed the system topic when using the regex
+
+[FLINK-27415] - 
Read empty csv file throws exception in FileSystem table connector
+
+[FLINK-27492] - 
Flink table scala example does not including the scala-api jars
+
+[FLINK-27579] - 
The param client.timeout can not be set by dynamic properties when stopping 
the job 
+
+[FLINK-27611] - 
ConcurrentModificationException during Flink-Pulsar checkpoint notification
+
+[FLINK-27954] - 
JobVertexFlameGraphHandler does not work on standby Dispatcher
+
+[FLINK-28084] - 
Pulsar unordered reader should disable retry and delete reconsume logic.
+
+[FLINK-28265] - 
Inconsistency in Kubernetes HA service: broken state handle
+
+[FLINK-28488] - 
KafkaMetricWrapper does incorrect cast
+
+[FLINK-28609] - 
Flink-Pulsar connector fails on larger schemas
+
+[FLINK-28863] - 
Snapshot result of RocksDB native savepoint should have empty shared-state
+
+[FLINK-28934] - 
Pulsar Source put all the splits to only one parallelism when using 
Exclusive subscription
+
+[FLINK-28951] - 
Header in janino generated java files can merge with line numbers
+
+[FLINK-28959] - 

[jira] [Updated] (FLINK-27846) Schema evolution for data file reading

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27846:
---
Labels: pull-request-available  (was: )

> Schema evolution for data file reading
> --
>
> Key: FLINK-27846
> URL: https://issues.apache.org/jira/browse/FLINK-27846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Shammon
>Priority: Minor
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> For file reads, we need to.
> - Adjust the correspondence of specific fields
> - If there is a type evolution, we need to upgrade the corresponding data



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] zjureel opened a new pull request, #396: [FLINK-27846] Schema evolution for reading data file

2022-11-21 Thread GitBox


zjureel opened a new pull request, #396:
URL: https://github.com/apache/flink-table-store/pull/396

   Currently, the table store uses the latest schema id to read the data file 
meta. When the schema evolves, it will cause errors, for example:
   
   The schema of the underlying data is [1->a, 2->b, 3->c, 4->d] and its schema id is 
0, where 1/2/3/4 are field ids and a/b/c/d are field names.
   After schema evolution, the schema id is 1, and the new schema is [1->a, 3->c, 
5->f, 6->b, 7->g].
   When the table store reads the underlying data file, it should use schema 0 with 
fields [1->a, 2->b, 3->c, 4->d], and map schema 1 to schema 0 according to their 
field ids.
   
   This PR reads the data according to the schema id of the avro/orc/parquet data 
file, then creates an index mapping between the table schema and the underlying 
data schema, so that the table store can read the correct row data through its 
latest schema (see the sketch below).
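   
   A hedged sketch of the index-mapping idea, using the field ids from the example 
above (the actual `SchemaEvolutionUtil` API differs): for each field of the current 
table schema, find its position in the schema the data file was written with, or -1 
if the field did not exist back then.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class SchemaIndexMappingSketch {
   
       public static int[] createIndexMapping(List<Integer> tableFieldIds, List<Integer> dataFieldIds) {
           Map<Integer, Integer> idToDataIndex = new HashMap<>();
           for (int i = 0; i < dataFieldIds.size(); i++) {
               idToDataIndex.put(dataFieldIds.get(i), i);
           }
           int[] mapping = new int[tableFieldIds.size()];
           for (int i = 0; i < tableFieldIds.size(); i++) {
               // -1 marks a field that is not present in the old data file
               mapping[i] = idToDataIndex.getOrDefault(tableFieldIds.get(i), -1);
           }
           return mapping;
       }
   
       public static void main(String[] args) {
           // data schema 0: [1->a, 2->b, 3->c, 4->d]; table schema 1: [1->a, 3->c, 5->f, 6->b, 7->g]
           int[] mapping = createIndexMapping(List.of(1, 3, 5, 6, 7), List.of(1, 2, 3, 4));
           // prints [0, 2, -1, -1, -1]: a and c are read from the file, the other fields are new
           System.out.println(java.util.Arrays.toString(mapping));
       }
   }
   ```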
   
   The main codes are as follows:
   
   1. Added method `valueFields` in `KeyValueFieldsExtractor` to extract fields 
from `TableSchema`
   2. Added `AbstractFileRecordIterator` for `KeyValueDataFileRecordIterator` 
and `RowDataFileRecordIterator` to create projected row data from table schema 
to underlying row data
   3. Added methods in `SchemaEvolutionUtil` to create index mapping between 
schemas, convert projection from table to underlying data
   4. Added `BulkFormatMapping` to create reader factory and index mapping for 
`KeyValueFileReaderFactory` and `AppendOnlyFileStoreRead`
   
   The main tests include:
   
   1. Updated `SchemaEvolutionUtilTest` to create index mapping and convert 
projection
   2. Added `AppendOnlyFileDataTableTest`, 
`ChangelogValueCountFileDataTableTest` and `ChangelogWithKeyFileDataTableTest` 
to read and filter data after schema evolution
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-21317) Downstream keyed state not work after FlinkKafkaShuffle

2022-11-21 Thread Yuan Mei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuan Mei closed FLINK-21317.

Fix Version/s: 1.17.0
   Resolution: Duplicate

is resolved in FLINK-29437

> Downstream keyed state not work after FlinkKafkaShuffle
> ---
>
> Key: FLINK-21317
> URL: https://issues.apache.org/jira/browse/FLINK-21317
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Kezhu Wang
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
>
> {{FlinkKafkaShuffle}} uses 
> {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign partition 
> records to kafka topic partition. The assignment works as follow:
>  # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int 
> maxParallelism)}} assigns key to key group.
>  # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int 
> maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to 
> operator/subtask index.
> When kafka topic partitions are consumed, they are redistributed by 
> {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int 
> numParallelSubtasks)}}. I copied code of this redistribution here.
> {code:java}
> public class KafkaTopicPartitionAssigner {
> public static int assign(KafkaTopicPartition partition, int 
> numParallelSubtasks) {
> int startIndex =
> ((partition.getTopic().hashCode() * 31) & 0x7FFF) % 
> numParallelSubtasks;
> // here, the assumption is that the id of Kafka partitions are always 
> ascending
> // starting from 0, and therefore can be used directly as the offset 
> clockwise from the
> // start index
> return (startIndex + partition.getPartition()) % numParallelSubtasks;
> }
> }
> {code}
> This partition redistribution breaks prerequisites for 
> {{DataStreamUtils.reinterpretAsKeyedStream}}, that is key groups are messed 
> up. The consequence is unusable keyed state. I list deepest stack trace 
> captured here:
> {noformat}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
>   at 
> org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
> cc [~ym]  [~sewen] [~AHeise]  [~pnowojski]
> Below is my proposed changes:
> * Make assignment between partition and subtask customizable.
> * Provide a 0-based round-robin assignment. (This is making {{startIndex}} 0 
> in existing assignment algorithms.)
> I saw FLINK-8570, above changes could be helpful if we finally decide to 
> deliver FLINK-8570.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

2022-11-21 Thread ramkrishna.s.vasudevan (Jira)
ramkrishna.s.vasudevan created FLINK-30128:
--

 Summary: Introduce Azure Data Lake Gen2 APIs in the Hadoop 
Recoverable path
 Key: FLINK-30128
 URL: https://issues.apache.org/jira/browse/FLINK-30128
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.13.1
Reporter: ramkrishna.s.vasudevan


Currently the HadoopRecoverableWriter assumes that the underlying FS is Hadoop 
and so it checks for DistributedFileSystem. It also tries to do a truncate and 
ensure the lease is recovered before the 'rename' operation is done.
In the Azure Data Lake Gen 2 world, the driver supports neither the truncate nor 
the lease recovery API. We should be able to get the last committed size and, if 
it matches, go for the rename. More details will follow here. 
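
A hedged sketch of that commit strategy (not the actual HadoopRecoverableWriter 
code; the class and method names below are made up), using only standard Hadoop 
FileSystem calls:

{code:java}
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

class AbfsCommitSketch {
    // ABFS offers neither truncate nor lease recovery, so verify the persisted
    // length against the expected offset and only then rename into place.
    static void commit(FileSystem fs, Path tempFile, Path targetFile, long expectedLength)
            throws IOException {
        FileStatus status = fs.getFileStatus(tempFile);
        if (status.getLen() != expectedLength) {
            throw new IOException(
                    "Cannot commit " + tempFile + ": expected " + expectedLength
                            + " bytes but found " + status.getLen());
        }
        if (!fs.rename(tempFile, targetFile)) {
            throw new IOException("Renaming " + tempFile + " to " + targetFile + " failed");
        }
    }
}
{code}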



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30127) Correct the packaging of HadoopRecoverableWriter and related issues

2022-11-21 Thread ramkrishna.s.vasudevan (Jira)
ramkrishna.s.vasudevan created FLINK-30127:
--

 Summary: Correct the packaging of HadoopRecoverableWriter and 
related issues
 Key: FLINK-30127
 URL: https://issues.apache.org/jira/browse/FLINK-30127
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.13.1
Reporter: ramkrishna.s.vasudevan


The first issue here is that the HadoopRecoverableWriter, which creates the 
RecoverableWriter classes, is not found in the azure-fs package. We need to fix 
this so that we can use the hadoop-common libraries to create the ABFS wrappers 
for the streaming sink. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18568) Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink

2022-11-21 Thread ramkrishna.s.vasudevan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636992#comment-17636992
 ] 

ramkrishna.s.vasudevan commented on FLINK-18568:


[~jinq0123] Yes, I am working on it. Please keep this JIRA assigned to me. We 
are working towards this and will share PRs. Before that, I plan to raise some 
subtasks under this issue.  

> Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
> --
>
> Key: FLINK-18568
> URL: https://issues.apache.org/jira/browse/FLINK-18568
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Israel Ekpo
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
>
> The objective of this improvement is to add support for Azure Data Lake Store 
> Gen 2 (ADLS Gen2) [2] as one of the supported filesystems for the Streaming 
> File Sink [1]
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
> [2] https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 merged pull request #178: [FLINK-30099] Add test case for algorithms' python APIs

2022-11-21 Thread GitBox


lindong28 merged PR #178:
URL: https://github.com/apache/flink-ml/pull/178


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on pull request #178: [FLINK-30099] Add test case for algorithms' python APIs

2022-11-21 Thread GitBox


lindong28 commented on PR #178:
URL: https://github.com/apache/flink-ml/pull/178#issuecomment-1323016115

   Thanks for the PR. Looks pretty good!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30126) Delay registration of the catalog, register the catalog as needed

2022-11-21 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-30126:
--
Description: Data platform has registered many relational database data 
sources such as mysql, data source code is used as the catalog name, we are not 
sure which data source needs to register the catalog in flink, we hope that the 
required catalog can be dynamically loaded when sql is executed, flink provides 
the interface. Users can customize the registration catalog  (was: Data 
platform has registered many relational database data sources such as mysql, 
data source code is used as the catalog name, we are not sure which data source 
needs to register the catalog in flink, we hope that the required catalog can 
be dynamically loaded when sql is executed, flink provides the interface. Users 
can customize the register time catalog)

> Delay registration of the catalog, register the catalog as needed
> -
>
> Key: FLINK-30126
> URL: https://issues.apache.org/jira/browse/FLINK-30126
> Project: Flink
>  Issue Type: New Feature
>Reporter: melin
>Priority: Major
>
> The data platform has registered many relational database data sources such as 
> MySQL, and the data source code is used as the catalog name. We are not sure in 
> advance which data sources need a catalog registered in Flink, so we hope that 
> the required catalog can be loaded dynamically when SQL is executed; Flink 
> should provide an interface so that users can customize catalog registration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30126) Delay registration of the catalog, register the catalog as needed

2022-11-21 Thread melin (Jira)
melin created FLINK-30126:
-

 Summary: Delay registration of the catalog, register the catalog 
as needed
 Key: FLINK-30126
 URL: https://issues.apache.org/jira/browse/FLINK-30126
 Project: Flink
  Issue Type: New Feature
Reporter: melin


The data platform has registered many relational database data sources such as 
MySQL, and the data source code is used as the catalog name. We are not sure in 
advance which data sources need a catalog registered in Flink, so we hope that 
the required catalog can be loaded dynamically when SQL is executed; Flink 
should provide an interface so that users can customize when catalogs are registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30116) Don't Show Env Vars in Web UI

2022-11-21 Thread ConradJam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636984#comment-17636984
 ] 

ConradJam commented on FLINK-30116:
---

Hi [~knaufk] Can I take this ticket?

> Don't Show Env Vars in Web UI
> -
>
> Key: FLINK-30116
> URL: https://issues.apache.org/jira/browse/FLINK-30116
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Konstantin Knauf
>Priority: Critical
> Fix For: 1.16.1
>
>
> As discussed and agreed upon in [1], we'd like to revert [2] and not show any 
> environment variables in the Web UI for security reasons. 
> [1] https://lists.apache.org/thread/rjgk15bqttvblp60zry4n5pw4xjw7q9k 
> [2] https://issues.apache.org/jira/browse/FLINK-28311



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #174: [FLINK-29604] Add Estimator and Transformer for CountVectorizer

2022-11-21 Thread GitBox


jiangxin369 commented on code in PR #174:
URL: https://github.com/apache/flink-ml/pull/174#discussion_r1028792920


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CountVectorizer} aims to help convert a collection of text documents 
to vectors of token
+ * counts. When an a-priori dictionary is not available, {@link 
CountVectorizer} can be used as an
+ * estimator to extract the vocabulary, and generates a {@link 
CountVectorizerModel}. The model
+ * produces sparse representations for the documents over the vocabulary, 
which can then be passed
+ * to other algorithms like LDA.

Review Comment:
   * The JavaDoc often provides a description of the Estimator and starts with 
"An Estimator which", while the markdown starts with "CountVectorizer is an 
algorithm that ".
   * I'll remove the `@link`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30125) Projection pushdown is not work for partial update

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30125:
---
Labels: pull-request-available  (was: )

> Projection pushdown is not work for partial update
> --
>
> Key: FLINK-30125
> URL: https://issues.apache.org/jira/browse/FLINK-30125
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0, table-store-0.2.2
>
>
> We did not properly process the projection in the MergeFunction, which resulted in 
> subsequent reading position errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi opened a new pull request, #395: [FLINK-30125] Projection pushdown is not work for partial update

2022-11-21 Thread GitBox


JingsongLi opened a new pull request, #395:
URL: https://github.com/apache/flink-table-store/pull/395

   We did not properly process the projection in the MergeFunction, which resulted in 
subsequent reading position errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30125) Projection pushdown is not work for partial update

2022-11-21 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-30125:
-
Fix Version/s: table-store-0.2.2

> Projection pushdown is not work for partial update
> --
>
> Key: FLINK-30125
> URL: https://issues.apache.org/jira/browse/FLINK-30125
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.3.0, table-store-0.2.2
>
>
> We did not properly process the projection in the MergeFunction, which resulted in 
> subsequent reading position errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-21 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1028782035


##
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/splitter/MongoShardedSplitterTest.java:
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.splitter;
+
+import org.apache.flink.connector.mongodb.common.utils.MongoUtils;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoShardedSplitter;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitContext;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
+import org.bson.BsonString;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.AVG_OBJ_SIZE_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.COUNT_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.DROPPED_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.SHARD_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.SIZE_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.UUID_FIELD;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+
+/** Unit tests for {@link MongoShardedSplitter}. */
+@ExtendWith(TestLoggerExtension.class)
+public class MongoShardedSplitterTest {
+
+@Mock private MongoClient mongoClient;
+
+@BeforeEach
+public void setUp() {
+MockitoAnnotations.initMocks(this);

Review Comment:
   Hi @zentol,
   
   The [MongoDB test 
container](https://www.testcontainers.org/modules/databases/mongodb/) only 
supports [replica 
set](https://www.mongodb.com/docs/manual/replication/#replication-in-mongodb) 
mode, while the sharded split strategy requires MongoDB to run in [sharded 
cluster](https://www.mongodb.com/docs/manual/sharding/#sharded-cluster) mode.  
So here we mock some results of 
[config.collections](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.collections)
 and 
[config.chunks](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.chunks)
 to simulate the scenario of a sharded cluster.
   
   Also, I'm a bit confused about these functional wrappers; using them, we 
might still need to mock their results. Is there something wrong with my 
understanding?
   
   BTW, we can also do some extra work to start 3 mongo containers and have 
them run in sharded mode. Do we need to take this approach?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, 

[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #174: [FLINK-29604] Add Estimator and Transformer for CountVectorizer

2022-11-21 Thread GitBox


jiangxin369 commented on code in PR #174:
URL: https://github.com/apache/flink-ml/pull/174#discussion_r1028782050


##
docs/content/docs/operators/feature/countvectorizer.md:
##
@@ -0,0 +1,182 @@
+---
+title: "Count Vectorizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/countvectorizer.html
+---
+
+
+
+## Count Vectorizer
+
+CountVectorizer aims to help convert a collection of text documents to
+vectors of token counts. When an a-priori dictionary is not available,
+CountVectorizer can be used as an estimator to extract the vocabulary,
+and generates a CountVectorizerModel. The model produces sparse
+representations for the documents over the vocabulary, which can then
+be passed to other algorithms like LDA.
+
+### Input Columns
+
+| Param name | Type | Default   | Description |
+|:---|:-|:--|:|
+| inputCol   | String[] | `"input"` | Input string array. |
+
+### Output Columns
+
+| Param name | Type | Default| Description |
+|:---|:-|:---|:|
+| outputCol  | SparseVector | `"output"` | Vector of token counts. |
+
+### Parameters
+
+Below are the parameters required by `CountVectorizerModel`.
+
+| Key       | Default    | Type   | Required | Description |
+|:----------|:-----------|:-------|:---------|:------------|
+| inputCol  | `"input"`  | String | no       | Input column name. |
+| outputCol | `"output"` | String | no       | Output column name. |
+| minTF     | `1.0`      | Double | no       | Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count). |

Review Comment:
   With the current expression, users are recommended to set this param to an 
integer if specifies the count, which makes the meaning of this parameter clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30125) Projection pushdown is not work for partial update

2022-11-21 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30125:


 Summary: Projection pushdown is not work for partial update
 Key: FLINK-30125
 URL: https://issues.apache.org/jira/browse/FLINK-30125
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.3.0


We did not properly process the projection in the MergeFunction, which resulted in 
subsequent reading position errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30123) Optimize ApplicationDeployer API design

2022-11-21 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-30123:
--
Description: 
The task is submitted via the ApplicationDeployer API, and the run is synchronous, 
waiting for the submission to complete. If the task is submitted to YARN, it may 
stay in the ACCEPTED state, and the YARN applicationId has not been obtained at 
that point, so it is difficult to cancel the task. It is recommended to follow the 
design of org.apache.spark.launcher.SparkLauncher and submit tasks asynchronously, 
so that the applicationId can be obtained as soon as possible; if you want to 
delete the task ahead of time, directly run yarn application -kill xxx.

!image-2022-11-22-09-28-38-631.png!

!image-2022-11-22-09-28-54-660.png!

  was:
Submitting the task via the ApplicationDeployer API runs synchronously, waiting 
for the submission to complete. If the task is submitted to YARN, it may stay in 
the ACCEPTED state the whole time, and the YARN applicationId has not been 
obtained at that point, so cancelling the task is rather difficult. It is 
suggested to follow the design of org.apache.spark.launcher.SparkLauncher and 
submit tasks asynchronously, so that the applicationId can be obtained as soon as 
possible; to delete the task ahead of time, directly run yarn application -kill xxx;

!image-2022-11-22-09-28-38-631.png!

!image-2022-11-22-09-28-54-660.png!


> Optimize ApplicationDeployer API design
> ---
>
> Key: FLINK-30123
> URL: https://issues.apache.org/jira/browse/FLINK-30123
> Project: Flink
>  Issue Type: New Feature
>Reporter: melin
>Priority: Major
> Attachments: image-2022-11-22-09-28-38-631.png, 
> image-2022-11-22-09-28-54-660.png
>
>
> The task is submitted via the ApplicationDeployer API, and the run is synchronous, 
> waiting for the submission to complete. If the task is submitted to YARN, it may 
> stay in the ACCEPTED state, and the YARN applicationId has not been obtained at 
> that point, so it is difficult to cancel the task. It is recommended to follow the 
> design of org.apache.spark.launcher.SparkLauncher and submit tasks asynchronously, 
> so that the applicationId can be obtained as soon as possible; if you want to 
> delete the task ahead of time, directly run yarn application -kill xxx.
> !image-2022-11-22-09-28-38-631.png!
> !image-2022-11-22-09-28-54-660.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30082) Enable write-buffer-spillable by default only for object storage

2022-11-21 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30082.

Resolution: Fixed

master: 039f2ff5d2e25154b5ab4b04f4c36086ce51d9c3

> Enable write-buffer-spillable by default only for object storage
> 
>
> Key: FLINK-30082
> URL: https://issues.apache.org/jira/browse/FLINK-30082
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> After a lot of testing, we found that enabling spillable write buffers does 
> not improve performance much on HDFS, but it does introduce some jitter.
> In this jira, spillable is enabled by default only when an object store is used, 
> so that performance can be improved without affecting HDFS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #389: [FLINK-30082] Enable write-buffer-spillable by default only for object storage

2022-11-21 Thread GitBox


JingsongLi merged PR #389:
URL: https://github.com/apache/flink-table-store/pull/389


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30099) Add tests to cover model data APIs for all existing algorithms

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30099:
---
Labels: pull-request-available  (was: )

> Add tests to cover model data APIs for all existing algorithms
> --
>
> Key: FLINK-30099
> URL: https://issues.apache.org/jira/browse/FLINK-30099
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> test_linearsvc.py should be updated to cover the get_model_data() and 
> set_model_data() usage. Same for other existing algorithms in Flink ML.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request, #178: [FLINK-30099] Add test case for algorithms' python APIs

2022-11-21 Thread GitBox


yunfengzhou-hub opened a new pull request, #178:
URL: https://github.com/apache/flink-ml/pull/178

   ## What is the purpose of the change
   
   This PR adds python tests for the public APIs of Flink ML algorithms.
   
   ## Brief change log
   
   Adds test cases for the following APIs for all existing offline algorithms 
in Flink ML. The tests added in this PR are as follows.
   
   | Name of Algorithm | setModelData() | getModelData()  | Save/load |
   | - | -- | --- | - |
   | KNN   | O  | O   | --|
   | LinearSVC | O  | O   | O |
   | LogisticRegression| -- | --  | --|
   | NaiveBayes| O  | X (FLINK-30124) | --|
   | AgglomerativeClustering   | -  | -   | --|
   | KMeans| O  | X (FLINK-30122) | --|
   | BinaryClassification  | -  | -   | --|
   | Binarizer | -  | -   | --|
   | Bucketizer| -  | -   | --|
   | DCT   | -  | -   | --|
   | ELementWiseProduct| -  | -   | --|
   | FeatureHasher | -  | -   | --|
   | HashingTF | -  | -   | --|
   | IDF   | O  | X (FLINK-29477) | --|
   | Imputer   | O  | X (FLINK-30124) | --|
   | Interaction   | -  | -   | --|
   | KbinsDiscretizer  | O  | X (FLINK-30122) | --|
   | MaxAbsScaler  | O  | O   | O |
   | MinMaxScaler  | O  | O   | O |
   | NGram | -  | -   | --|
   | Normalizer| -  | -   | --|
   | OneHotEncoder | O  | O   | O |
   | PolynomialExpansion   | -  | -   | --|
   | RandomSplitter| -  | -   | --|
   | RegexTokenizer| -  | -   | --|
   | RobustScaler  | O  | O   | --|
   | StandardScaler| O  | O   | O |
   | StringIndexer | O  | X (FLINK-30122) | O |
   | IndexToStringModel| -  | X (FLINK-30122) | O |
   | Tokenizer | -  | -   | --|
   | VarianceThresholdSelector | O  | X (FLINK-29477) | --|
   | VectorAssembler   | -  | -   | --|
   | VectorIndexer | O  | X (FLINK-30124) | --|
   | VectorSlicer  | -  | -   | --|
   | LinearRegression  | -- | --  | --|
   | ChiSqTest | -  | -   | --|
   
   The marks in the table above have the following meanings:
   
   - `-`: The algorithm does not need to test this API. For example, a 
`Transformer` or `AlgoOperator` does not need to test `getModelData()` and 
`setModelData()`.
   - `--`: The algorithm needs to test this API, and the test has already been 
added in previous commits.
   - `O`: The algorithm needs to test this API, and the test is added in this 
PR (see the sketch after this list).
   - `X`: The algorithm needs to test this API, but the API fails with an 
exception. The exceptions have all been recorded in the Jira tickets referenced 
in parentheses.
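   
   As a rough sketch of the test pattern behind the `O` entries above (illustrative only — it reuses the KMeans snippet from FLINK-30122; `KMeansModel` and the exact `set_model_data()` call are assumptions here, and fixtures/field names differ per algorithm):
   
   ```python
   def test_get_and_set_model_data(self):
       # Fit an estimator and fetch its model data as a Table.
       kmeans = KMeans().set_max_iter(2).set_k(2)
       model = kmeans.fit(self.data_table)
       model_data = model.get_model_data()[0]

       # get_model_data(): the schema should expose the documented fields and
       # the rows must be readable (reading the rows is what fails for the X cases).
       self.assertEqual(['centroids', 'weights'],
                        model_data.get_schema().get_field_names())
       self.t_env.to_data_stream(model_data).execute_and_collect().next()

       # set_model_data(): a fresh model fed the same model data should be
       # usable for transform() just like the fitted model.
       new_model = KMeansModel().set_model_data(*model.get_model_data())
       new_model.transform(self.data_table)[0].execute().print()
   ```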
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-19103) The PushPartitionIntoTableSourceScanRule will lead to a performance problem when there are still many partitions after pruning

2022-11-21 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-19103.
--
Resolution: Fixed

> The PushPartitionIntoTableSourceScanRule will lead to a performance problem when 
> there are still many partitions after pruning
> ---
>
> Key: FLINK-19103
> URL: https://issues.apache.org/jira/browse/FLINK-19103
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.10.2, 1.11.1
>Reporter: fa zheng
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
>
> The PushPartitionIntoTableSourceScanRule obtains new statistics after 
> pruning; however, it uses a for loop to get the statistics of each partition and 
> then merges them together. During this process, Flink calls the 
> metastore's interface four times per loop iteration. When many partitions 
> remain, it spends a lot of time getting the new statistics. 
>  
> {code:scala}
> val newStatistic = {
>   val tableStats = catalogOption match {
> case Some(catalog) =>
>   def mergePartitionStats(): TableStats = {
> var stats: TableStats = null
> for (p <- remainingPartitions) {
>   getPartitionStats(catalog, tableIdentifier, p) match {
> case Some(currStats) =>
>   if (stats == null) {
> stats = currStats
>   } else {
> stats = stats.merge(currStats)
>   }
> case None => return null
>   }
> }
> stats
>   }
>   mergePartitionStats()
> case None => null
>   }
>   
> FlinkStatistic.builder().statistic(statistic).tableStats(tableStats).build()
> }
> {code}
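
To make the cost concrete, here is a small self-contained illustration (Python pseudocode with a fake metastore; not the planner's Scala code and not Hive's actual API): with P remaining partitions, the per-partition loop issues on the order of 4 * P metastore round trips, whereas a hypothetical batched lookup would need a constant number.

{code:python}
class FakeMetastore:
    """Counts round trips to mimic the per-partition statistics calls."""

    def __init__(self):
        self.calls = 0

    def get_partition_stats(self, table, partition):
        self.calls += 4  # the rule fetches ~4 pieces of info per partition
        return {"rowCount": 1}

    def bulk_get_partition_stats(self, table, partitions):
        self.calls += 1  # hypothetical single batched request
        return [{"rowCount": 1} for _ in partitions]


partitions = [f"dt=2020-01-{i:02d}" for i in range(1, 31)] * 1000  # 30,000 partitions

per_partition = FakeMetastore()
for p in partitions:
    per_partition.get_partition_stats("t", p)

batched = FakeMetastore()
batched.bulk_get_partition_stats("t", partitions)

print(per_partition.calls, batched.calls)  # 120000 vs. 1
{code}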



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30101) Always use StandaloneClientHAServices to create RestClusterClient when retrieving a Flink on YARN cluster client

2022-11-21 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636970#comment-17636970
 ] 

Xintong Song commented on FLINK-30101:
--

I'm not sure about the proposed changes. {{StandaloneClientHAServices}} and 
{{StandaloneLeaderRetrievalService}} assume there's only one contender, which 
should always be the leader. There's no such guarantee when running a Yarn 
deployment. It is possible that the leadership changes after getting the 
application report, and ZK HA makes sure the rest client always connects to the 
latest leader address in such cases.

For short sql jobs, you may want to consider the sql-gateway, which does not fetch 
the leader address for every submitted job. Unfortunately, there's no such thing 
for DataStream / Table API jobs. Besides, you may also consider a non-HA 
cluster, if end-to-end latency is the primary concern.

> Always use StandaloneClientHAServices to create RestClusterClient when 
> retrieving a Flink on YARN cluster client 
> 
>
> Key: FLINK-30101
> URL: https://issues.apache.org/jira/browse/FLINK-30101
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission
>Affects Versions: 1.16.0
>Reporter: Zhanghao Chen
>Priority: Major
> Fix For: 1.17.0
>
>
> *Problem*
> Currently, the procedure of retrieving a Flink on YARN cluster client is as 
> follows (in YarnClusterDescriptor#retrieve method):
>  # Get application report from YARN
>  # Set rest.address & rest.port using the info from application report
>  # Create a new RestClusterClient using the updated configuration, which will use 
> the client HA service to fetch the rest.address & rest.port if HA is enabled
> Here, we can see that the usage of client HA in step 3 is redundant, as we've 
> already got the rest.address & rest.port from YARN application report. When 
> ZK HA is enabled, this would take ~1.5 s to initialize client HA services and 
> fetch the rest IP & port. 
> 1.5 s can mean a lot for latency-sensitive client operations.  In my company, 
> we use Flink client to submit short-running session jobs and e2e latency is 
> critical. The job submission time is around 10 s on average, and 1.5s would 
> mean a 15% time saving. 
> *Proposal*
> When retrieving a Flink on YARN cluster client, use 
> StandaloneClientHAServices to
> create RestClusterClient instead as we have pre-fetched rest.address & 
> rest.port from YARN application report. This is also what we did in 
> KubernetesClusterDescriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29232) File store continuous reading support from_timestamp scan mode

2022-11-21 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29232.

Resolution: Fixed

master: 0d2440984f6ca9c4e26caca7034280e9c3726337

> File store continuous reading support from_timestamp scan mode
> --
>
> Key: FLINK-29232
> URL: https://issues.apache.org/jira/browse/FLINK-29232
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> The file store can find a suitable snapshot according to start timestamp and 
> read incremental data from it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

2022-11-21 Thread GitBox


JingsongLi merged PR #388:
URL: https://github.com/apache/flink-table-store/pull/388


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30123) Optimize ApplicationDeployer API design

2022-11-21 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636960#comment-17636960
 ] 

luoyuxia commented on FLINK-30123:
--

[~melin] Thanks for reaching out. But could you please translate it into 
English? All issues in Flink should be written in English; otherwise, it will be 
closed.

> Optimize ApplicationDeployer API design
> ---
>
> Key: FLINK-30123
> URL: https://issues.apache.org/jira/browse/FLINK-30123
> Project: Flink
>  Issue Type: New Feature
>Reporter: melin
>Priority: Major
> Attachments: image-2022-11-22-09-28-38-631.png, 
> image-2022-11-22-09-28-54-660.png
>
>
> Tasks are submitted with the ApplicationDeployer api; run waits synchronously for 
> the submission to complete. If the task is submitted to yarn, it may stay in the 
> accepted state the whole time, and at that point the yarn applicationID has not 
> been obtained, so cancelling the task is rather hard. It is suggested to follow the 
> design of org.apache.spark.launcher.SparkLauncher: submit tasks asynchronously so 
> the applicationId can be obtained as soon as possible; to delete the task ahead of 
> time, run yarn application -kill xxx directly;
> !image-2022-11-22-09-28-38-631.png!
> !image-2022-11-22-09-28-54-660.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30124) GenericType is not supported in PyFlink currently

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30124:


 Summary: GenericType is not supported in PyFlink 
currently
 Key: FLINK-30124
 URL: https://issues.apache.org/jira/browse/FLINK-30124
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Yunfeng Zhou


When we add and execute the following test case to 
flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py of the 
Flink ML repository,
{code:python}
def test_get_model_data(self):
    model_data = self.estimator.fit(self.train_data).get_model_data()[0]
    self.t_env.to_data_stream(model_data).execute_and_collect().next()
{code}
The following exception would be thrown.

 
{code:java}
j_type_info = JavaObject id=o698
    def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
        gateway = get_gateway()
        JBasicTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
    
        if _is_instance_of(j_type_info, JBasicTypeInfo.STRING_TYPE_INFO):
            return Types.STRING()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BOOLEAN_TYPE_INFO):
            return Types.BOOLEAN()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BYTE_TYPE_INFO):
            return Types.BYTE()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.SHORT_TYPE_INFO):
            return Types.SHORT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.INT_TYPE_INFO):
            return Types.INT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.LONG_TYPE_INFO):
            return Types.LONG()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.FLOAT_TYPE_INFO):
            return Types.FLOAT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.DOUBLE_TYPE_INFO):
            return Types.DOUBLE()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.CHAR_TYPE_INFO):
            return Types.CHAR()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_INT_TYPE_INFO):
            return Types.BIG_INT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO):
            return Types.BIG_DEC()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO):
            return Types.INSTANT()
    
        JSqlTimeTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
        if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE):
            return Types.SQL_DATE()
        elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIME):
            return Types.SQL_TIME()
        elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIMESTAMP):
            return Types.SQL_TIMESTAMP()
    
        JPrimitiveArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo \
            .PrimitiveArrayTypeInfo
    
        if _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.BOOLEAN())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.BYTE())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.SHORT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.INT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.LONG())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.FLOAT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.DOUBLE())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.CHAR())
    
        JBasicArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
    
        if _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.BOOLEAN())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.BYTE())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.SHORT())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.INT())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.LONG())
        elif _is_instance_of(j_type_info, 
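
The dispatch in `_from_java_type` is a long if/elif chain over the known type infos; when none of the branches matches — which is what happens for a GenericType coming from the model-data table — the conversion cannot succeed. A minimal stand-alone sketch of that shape (plain Python, not PyFlink's actual code; names and the error text are illustrative):

{code:python}
KNOWN_TYPES = {
    "STRING": "Types.STRING()",
    "INT": "Types.INT()",
    "DOUBLE": "Types.DOUBLE()",
}


def from_java_type(j_type_name: str) -> str:
    # Walk the known branches, just like the if/elif chain above.
    for name, python_type in KNOWN_TYPES.items():
        if j_type_name == name:
            return python_type
    # No branch matched: e.g. a GenericTypeInfo handed over from the Java side.
    raise TypeError(
        f"The java type info: {j_type_name} is not supported in PyFlink currently.")


print(from_java_type("INT"))  # Types.INT()
try:
    from_java_type("GenericType")
except TypeError as e:
    print(e)
{code}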

[jira] [Created] (FLINK-30123) Optimize ApplicationDeployer API design

2022-11-21 Thread melin (Jira)
melin created FLINK-30123:
-

 Summary: Optimize ApplicationDeployer API design
 Key: FLINK-30123
 URL: https://issues.apache.org/jira/browse/FLINK-30123
 Project: Flink
  Issue Type: New Feature
Reporter: melin
 Attachments: image-2022-11-22-09-28-38-631.png, 
image-2022-11-22-09-28-54-660.png

Tasks are submitted with the ApplicationDeployer api; run waits synchronously for 
the submission to complete. If the task is submitted to yarn, it may stay in the 
accepted state the whole time, and at that point the yarn applicationID has not been 
obtained, so cancelling the task is rather hard. It is suggested to follow the design 
of org.apache.spark.launcher.SparkLauncher: submit tasks asynchronously so the 
applicationId can be obtained as soon as possible; to delete the task ahead of time, 
run yarn application -kill xxx directly;

!image-2022-11-22-09-28-38-631.png!

!image-2022-11-22-09-28-54-660.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30122) Flink ML KMeans getting model data throws TypeError

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30122:


 Summary: Flink ML KMeans getting model data throws TypeError
 Key: FLINK-30122
 URL: https://issues.apache.org/jira/browse/FLINK-30122
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


When the following test case is added to 
flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py,
{code:python}
def test_get_model_data(self):
    kmeans = KMeans().set_max_iter(2).set_k(2)
    model = kmeans.fit(self.data_table)
    model_data = model.get_model_data()[0]
    expected_field_names = ['centroids', 'weights']
    self.assertEqual(expected_field_names,
                     model_data.get_schema().get_field_names())
    self.t_env.to_data_stream(model_data).execute_and_collect().next()
{code}
The following exception would be thrown.
{code:python}
data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'
{code}
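
The failure itself is easy to reproduce outside Flink. A minimal sketch (plain Python, hypothetical values, not the PyFlink converter) shows that {{pickle.loads}} only accepts bytes-like input, which is why handing it an already-decoded int fails:

{code:python}
import pickle

value = 42
pickled = pickle.dumps(value)
print(pickle.loads(pickled))  # 42 -- the bytes round trip works

try:
    # The converter above receives a plain int (0) instead of pickled bytes.
    pickle.loads(0)
except TypeError as e:
    print(e)  # a bytes-like object is required, not 'int'
{code}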



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30121) Support Stored procedures

2022-11-21 Thread melin (Jira)
melin created FLINK-30121:
-

 Summary: Support Stored procedures
 Key: FLINK-30121
 URL: https://issues.apache.org/jira/browse/FLINK-30121
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: melin


Support operations like the Hudi/Iceberg call procedures, e.g. for savepoints:
[https://hudi.apache.org/docs/procedures/]
 
CALL system.procedure_name(arg_1, arg_2, ... arg_n)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #21300: [FLINK-29423][rest] Remove custom JobDetails serializer

2022-11-21 Thread GitBox


XComp commented on code in PR #21300:
URL: https://github.com/apache/flink/pull/21300#discussion_r1028642489


##
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##
@@ -93,6 +92,29 @@ public class JobDetails implements Serializable {
  */
 private final Map> 
currentExecutionAttempts;
 
+@JsonCreator
+public JobDetails(
+@JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = 
JobIDDeserializer.class)
+JobID jobId,
+@JsonProperty(FIELD_NAME_JOB_NAME) String jobName,
+@JsonProperty(FIELD_NAME_START_TIME) long startTime,
+@JsonProperty(FIELD_NAME_END_TIME) long endTime,
+@JsonProperty(FIELD_NAME_DURATION) long duration,
+@JsonProperty(FIELD_NAME_STATUS) JobStatus status,
+@JsonProperty(FIELD_NAME_LAST_MODIFICATION) long lastUpdateTime,
+@JsonProperty(FIELD_NAME_TASKS) Map taskInfo) {
+this(
+jobId,
+jobName,
+startTime,
+endTime,
+duration,
+status,
+lastUpdateTime,
+extractNumTasksPerState(taskInfo),
+taskInfo.get(FIELD_NAME_TOTAL_NUMBER_TASKS));
+}
+
 @VisibleForTesting

Review Comment:
   actually, you're right. I didn't think this through. :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30120) Flink Statefun Task Failure causes restart of all tasks

2022-11-21 Thread Stephan Weinwurm (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Weinwurm updated FLINK-30120:
-
Description: 
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the _region_ failover strategy requires all tasks to 
be restarted so we see this in the logs:

 
{code:java}
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. {code}
 

 

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks?

Thanks in advance!
Stephan

  was:
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the _region_ failover strategy requires all tasks to 
be restarted so we see this in the logs:

 

Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
{code:java}
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. {code}
 

 

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks?

Thanks in advance!
Stephan


> Flink Statefun Task Failure causes restart of all tasks
> ---
>
> Key: FLINK-30120
> URL: https://issues.apache.org/jira/browse/FLINK-30120
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>Reporter: Stephan Weinwurm
>Priority: Major
>
> Hey all,
> We've noticed that a single task failure causes all of the Statefun tasks to 
> be restarted.
> For example, a single task fails because of some Statefun Endpoint 
> unavailability or because one of our Kubernetes TaskManager pods goes down. 
> Flink then determines that the _region_ failover strategy requires all tasks 
> to be restarted so we see this in the logs:
>  
> {code:java}
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11.
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 5650 tasks should be restarted to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11. {code}
>  
>  
> Our tasks are all fully independent, so I would like only the failed task to be 
> restarted or moved to a different TaskManager slot.
> Is there a way to tell Flink to only restart the failed task? Or is there a 
> specific reason why the region failover strategy decides to restart all tasks?
> Thanks in advance!
> Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30120) Flink Statefun Task Failure causes restart of all tasks

2022-11-21 Thread Stephan Weinwurm (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Weinwurm updated FLINK-30120:
-
Description: 
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the _region_ failover strategy requires all tasks to 
be restarted so we see this in the logs:

 
{code:java}
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. {code}
 

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks?

Thanks in advance!
Stephan

  was:
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the _region_ failover strategy requires all tasks to 
be restarted so we see this in the logs:

 
{code:java}
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. {code}
 

 

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks?

Thanks in advance!
Stephan


> Flink Statefun Task Failure causes restart of all tasks
> ---
>
> Key: FLINK-30120
> URL: https://issues.apache.org/jira/browse/FLINK-30120
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>Reporter: Stephan Weinwurm
>Priority: Major
>
> Hey all,
> We've noticed that a single task failure causes all of the Statefun tasks to 
> be restarted.
> For example, a single task fails because of some Statefun Endpoint 
> unavailability or because one of our Kubernetes TaskManager pods goes down. 
> Flink then determines that the _region_ failover strategy requires all tasks 
> to be restarted so we see this in the logs:
>  
> {code:java}
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11.
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 5650 tasks should be restarted to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11. {code}
>  
> Our tasks are all fully independent, so I would like only the failed task to be 
> restarted or moved to a different TaskManager slot.
> Is there a way to tell Flink to only restart the failed task? Or is there a 
> specific reason why the region failover strategy decides to restart all tasks?
> Thanks in advance!
> Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30120) Flink Statefun Task Failure causes Restart of all Tasks with Regional Failover Strategy

2022-11-21 Thread Stephan Weinwurm (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Weinwurm updated FLINK-30120:
-
Summary: Flink Statefun Task Failure causes Restart of all Tasks with 
Regional Failover Strategy  (was: Flink Statefun Task Failure causes restart of 
all tasks with regional failover strategy)

> Flink Statefun Task Failure causes Restart of all Tasks with Regional 
> Failover Strategy
> ---
>
> Key: FLINK-30120
> URL: https://issues.apache.org/jira/browse/FLINK-30120
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>Reporter: Stephan Weinwurm
>Priority: Major
>
> Hey all,
> We've noticed that a single task failure causes all of the Statefun tasks to 
> be restarted.
> For example, a single task fails because of some Statefun Endpoint 
> unavailability or because one of our Kubernetes TaskManager pods goes down. 
> Flink then determines that the _region_ failover strategy requires all tasks 
> to be restarted so we see this in the logs:
>  
> {code:java}
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11.
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 5650 tasks should be restarted to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11. {code}
>  
> Our tasks are all fully independent, so I would like only the failed task to be 
> restarted or moved to a different TaskManager slot.
> Is there a way to tell Flink to only restart the failed task? Or is there a 
> specific reason why the region failover strategy decides to restart all tasks?
> Thanks in advance!
> Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30120) Flink Statefun Task Failure causes restart of all tasks with regional failover strategy

2022-11-21 Thread Stephan Weinwurm (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Weinwurm updated FLINK-30120:
-
Summary: Flink Statefun Task Failure causes restart of all tasks with 
regional failover strategy  (was: Flink Statefun Task Failure causes restart of 
all tasks)

> Flink Statefun Task Failure causes restart of all tasks with regional 
> failover strategy
> ---
>
> Key: FLINK-30120
> URL: https://issues.apache.org/jira/browse/FLINK-30120
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>Reporter: Stephan Weinwurm
>Priority: Major
>
> Hey all,
> We've noticed that a single task failure causes all of the Statefun tasks to 
> be restarted.
> For example, a single task fails because of some Statefun Endpoint 
> unavailability or because one of our Kubernetes TaskManager pods goes down. 
> Flink then determines that the _region_ failover strategy requires all tasks 
> to be restarted so we see this in the logs:
>  
> {code:java}
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11.
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 5650 tasks should be restarted to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11. {code}
>  
> Our tasks are all fully independent, so I would like only the failed task to be 
> restarted or moved to a different TaskManager slot.
> Is there a way to tell Flink to only restart the failed task? Or is there a 
> specific reason why the region failover strategy decides to restart all tasks?
> Thanks in advance!
> Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30120) Flink Statefun Task Failure causes restart of all tasks

2022-11-21 Thread Stephan Weinwurm (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Weinwurm updated FLINK-30120:
-
Description: 
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the _region_ failover strategy requires all tasks to 
be restarted so we see this in the logs:

 

Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
{code:java}
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. {code}
 

 

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks?

Thanks in advance!
Stephan

  was:
Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the `region` failover strategy requires all tasks to 
be restarted so we see this in the logs:

```
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. 
```

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks? 

Thanks in advance!
Stephan


> Flink Statefun Task Failure causes restart of all tasks
> ---
>
> Key: FLINK-30120
> URL: https://issues.apache.org/jira/browse/FLINK-30120
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>Reporter: Stephan Weinwurm
>Priority: Major
>
> Hey all,
> We've noticed that a single task failure causes all of the Statefun tasks to 
> be restarted.
> For example, a single task fails because of some Statefun Endpoint 
> unavailability or because one of our Kubernetes TaskManager pods goes down. 
> Flink then determines that the _region_ failover strategy requires all tasks 
> to be restarted so we see this in the logs:
>  
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11.
> {code:java}
> Nov 17 10:20:30 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 5650 tasks should be restarted to recover the failed task 
> 31284d56d1e2112b0f20099ee448a6a9_11. {code}
>  
>  
> Our tasks are all fully independent, so I would like only the failed task to be 
> restarted or moved to a different TaskManager slot.
> Is there a way to tell Flink to only restart the failed task? Or is there a 
> specific reason why the region failover strategy decides to restart all tasks?
> Thanks in advance!
> Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30120) Flink Statefun Task Failure causes restart of all tasks

2022-11-21 Thread Stephan Weinwurm (Jira)
Stephan Weinwurm created FLINK-30120:


 Summary: Flink Statefun Task Failure causes restart of all tasks
 Key: FLINK-30120
 URL: https://issues.apache.org/jira/browse/FLINK-30120
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: statefun-3.1.1, statefun-3.2.0, statefun-3.0.0
Reporter: Stephan Weinwurm


Hey all,

We've noticed that a single task failure causes all of the Statefun tasks to be 
restarted.

For example, a single task fails because of some Statefun Endpoint 
unavailability or because one of our Kubernetes TaskManager pods goes down. 
Flink then determines that the `region` failover strategy requires all tasks to 
be restarted so we see this in the logs:

```
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 5650 tasks should be restarted to recover the failed task 
31284d56d1e2112b0f20099ee448a6a9_11. 
```

Our tasks are all fully independent, so I would like only the failed task to be 
restarted or moved to a different TaskManager slot.

Is there a way to tell Flink to only restart the failed task? Or is there a 
specific reason why the region failover strategy decides to restart all tasks? 

Thanks in advance!
Stephan



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30073) Managed memory can be wasted if RocksDB memory is fixed-per-slot

2022-11-21 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636894#comment-17636894
 ] 

Roman Khachatryan commented on FLINK-30073:
---

{quote}to keep a job with fixed-per-slot running with the same resources{quote}
Aren't those resources already reserved? I.e., shouldn't the actual memory limits 
already include X+Y (where part of Y is wasted)?

In overcommitted environments, where the limits are not strictly enforced, the 
behavior might change. But I think that's acceptable, as those requirements 
don't guarantee how many instances will run.

> Managed memory can be wasted if RocksDB memory is fixed-per-slot
> 
>
> Key: FLINK-30073
> URL: https://issues.apache.org/jira/browse/FLINK-30073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Roman Khachatryan
>Priority: Major
>
> When 
> [state.backend.rocksdb.memory.fixed-per-slot|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75]
>  is set, RocksDB does not use managed memory (this option overrides the 
> 'state.backend.rocksdb.memory.managed').
> However, the runtime [doesn't take this into 
> account|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75]
>  and still reserves the managed memory according to the configured weights.
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028528113


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028512299


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[jira] [Comment Edited] (FLINK-21301) Decouple window aggregate allow lateness with state ttl configuration

2022-11-21 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636864#comment-17636864
 ] 

Eric Xiao edited comment on FLINK-21301 at 11/21/22 8:16 PM:
-

Hi there, I had a read of 
[https://www.mail-archive.com/user@flink.apache.org/msg43316.html] and was 
wondering if there was any particular reason why the allow-lateness 
configuration is not enabled for Window TVF aggregations, given that Group 
Window Aggregation is deprecated [1]?

Would it be hard to implement? We might have bandwidth on our team to 
work on it.

 

[1] - 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation


was (Author: JIRAUSER295489):
Hi there, I had a read of 
[https://www.mail-archive.com/user@flink.apache.org/msg43316.html] and was 
wondering if there was any particular reason why the allow-lateness 
configuration is not enabled for Window TVF aggregations?

> Decouple window aggregate allow lateness with state ttl configuration
> -
>
> Key: FLINK-21301
> URL: https://issues.apache.org/jira/browse/FLINK-21301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.14.0
>
>
> Currently, the state retention time config also affects the state clean-up behavior 
> of Window Aggregate, which is unexpected for most users.
> E.g., in the following example, a user would set `MinIdleStateRetentionTime` to 
> 1 day to clean state in `deduplicate`. However, it also affects the clean-up 
> behavior of the window aggregate: 2021-01-04 data would be cleaned at 
> 2021-01-06 instead of 2021-01-05. 
> {code:sql}
> SELECT
>  DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'-MM-dd') as stat_time,
>  count(1) first_phone_num
> FROM (
>  SELECT 
>  ROWTIME,
>  user_id,
>  row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
>  FROM source_kafka_biz_shuidi_sdb_crm_call_record 
> ) cal 
> where rn =1
> group by tumble(ROWTIME,interval '1' DAY);{code}
> It's better to decouple the window aggregate allow-lateness from 
> `MinIdleStateRetentionTime`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-21301) Decouple window aggregate allow lateness with state ttl configuration

2022-11-21 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636864#comment-17636864
 ] 

Eric Xiao commented on FLINK-21301:
---

Hi there, I had a read of 
[https://www.mail-archive.com/user@flink.apache.org/msg43316.html] and was 
wondering if there was any particular reason why the allow-lateness 
configuration is not enabled for Window TVF aggregations?

> Decouple window aggregate allow lateness with state ttl configuration
> -
>
> Key: FLINK-21301
> URL: https://issues.apache.org/jira/browse/FLINK-21301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.14.0
>
>
> Currently, the state retention time config also affects the state clean-up behavior 
> of Window Aggregate, which is unexpected for most users.
> E.g., in the following example, a user would set `MinIdleStateRetentionTime` to 
> 1 day to clean state in `deduplicate`. However, it also affects the clean-up 
> behavior of the window aggregate: 2021-01-04 data would be cleaned at 
> 2021-01-06 instead of 2021-01-05. 
> {code:sql}
> SELECT
>  DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'-MM-dd') as stat_time,
>  count(1) first_phone_num
> FROM (
>  SELECT 
>  ROWTIME,
>  user_id,
>  row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
>  FROM source_kafka_biz_shuidi_sdb_crm_call_record 
> ) cal 
> where rn =1
> group by tumble(ROWTIME,interval '1' DAY);{code}
> It's better to decouple the window aggregate allow-lateness from 
> `MinIdleStateRetentionTime`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29073) [FLIP-91] Support SQL Gateway(Part 2)

2022-11-21 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636842#comment-17636842
 ] 

Eric Xiao commented on FLINK-29073:
---

Hi there, are any of the tasks above good for someone new to the community to 
start working on?

> [FLIP-91] Support SQL Gateway(Part 2)
> -
>
> Key: FLINK-29073
> URL: https://issues.apache.org/jira/browse/FLINK-29073
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Shengkai Fang
>Priority: Major
>
> This issue continues improving the SQL Gateway and allows the SQL Client to 
> submit jobs to the SQL Gateway.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #15: [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink

2022-11-21 Thread GitBox


hlteoh37 commented on code in PR #15:
URL: 
https://github.com/apache/flink-connector-aws/pull/15#discussion_r1028423893


##
flink-sql-connector-dynamodb/pom.xml:
##
@@ -77,12 +77,16 @@ under the License.

org.apache.httpcomponents:*

io.netty:*

commons-logging:commons-logging
+   
commons-codec:commons-codec





software.amazon

org.apache.flink.connector.dynamodb.shaded.software.amazon
+   

+   
software.amazon.awssdk.enhanced.**
+   


Review Comment:
   Ok! done the same!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028414670


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[jira] [Updated] (FLINK-28695) Fail to send partition request to restarted taskmanager

2022-11-21 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-28695:
---
Description: 
After upgrading to *1.15.1* we started getting an error while running a JOB

 
{code:java}
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed.at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
 {code}
{code:java}
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException 
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
 ChannelPromise)(Unknown Source){code}
After investigation we managed to narrow it down to the exact behavior when this 
issue happens:
 # Deploying JOB on fresh kubernetes session cluster with multiple 
TaskManagers: TM1 and TM2 is successful. Job has multiple partitions running on 
both TM1 and TM2.
 # One TaskManager TM2 (XXX.XXX.XX.32) fails for an unrelated issue, for example 
an OOM exception.
 # Kubernetes POD with mentioned TaskManager TM2 is restarted. POD retains same 
IP address as before.
 # JobManager is able to pick up the restarted TM2 (XXX.XXX.XX.32)
 # JOB is restarted because it was running on the failed TaskManager TM2
 # TM1 data channel to TM2 is closed and we get LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed during JOB 
running stage.  
 # When we explicitly delete the pod with TM2, a new POD with a different IP 
address is created and the JOB is able to start again.

It is important to note that we didn't encounter this issue with the previous 
*1.14.4* version, and TaskManager restarts didn't cause such an error.

Please see the attached kubernetes deployments and reduced logs from the 
JobManager. The TaskManager logs did show errors before the failure, but don't 
show anything significant after the restart.

EDIT:
{quote}
Setting {{taskmanager.network.max-num-tcp-connections}} to a very high number 
works around the problem
{quote}

  was:
After upgrading to *1.15.1* we started getting an error while running a JOB

 
{code:java}
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed.at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
 {code}
{code:java}
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException 
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
 ChannelPromise)(Unknown Source){code}
After investigation we managed to narrow it down to the exact behavior when this 
issue happens:
 # Deploying JOB on fresh kubernetes session cluster with multiple 
TaskManagers: TM1 and TM2 is successful. Job has multiple partitions running on 
both TM1 and TM2.
 # One TaskManager TM2 (XXX.XXX.XX.32) fails for an unrelated issue, for example 
an OOM exception.
 # Kubernetes POD with mentioned TaskManager TM2 is restarted. POD retains same 
IP address as before.
 # JobManager is able to pick up the restarted TM2 (XXX.XXX.XX.32)
 # JOB is restarted because it was running on the failed TaskManager TM2
 # TM1 data channel to TM2 is closed and we get LocalTransportException: 
Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed during JOB 
running stage.  
 # When we explicitly delete the pod with TM2, a new POD with a different IP 
address is created and the JOB is able to start again.

It is important to note that we didn't encounter this issue with the previous 
*1.14.4* version, and TaskManager restarts didn't cause such an error.

Please see the attached kubernetes deployments and reduced logs from the 
JobManager. The TaskManager logs did show errors before the failure, but don't 
show anything significant after the restart.


> Fail to send partition request to restarted taskmanager
> ---
>
> Key: FLINK-28695
> URL: https://issues.apache.org/jira/browse/FLINK-28695
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Runtime / Network
>Affects Versions: 1.15.0, 1.15.1
>Reporter: Simonas
>Assignee: Rui Fan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
> Attachments: deployment.txt, image-2022-11-20-16-16-45-705.png, 
> image-2022-11-21-17-15-58-749.png, job_log.txt, jobmanager_config.txt, 
> jobmanager_logs.txt, pod_restart.txt, taskmanager_config.txt
>
>
> After upgrading to *1.15.1* we started getting an error while running a JOB
>  
> {code:java}
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> Sending the partition request 

[jira] [Closed] (FLINK-28695) Fail to send partition request to restarted taskmanager

2022-11-21 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-28695.
--
Fix Version/s: 1.17.0
   1.16.1
   1.15.4
   Resolution: Fixed

merged commit 6b56505 into apache:release-1.15 
merged commit ecede7a into apache:release-1.16 
merged e7854193816^^^..e7854193816 into apache:master

Thanks [~fanrui] for fixing, and others for reporting/analyzing and confirming 
the bug and workaround.

> Fail to send partition request to restarted taskmanager
> ---
>
> Key: FLINK-28695
> URL: https://issues.apache.org/jira/browse/FLINK-28695
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Runtime / Network
>Affects Versions: 1.15.0, 1.15.1
>Reporter: Simonas
>Assignee: Rui Fan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
> Attachments: deployment.txt, image-2022-11-20-16-16-45-705.png, 
> image-2022-11-21-17-15-58-749.png, job_log.txt, jobmanager_config.txt, 
> jobmanager_logs.txt, pod_restart.txt, taskmanager_config.txt
>
>
> After upgrading to *1.15.1* we started getting an error while running a JOB
>  
> {code:java}
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed.at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
>  {code}
> {code:java}
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
>  
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
>  ChannelPromise)(Unknown Source){code}
> After investigation we managed to narrow it down to the exact behavior when 
> this issue happens:
>  # Deploying JOB on fresh kubernetes session cluster with multiple 
> TaskManagers: TM1 and TM2 is successful. Job has multiple partitions running 
> on both TM1 and TM2.
>  # One TaskManager TM2 (XXX.XXX.XX.32) fails for an unrelated issue, for example 
> an OOM exception.
>  # Kubernetes POD with mentioned TaskManager TM2 is restarted. POD retains 
> same IP address as before.
>  # JobManager is able to pick up the restarted TM2 (XXX.XXX.XX.32)
>  # JOB is restarted because it was running on the failed TaskManager TM2
>  # TM1 data channel to TM2 is closed and we get LocalTransportException: 
> Sending the partition request to '/XXX.XXX.XX.32:6121 (#0)' failed during JOB 
> running stage.  
>  # When we explicitly delete the pod with TM2, a new POD with a different 
> IP address is created and the JOB is able to start again.
> It is important to note that we didn't encounter this issue with the previous 
> *1.14.4* version, and TaskManager restarts didn't cause such an error.
> Please see the attached kubernetes deployments and reduced logs from the 
> JobManager. The TaskManager logs did show errors before the failure, but don't 
> show anything significant after the restart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pnowojski merged pull request #21360: [BP-1.16][FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

2022-11-21 Thread GitBox


pnowojski merged PR #21360:
URL: https://github.com/apache/flink/pull/21360


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski merged pull request #21359: [BP-1.15][FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

2022-11-21 Thread GitBox


pnowojski merged PR #21359:
URL: https://github.com/apache/flink/pull/21359


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028411566


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/result/TableResultWrapper.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote.result;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.remote.RemoteExecutor;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** To wrap the result returned by {@link RemoteExecutor#executeStatement}. */
+public class TableResultWrapper implements TableResultInternal {
+
+private final ResolvedSchema resolvedSchema;
+private final CloseableIterator<RowData> dataIterator;
+
+private String resultId;
+private boolean isMaterialized = false;
+private ReadableConfig config;
+
+public TableResultWrapper(
+RemoteExecutor executor,
+OperationHandle operationHandle,
+ResultSet firstResult,
+Long nextToken) {
+this.resolvedSchema = firstResult.getResultSchema();
+dataIterator =
+new RowDataIterator(executor, operationHandle, 
firstResult.getData(), nextToken);
+}
+
+public void setResultId(String resultId) {
+this.resultId = resultId;
+}
+
+public String getResultId() {
+return resultId;
+}
+
+public void setMaterialized(boolean isMaterialized) {
+this.isMaterialized = isMaterialized;
+}
+
+public boolean isMaterialized() {
+return isMaterialized;
+}
+
+public void setConfig(ReadableConfig config) {
+this.config = config;
+}
+
+public ReadableConfig getConfig() {
+return config;
+}
+
+/** Cannot get job client through SQL Gateway. */
+@Override
+public Optional<JobClient> getJobClient() {
+return Optional.empty();
+}
+
+@Override
+public void await() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void await(long timeout, TimeUnit unit) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public ResolvedSchema getResolvedSchema() {
+return resolvedSchema;
+}
+
+@Override
+public ResultKind getResultKind() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public CloseableIterator<Row> collect() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void print() {
+throw new UnsupportedOperationException();
+}
+
+/** Returns an iterator that returns the iterator with the internal row 
data type. */
+@Override
+public CloseableIterator<RowData> collectInternal() {
+return dataIterator;
+}
+
+@Override
+public RowDataToStringConverter getRowDataToStringConverter() {
+// todo
+return rowData -> new String[] {"FAKE TEST RETURN"};
+}
+
+// 

+
+private static class RowDataIterator implements CloseableIterator<RowData> {
+
+private final RemoteExecutor executor;
+private final OperationHandle operationHandle;
+private Iterator<RowData> currentData;
+
+private Long nextToken;
+
+public RowDataIterator(
+RemoteExecutor executor,
+OperationHandle operationHandle,
+  

[jira] [Commented] (FLINK-20092) [Java 11] Multi-thread Flink compilation not working

2022-11-21 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636830#comment-17636830
 ] 

Jim Hughes commented on FLINK-20092:


Hi all, I chatted with [~chesnay] about this some.  He identified that 
https://issues.apache.org/jira/browse/MSHADE-413 may be the issue and suggested 
that I could try version 3.4.1 (the latest and the fix version for that JIRA) 
of the Maven Shade plugin.

Updating the plugin is being discussed on this PR: 
[https://github.com/apache/flink/pull/21344/files] 

> [Java 11] Multi-thread Flink compilation not working
> 
>
> Key: FLINK-20092
> URL: https://issues.apache.org/jira/browse/FLINK-20092
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Maciej Bryński
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I'd like to use maven -T option when compiling flink.
> {code:java}
>  mvn -T 2C clean install -D"scala-2.12" -DskipTests{code}
> Unfortunately my build is stuck on:
> {code:java}
> [INFO] --- maven-shade-plugin:3.2.1:shade (shade-flink) @ 
> flink-fs-hadoop-shaded ---
> [INFO] Including org.apache.hadoop:hadoop-common:jar:3.1.0 in the shaded jar.
> [INFO] Including org.apache.hadoop:hadoop-annotations:jar:3.1.0 in the shaded 
> jar.
> [INFO] Including com.google.guava:guava:jar:11.0.2 in the shaded jar.
> [INFO] Including commons-io:commons-io:jar:2.7 in the shaded jar.
> [INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
> shaded jar.
> [INFO] Including commons-logging:commons-logging:jar:1.1.3 in the shaded jar.
> [INFO] Including commons-lang:commons-lang:jar:2.6 in the shaded jar.
> [INFO] Including commons-beanutils:commons-beanutils:jar:1.9.3 in the shaded 
> jar.
> [INFO] Including org.apache.commons:commons-configuration2:jar:2.1.1 in the 
> shaded jar.
> [INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
> [INFO] Including com.google.re2j:re2j:jar:1.1 in the shaded jar.
> [INFO] Including org.apache.hadoop:hadoop-auth:jar:3.1.0 in the shaded jar.
> [INFO] Including org.apache.htrace:htrace-core4:jar:4.1.0-incubating in the 
> shaded jar.
> [INFO] Including com.fasterxml.jackson.core:jackson-databind:jar:2.10.1 in 
> the shaded jar.
> [INFO] Including com.fasterxml.jackson.core:jackson-annotations:jar:2.10.1 in 
> the shaded jar.
> [INFO] Including com.fasterxml.jackson.core:jackson-core:jar:2.10.1 in the 
> shaded jar.
> [INFO] Including org.codehaus.woodstox:stax2-api:jar:3.1.4 in the shaded jar.
> [INFO] Including com.fasterxml.woodstox:woodstox-core:jar:5.0.3 in the shaded 
> jar.
> [INFO] Including org.apache.flink:force-shading:jar:1.12-SNAPSHOT in the 
> shaded jar.
> [INFO] No artifact matching filter io.netty:netty
> [WARNING] Discovered module-info.class. Shading will break its strong 
> encapsulation.
> [WARNING] Discovered module-info.class. Shading will break its strong 
> encapsulation.
> [WARNING] Discovered module-info.class. Shading will break its strong 
> encapsulation.
> [INFO] Replacing original artifact with shaded artifact.
> [INFO] Replacing 
> /home/maverick/flink/flink-filesystems/flink-fs-hadoop-shaded/target/flink-fs-hadoop-shaded-1.12-SNAPSHOT.jar
>  with 
> /home/maverick/flink/flink-filesystems/flink-fs-hadoop-shaded/target/flink-fs-hadoop-shaded-1.12-SNAPSHOT-shaded.jar
> [INFO] Dependency-reduced POM written at: 
> /home/maverick/flink/flink-filesystems/flink-fs-hadoop-shaded/target/dependency-reduced-pom.xml
> {code}
> Can we make Flink compilation work with multiple maven threads?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jnh5y commented on a diff in pull request #21344: [FLINK-30083] Bump maven-shade-plugin version

2022-11-21 Thread GitBox


jnh5y commented on code in PR #21344:
URL: https://github.com/apache/flink/pull/21344#discussion_r1028404398


##
pom.xml:
##
@@ -991,7 +991,7 @@ under the License.


org.apache.maven.plugins

maven-shade-plugin
-   3.2.4
+   3.4.0

Review Comment:
   @rmetzger I'm the random person that @zentol mentioned!  
   
   I was about to open a PR to bump this version to 3.4.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-21 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1028370047


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import org.apache.commons.collections.CollectionUtils;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ERROR_MESSAGE_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.SPLIT_KEYS_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.UNAUTHORIZED_ERROR;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.isCommandSucceed;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.splitVector;
+
+/**
+ * SplitVector Partitioner
+ *
+ * Uses the SplitVector command to generate chunks for a collection. eg. 

+ * db.runCommand({splitVector:"inventory.products", keyPattern:{_id:1}, 
maxChunkSize:64})
+ *
+ * Requires splitVector privilege.
+ */
+@Internal
+public class MongoSplitVectorSplitter implements MongoSplitters.MongoSplitter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoSplitVectorSplitter.class);
+
+public static final MongoSplitVectorSplitter INSTANCE = new 
MongoSplitVectorSplitter();
+
+private MongoSplitVectorSplitter() {}
+
+@Override
+public Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
+MongoClient mongoClient = splitContext.getMongoClient();
+MongoNamespace namespace = splitContext.getMongoNamespace();
+MongoReadOptions readOptions = splitContext.getReadOptions();
+
+MemorySize chunkSize = readOptions.getPartitionSize();
+// if partition size < 1mb, use 1 mb as chunk size.
+int maxChunkSizeMB = Math.max(chunkSize.getMebiBytes(), 1);
+
+BsonDocument keyPattern = new BsonDocument(ID_FIELD, new BsonInt32(1));
+
+BsonDocument splitResult;
+try {
+splitResult = splitVector(mongoClient, namespace, keyPattern, 
maxChunkSizeMB);

Review Comment:
   Thanks @zentol, I checked the driver source code of 
`InternalStreamConnection#receiveCommandMessageResponse`. If the execution of 
the command fails, an exception will be thrown, and we don't need to deal with 
the other case.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25859) Add documentation for DynamoDB Async Sink

2022-11-21 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636791#comment-17636791
 ] 

Danny Cranmer commented on FLINK-25859:
---

[~Gusev] I will pick this up, let me know if you were already working on it and 
I can reassign to you.

> Add documentation for DynamoDB Async Sink
> -
>
> Key: FLINK-25859
> URL: https://issues.apache.org/jira/browse/FLINK-25859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / DynamoDB, Documentation
>Reporter: Yuri Gusev
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: aws-connector-2.0.0
>
>
> h2. Motivation
> FLINK-24229 _introduces a new sink for DynamoDB_
> *Scope:*
>  * Create documentation for the new connector
> h2. References
> More details can be found at 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-25859) Add documentation for DynamoDB Async Sink

2022-11-21 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-25859:
-

Assignee: Danny Cranmer  (was: Yuri Gusev)

> Add documentation for DynamoDB Async Sink
> -
>
> Key: FLINK-25859
> URL: https://issues.apache.org/jira/browse/FLINK-25859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / DynamoDB, Documentation
>Reporter: Yuri Gusev
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: aws-connector-2.0.0
>
>
> h2. Motivation
> FLINK-24229 _introduces a new sink for DynamoDB_
> *Scope:*
>  * Create documentation for the new connector
> h2. References
> More details can be found at 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30119) Flink Kubernetes Operator should store last savepoint in the SavepointInfo.lastSavepoint field whether it is completed or pending

2022-11-21 Thread Clara Xiong (Jira)
Clara Xiong created FLINK-30119:
---

 Summary: Flink Kubernetes Operator should store last savepoint in 
the SavepointInfo.lastSavepoint field whether it is completed or pending
 Key: FLINK-30119
 URL: https://issues.apache.org/jira/browse/FLINK-30119
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Clara Xiong


End user experience proposal:

Users can see the properties of the last savepoint, pending or completed, and 
its status in one of three states: PENDING, SUCCEEDED and FAILED. If no 
savepoint has ever been taken or attempted, it is empty. Completed savepoints 
(manual, periodic and upgrade) are included in the savepoint history, merged 
with savepoints from the Flink job.

Users can see this savepoint with PENDING status once one is triggered. Once 
completed, users can see the last savepoint status change to SUCCEEDED and the 
savepoint included in the savepoint history, or FAILED and not in the history. 
If another savepoint is triggered after completion, before the user checks, the 
user cannot see the status of the one they triggered, but they can check 
whether their savepoint is in the history.

Currently lastSavepoint only stores the last completed one, duplicating the 
savepoint history. To expose the properties of the currently pending savepoint 
or the last savepoint that failed, we need to expose that info in separate 
fields in SavepointInfo. The internal logic of the Operator uses those fields 
for triggering and retries, which creates compatibility issues with clients. It 
also uses more space against the etcd size limit.

Code change proposal:

Use lastSavepoint to store the last completed/attempted one and deprecate 
SavepointInfo.triggerTimestamp, SavepointInfo.triggerType and 
SavepointInfo.formatType. This will simplify the CRD and logic.

Add a SavepointInfo::retrieveLastSavepoint method to return the last succeeded 
one.

Update getLastSavepointStatus to simplify the logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21361: [BP-1.16][FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

2022-11-21 Thread GitBox


flinkbot commented on PR #21361:
URL: https://github.com/apache/flink/pull/21361#issuecomment-1322406572

   
   ## CI report:
   
   * 4c931775b41721914b49b120df699b8b685b2908 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa opened a new pull request, #21361: [BP-1.16][FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

2022-11-21 Thread GitBox


reswqa opened a new pull request, #21361:
URL: https://github.com/apache/flink/pull/21361

   
   ## What is the purpose of the change
   
   *Back port FLINK-29639 to release 1.16*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636780#comment-17636780
 ] 

Danny Cranmer commented on FLINK-30096:
---

Merged commit 
[{{7dfebdd}}|https://github.com/apache/flink-connector-aws/commit/7dfebdd3bdcc5b2012ea912e633c2eb3f80fec33]
 into apache:main

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-2.0.0
>
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places
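
For illustration, a minimal DDL sketch (not taken from the connector or the PR) 
of where the renamed option would surface in the Table API, assuming it is 
exposed as 'table-name' and that 'aws.region' is the region option; the schema, 
table name and region are hypothetical:
{code:sql}
-- Sketch only: DynamoDB table sink using the renamed 'table-name' option.
CREATE TABLE dynamo_db_sink (
  partition_key STRING,
  sort_key BIGINT,
  payload STRING
) WITH (
  'connector' = 'dynamodb',
  'table-name' = 'orders',      -- formerly the destinationTableName setting
  'aws.region' = 'eu-west-1'
);
{code}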



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-30096.
---
Resolution: Fixed

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-2.0.0
>
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] dannycranmer merged pull request #25: [FLINK-30096][Connectors/DynamoDB] Rename destination table name to table name

2022-11-21 Thread GitBox


dannycranmer merged PR #25:
URL: https://github.com/apache/flink-connector-aws/pull/25


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-30096:
--
Fix Version/s: aws-connector-2.0.0

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-2.0.0
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-2.0.0
>
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-30096:
--
Affects Version/s: (was: aws-connector-2.0.0)

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-2.0.0
>
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-30096:
-

Assignee: Hong Liang Teoh

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-2.0.0
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #15: [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink

2022-11-21 Thread GitBox


dannycranmer commented on code in PR #15:
URL: 
https://github.com/apache/flink-connector-aws/pull/15#discussion_r1028269038


##
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/example_dynamodb.sql:
##
@@ -0,0 +1,50 @@
+

Review Comment:
   Missing copyright header



##
flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java:
##
@@ -0,0 +1,267 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableOptionsBuilder;
+import org.apache.flink.table.factories.TestFormatFactory;
+import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE;
+import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT;
+import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE;
+import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS;
+import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
+import static 
org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Test for {@link DynamoDbDynamicSink} created by {@link 
DynamoDbDynamicSinkFactory}. */
+public class DynamoDbDynamicSinkFactoryTest {
+
+private static final String DYNAMO_DB_TABLE_NAME = "TestDynamoDBTable";
+
+@Test
+void testGoodPartitionedTableSink() {
+ResolvedSchema sinkSchema =
+ResolvedSchema.of(
+Column.physical("partition_key", DataTypes.STRING()),
+Column.physical("sort_key", DataTypes.BIGINT()),
+Column.physical("payload", DataTypes.STRING()),
+Column.physical("some_char_array", 
DataTypes.ARRAY(DataTypes.CHAR(1))),
+Column.physical(
+"some_varchar_array", 
DataTypes.ARRAY(DataTypes.VARCHAR(1))),
+Column.physical("some_string_array", 
DataTypes.ARRAY(DataTypes.STRING())),
+Column.physical("some_boolean_array", 
DataTypes.ARRAY(DataTypes.BOOLEAN())),
+Column.physical(
+"some_decimal_array", 
DataTypes.ARRAY(DataTypes.DECIMAL(1, 1))),
+Column.physical("some_tinyint_array", 
DataTypes.ARRAY(DataTypes.TINYINT())),
+Column.physical(
+"some_smallint_array", 
DataTypes.ARRAY(DataTypes.SMALLINT())),
+Column.physical("some_int_array", 
DataTypes.ARRAY(DataTypes.INT())),
+Column.physical("some_bigint_array", 
DataTypes.ARRAY(DataTypes.BIGINT())),
+Column.physical("some_float_array", 
DataTypes.ARRAY(DataTypes.FLOAT())),
+Column.physical("some_date_array", 
DataTypes.ARRAY(DataTypes.DATE())),
+Column.physical("some_time_array", 
DataTypes.ARRAY(DataTypes.TIME())),
+Column.physical(
+"some_timestamp_array", 
DataTypes.ARRAY(DataTypes.TIMESTAMP())),
+Column.physical(
+"some_timestamp_ltz_array",
+DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ())),
+Column.physical(
+"some_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.BIGINT(;
+Map<String, String> sinkOptions = defaultSinkOptions().build();
+List<String> partitionKeys = Collections.singletonList("partition_key");
+
+// Construct actual sink
+DynamoDbDynamicSink actualSink =
+ 

[jira] [Updated] (FLINK-30096) Rename DynamoDB config destinationTableName to tableName

2022-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30096:
---
Labels: pull-request-available  (was: )

> Rename DynamoDB config destinationTableName to tableName
> 
>
> Key: FLINK-30096
> URL: https://issues.apache.org/jira/browse/FLINK-30096
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-2.0.0
>Reporter: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
>
> The word destination is redundant since it is part of a DDB table sink.
>  
> Renaming destinationTableName to tableName in all places



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

