[jira] [Commented] (FLINK-29405) InputFormatCacheLoaderTest is unstable
[ https://issues.apache.org/jira/browse/FLINK-29405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681211#comment-17681211 ] Matthias Pohl commented on FLINK-29405:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45230=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=10958

> InputFormatCacheLoaderTest is unstable
> --
>
>                 Key: FLINK-29405
>                 URL: https://issues.apache.org/jira/browse/FLINK-29405
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Chesnay Schepler
>            Assignee: Alexander Smirnov
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>             Fix For: 1.17.0
>
> #testExceptionDuringReload/#testCloseAndInterruptDuringReload fail reliably
> when run in a loop.
> {code}
> java.lang.AssertionError:
> Expecting AtomicInteger(0) to have value:
>   0
> but did not.
> 	at org.apache.flink.table.runtime.functions.table.fullcache.inputformat.InputFormatCacheLoaderTest.testCloseAndInterruptDuringReload(InputFormatCacheLoaderTest.java:161)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-30727) JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException
[ https://issues.apache.org/jira/browse/FLINK-30727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681210#comment-17681210 ] Matthias Pohl commented on FLINK-30727:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45229=logs=f2c100be-250b-5e85-7bbe-176f68fcddc5=05efd11e-5400-54a4-0d27-a4663be008a9=12514

> JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException
>
>                 Key: FLINK-30727
>                 URL: https://issues.apache.org/jira/browse/FLINK-30727
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Table SQL / Planner
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Assignee: Yunhong Zheng
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>
> IOException due to timeout occurring while requesting exclusive NetworkBuffer
> caused JoinReorderITCase.testBushyTreeJoinReorder to fail:
> {code}
> [...]
> Jan 18 01:11:27 Caused by: java.io.IOException: Timeout triggered when requesting exclusive buffers: The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or you may increase the timeout which is 3ms by setting the key 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> Jan 18 01:11:27 	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:256)
> Jan 18 01:11:27 	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:179)
> Jan 18 01:11:27 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:262)
> Jan 18 01:11:27 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:517)
> Jan 18 01:11:27 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:277)
> Jan 18 01:11:27 	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> Jan 18 01:11:27 	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:962)
> Jan 18 01:11:27 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:648)
> Jan 18 01:11:27 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:556)
> Jan 18 01:11:27 	at java.lang.Thread.run(Thread.java:748)
> {code}
> Same build, 2 failures:
> * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=14300
> * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987=logs=ce3801ad-3bd5-5f06-d165-34d37e757d90=5e4d9387-1dcc-5885-a901-90469b7e6d2f=14362
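The exception above names the relevant configuration keys directly. As an illustration only (the values below are made-up placeholders, not tuning recommendations), raising the network memory or the buffer-request timeout would look like this in flink-conf.yaml:

```yaml
# Illustrative flink-conf.yaml fragment; values are placeholders, not advice.
# Give the network stack a larger share of process memory ...
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb
# ... or let exclusive-buffer requests wait longer before the IOException fires.
taskmanager.network.memory.exclusive-buffers-request-timeout-ms: 60000
```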
[jira] [Commented] (FLINK-29427) LookupJoinITCase failed with classloader problem
[ https://issues.apache.org/jira/browse/FLINK-29427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681209#comment-17681209 ] Matthias Pohl commented on FLINK-29427:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45220=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=16759

> LookupJoinITCase failed with classloader problem
>
>                 Key: FLINK-29427
>                 URL: https://issues.apache.org/jira/browse/FLINK-29427
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Alexander Smirnov
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>             Fix For: 1.17.0
>
> {code:java}
> 2022-09-27T02:49:20.9501313Z Sep 27 02:49:20 Caused by: org.codehaus.janino.InternalCompilerException: Compiling "KeyProjection$108341": Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9502654Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
> 2022-09-27T02:49:20.9503366Z Sep 27 02:49:20 	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> 2022-09-27T02:49:20.9504044Z Sep 27 02:49:20 	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> 2022-09-27T02:49:20.9504704Z Sep 27 02:49:20 	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> 2022-09-27T02:49:20.9505341Z Sep 27 02:49:20 	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> 2022-09-27T02:49:20.9505965Z Sep 27 02:49:20 	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> 2022-09-27T02:49:20.9506584Z Sep 27 02:49:20 	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> 2022-09-27T02:49:20.9507261Z Sep 27 02:49:20 	at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> 2022-09-27T02:49:20.9507883Z Sep 27 02:49:20 	... 30 more
> 2022-09-27T02:49:20.9509266Z Sep 27 02:49:20 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9510835Z Sep 27 02:49:20 	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
> 2022-09-27T02:49:20.9511760Z Sep 27 02:49:20 	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
> 2022-09-27T02:49:20.9512456Z Sep 27 02:49:20 	at java.lang.Class.forName0(Native Method)
> 2022-09-27T02:49:20.9513014Z Sep 27 02:49:20 	at java.lang.Class.forName(Class.java:348)
> 2022-09-27T02:49:20.9513649Z Sep 27 02:49:20 	at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:89)
> 2022-09-27T02:49:20.9514339Z Sep 27 02:49:20 	at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:312)
> 2022-09-27T02:49:20.9514990Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8556)
> 2022-09-27T02:49:20.9515659Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6749)
> 2022-09-27T02:49:20.9516337Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
> 2022-09-27T02:49:20.9516989Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
> 2022-09-27T02:49:20.9517632Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
> 2022-09-27T02:49:20.9518319Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
> 2022-09-27T02:49:20.9519018Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9519680Z Sep 27 02:49:20 	at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
> 2022-09-27T02:49:20.9520386Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9521042Z Sep 27 02:49:20 	at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
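The "closed classloader" check fires when user code keeps a reference to a job's classloader beyond the job's lifetime. A minimal, Flink-free sketch of the anti-pattern the error message describes; the class and field names here are made up for illustration and do not appear in Flink:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical illustration of the leak pattern warned about above: a static
// field keeps a user-code classloader reachable after it has been closed.
public class ClassLoaderLeakDemo {
    // Anti-pattern: a static field pins the loader for the JVM lifetime,
    // outliving the job that created it.
    static ClassLoader cached;

    static boolean leaks() throws IOException {
        URLClassLoader jobLoader = new URLClassLoader(new URL[0]);
        cached = jobLoader;   // leaked reference
        jobLoader.close();    // Flink closes user-code loaders at job shutdown
        // 'cached' still points at the closed loader. In Flink, any later use
        // through the SafetyNetWrapperClassLoader surfaces as the
        // IllegalStateException shown in the stack trace above.
        return cached == jobLoader;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(leaks() ? "leaked" : "ok");
    }
}
```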
[jira] [Commented] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
[ https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681196#comment-17681196 ] Rui Fan commented on FLINK-30623:
---

FLINK-26806 was merged on 23.01.2023; I'm not sure whether it will affect the benchmark. I can check it today.

> Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
> ---
>
>                 Key: FLINK-30623
>                 URL: https://issues.apache.org/jira/browse/FLINK-30623
>             Project: Flink
>          Issue Type: Bug
>          Components: Benchmarks, Runtime / Checkpointing
>            Reporter: Martijn Visser
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
> Performance regression
> checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005
> checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277
> deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395
> stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919
> tupleKeyBy median=4155.684199 recent_median=3987.5812305
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED_1=on=on=off=2=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=8=deployAllTasks.STREAMING=on=on=off=2=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6=stateBackends.MEMORY=on=on=off=2=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6=tupleKeyBy=on=on=off=2=200
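For scale, the medians reported above for the two unaligned-checkpoint benchmarks amount to roughly an 80% throughput drop. A quick throwaway calculation (not part of the benchmark suite) confirms the magnitude:

```java
// Quick check of the regression magnitude from the medians reported above.
public class RegressionCheck {
    // Fraction of throughput lost between the long-term and recent medians.
    static double drop(double median, double recentMedian) {
        return 1.0 - recentMedian / median;
    }

    public static void main(String[] args) {
        System.out.printf("UNALIGNED   drop: %.1f%%%n", 100 * drop(338.1445195, 67.6453005));
        System.out.printf("UNALIGNED_1 drop: %.1f%%%n", 100 * drop(213.230041, 39.830277));
    }
}
```

Both unaligned benchmarks lose around four fifths of their throughput, which is consistent with the Blocker priority; the other entries (deployAllTasks, stateBackends, tupleKeyBy) are single-digit percentage changes.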
[GitHub] [flink] fredia commented on pull request #21747: [FLINK-24932][state] Bump frocksdbjni to 6.20.3-ververica-2.0
fredia commented on PR #21747: URL: https://github.com/apache/flink/pull/21747#issuecomment-1406049554

@MartijnVisser Thanks for the review and verification. I compared the benchmark results based on the latest master (f6c7c30118ef26f98a7d422831fa8047f1fd9f98); below are the results. As we can see, the performance behaves the same as @Myasuka's [result](https://issues.apache.org/jira/browse/FLINK-24932?focusedCommentId=17569889=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17569889): the performance of version 2.0 is almost the same as that of version 1.0. In addition to supporting M1, 6.20.3-ververica-2.0 also updates the [dependencies](https://github.com/ververica/frocksdb/pull/56) of zlib and adds the `periodic_compaction_seconds` [option](https://github.com/ververica/frocksdb/pull/57) to RocksJava.

Benchmark | 6.20.3-ververica-1.0 | 6.20.3-ververica-2.0 | ratio
-- | -- | -- | --
org.apache.flink.state.benchmark.ListStateBenchmark.listAdd | 389.803 | 386.465338 | -0.008562433
org.apache.flink.state.benchmark.ListStateBenchmark.listAddAll | 311.726 | 314.086747 | 0.007573148
org.apache.flink.state.benchmark.ListStateBenchmark.listAppend | 387.375 | 385.4015 | -0.005094547
org.apache.flink.state.benchmark.ListStateBenchmark.listGet | 152.5 | 154.95988 | 0.016130361
org.apache.flink.state.benchmark.ListStateBenchmark.listGetAndIterate | 155.356 | 155.927967 | 0.003681654
org.apache.flink.state.benchmark.ListStateBenchmark.listUpdate | 391.685 | 388.147242 | -0.009032151
org.apache.flink.state.benchmark.MapStateBenchmark.mapAdd | 320.924 | 314.778435 | -0.019149596
org.apache.flink.state.benchmark.MapStateBenchmark.mapContains | 66.928 | 67.744243 | 0.012195837
org.apache.flink.state.benchmark.MapStateBenchmark.mapEntries | 439.316 | 438.016991 | -0.00295689
org.apache.flink.state.benchmark.MapStateBenchmark.mapGet | 66.511 | 66.365644 | -0.002185443
org.apache.flink.state.benchmark.MapStateBenchmark.mapIsEmpty | 59.914 | 60.307518 | 0.006568048
org.apache.flink.state.benchmark.MapStateBenchmark.mapIterator | 441.623 | 439.512541 | -0.00477887
org.apache.flink.state.benchmark.MapStateBenchmark.mapKeys | 439.221 | 440.065508 | 0.00192274
org.apache.flink.state.benchmark.MapStateBenchmark.mapPutAll | 85.741 | 85.65495 | -0.001003604
org.apache.flink.state.benchmark.MapStateBenchmark.mapRemove | 335.227 | 358.395208 | 0.069111999
org.apache.flink.state.benchmark.MapStateBenchmark.mapUpdate | 310.086 | 339.343382 | 0.094352476
org.apache.flink.state.benchmark.MapStateBenchmark.mapValues | 443.319 | 446.092917 | 0.006257158
org.apache.flink.state.benchmark.ValueStateBenchmark.valueAdd | 322.669 | 306.841597 | -0.049051514
org.apache.flink.state.benchmark.ValueStateBenchmark.valueGet | 437.556 | 443.970351 | 0.014659497
org.apache.flink.state.benchmark.ValueStateBenchmark.valueUpdate | 331.987 | 336.718447 | 0.014251904
org.apache.flink.state.benchmark.RocksdbStateBackendRescalingBenchmarkExecutor.rescaleRocksDB | 18845.28 | 18719.78056 | -0.006659463
org.apache.flink.state.benchmark.RocksdbStateBackendRescalingBenchmarkExecutor.rescaleRocksDB | 1746.154 | 1848.152231 | 0.058413079

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
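The ratio column in the table above reads as the relative change from version 1.0 to 2.0, i.e. (v2 - v1) / v1. A quick sanity check against the listAdd row (a throwaway snippet, not benchmark code):

```java
// Verifies the "ratio" column of the table above for one row (listAdd).
public class RatioCheck {
    // Relative change from the 1.0 score to the 2.0 score.
    static double ratio(double v1, double v2) {
        return (v2 - v1) / v1;
    }

    public static void main(String[] args) {
        // listAdd: 389.803 ops/ms (1.0) vs 386.465338 ops/ms (2.0)
        System.out.println(ratio(389.803, 386.465338));
    }
}
```

With this reading, negative values are small slowdowns and positive values small speedups, all within a few percent either way.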
[GitHub] [flink-kubernetes-operator] kmozaid commented on pull request #504: [FLINK-30669] update recent job status in flinkdeployment object
kmozaid commented on PR #504: URL: https://github.com/apache/flink-kubernetes-operator/pull/504#issuecomment-1406021738

> I agree @kmozaid if this doesn't affect existing jobs and improves some others then the change makes sense :)
>
> Could you please add a unit test to guard this change against possible future regressions?

sure, let me add a test.
[jira] [Created] (FLINK-30799) Make SinkFunction support speculative execution for batch jobs
Biao Liu created FLINK-30799:

             Summary: Make SinkFunction support speculative execution for batch jobs
                 Key: FLINK-30799
                 URL: https://issues.apache.org/jira/browse/FLINK-30799
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
            Reporter: Biao Liu
             Fix For: 1.17.0

This ticket makes SinkFunction-based sinks run with speculative execution for batch jobs.
[jira] [Created] (FLINK-30798) Make OutputFormat support speculative execution for batch jobs
Biao Liu created FLINK-30798:

             Summary: Make OutputFormat support speculative execution for batch jobs
                 Key: FLINK-30798
                 URL: https://issues.apache.org/jira/browse/FLINK-30798
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
            Reporter: Biao Liu
             Fix For: 1.17.0

This ticket makes OutputFormat-based sinks run with speculative execution for batch jobs.
[GitHub] [flink] flinkbot commented on pull request #21765: [Flink-30755][runtime] Make SinkV2 support speculative execution for batch jobs
flinkbot commented on PR #21765: URL: https://github.com/apache/flink/pull/21765#issuecomment-1405950702

## CI report:

* 74cdbff981b35993ef2546d508f0ad6c2e4c2ba0 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] ifndef-SleePy opened a new pull request, #21765: [Flink-30755][runtime] Make SinkV2 support speculative execution for batch jobs
ifndef-SleePy opened a new pull request, #21765: URL: https://github.com/apache/flink/pull/21765

## What is the purpose of the change

*This pull request introduces a way to allow SinkV2 to run with speculative execution*

## Brief change log

- *Introduce SupportsConcurrentExecutionAttempts interface*
- *Add supportsConcurrentExecutionAttempts property for Transformation, StreamGraph, JobGraph*
- *Make SinkV2 support speculative execution through implementing SupportsConcurrentExecutionAttempts interface*
- *Use supportsConcurrentExecutionAttempts for excluding sink of speculative execution*

## Verifying this change

This change added tests:

- *Add new test case in StreamingJobGraphGeneratorTest*
- *Add new test case in SpeculativeSchedulerITCase*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)
[GitHub] [flink] 1996fanrui commented on pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
1996fanrui commented on PR #21697: URL: https://github.com/apache/flink/pull/21697#issuecomment-1405922443

Thanks for the review!
[jira] [Closed] (FLINK-30709) NetworkInput#emitNext() should push records to DataOutput in a while loop
[ https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin closed FLINK-30709.
Resolution: Fixed

> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -
>
>                 Key: FLINK-30709
>                 URL: https://issues.apache.org/jira/browse/FLINK-30709
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
> It's similar to FLINK-30533: FLINK-30533 focuses on the source operator, while this JIRA
> focuses on network input.
>
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes
> at most one record to the given DataOutput. This unnecessarily increases the
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, Flink runtime needs to include in the call stack these 3 function
> calls:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to
> push records to DataOutput in a while loop. It improves Flink performance by
> removing an average of 3 function calls from the call stack needed to produce a
> record.
> Here are the benchmark results obtained by running the
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
> with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>
> {code:java}
> Before the proposed change, the avg is 12429.0605 ops/ms, here are detailed results:
> Benchmark (sourceType) Mode Cnt Score Error Units
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 12339.771 ± 414.649 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 12687.872 ± 320.084 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 12256.445 ± 512.219 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 12432.154 ± 405.083 ops/ms
>
> After the proposed change, the avg is 13836.845 ops/ms, here are detailed results:
> Benchmark (sourceType) Mode Cnt Score Error Units
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 13092.451 ± 490.886 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 13881.138 ± 370.249 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 13960.280 ± 389.505 ops/ms
> InputBenchmark.mapSink F27_UNBOUNDED thrpt 30 14413.511 ± 727.844 ops/ms{code}
>
> The proposed change increases throughput by 11.3%.
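The change described in the ticket can be sketched as replacing a single-record emit with a drain loop. The names below are hypothetical and do not match Flink's actual AbstractStreamTaskNetworkInput code:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the emitNext() change; not Flink's real implementation.
public class NetworkInputSketch {
    private final Queue<String> buffered = new ArrayDeque<>();

    public NetworkInputSketch(List<String> records) {
        buffered.addAll(records);
    }

    // Before: at most one record per call, so every record pays for the full
    // StreamTask#processInput -> StreamOneInputProcessor#processInput ->
    // emitNext() call chain.
    public boolean emitNextSingle(List<String> output) {
        String record = buffered.poll();
        if (record == null) {
            return false;
        }
        output.add(record);
        return true;
    }

    // After: drain all immediately available records in a while loop, so the
    // per-record share of the enclosing function calls shrinks.
    public boolean emitNextLoop(List<String> output) {
        boolean emitted = false;
        String record;
        while ((record = buffered.poll()) != null) {
            output.add(record);
            emitted = true;
        }
        return emitted;
    }
}
```

One call to the loop variant emits every buffered record, whereas the single variant needs one full call chain per record.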
[GitHub] [flink] lindong28 closed pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
lindong28 closed pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
URL: https://github.com/apache/flink/pull/21697
[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods
kristoffSC commented on PR #21393: URL: https://github.com/apache/flink/pull/21393#issuecomment-1405755647

@tsreaper I've refactored both `BlockStatementSplitter` and `BlockStatementGrouper` so they use one `for` loop and one `visitor` instance, as you suggested in your comments. There are no merge/add blocks now.

I would appreciate it if you could take a look and let me know what you think. Thanks.
[GitHub] [flink] kristoffSC commented on a diff in pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods
kristoffSC commented on code in PR #21393: URL: https://github.com/apache/flink/pull/21393#discussion_r1088399790

## flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java:
## @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Extract statements from IF, ELSE and WHILE blocks, making them smaller.
+ *
+ * BlockStatementSplitter does not recognize whether a statement operates on a local or a class
+ * member variable. Because of that, code must be preprocessed by {@link DeclarationRewriter} which
+ * converts all extracted local variables to member variables.
+ *
+ * Before
+ *
+ * while (counter > 0) {
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ *     if (a > 0) {
+ *         b = a * 2;
+ *         c = b * 2;
+ *         System.out.println(b);
+ *     } else {
+ *         b = a * 3;
+ *         System.out.println(b);
+ *     }
+ *     counter--;
+ * }
+ *
+ * After
+ *
+ * while (counter > 0) {
+ *     myFun_whileBody0_0(int a);
+ *     if (a > 0) {
+ *         myFun_whileBody0_0_ifBody0(int a);
+ *     } else {
+ *         myFun_whileBody0_0_ifBody1(int a);
+ *     }
+ *     counter--;
+ * }
+ *
+ * Where bodies of extracted "methods" are:
+ *
+ * myFun_whileBody0_0(int a) ->
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ *
+ * myFun_whileBody0_0_ifBody0(int a) ->
+ *     b = a * 2;
+ *     c = b * 2;
+ *     System.out.println(b);
+ *
+ * myFun_whileBody0_0_ifBody1(int a) ->
+ *     b = a * 3;
+ *     System.out.println(b);
+ */
+@Internal
+public class BlockStatementSplitter {
+
+    private final String code;
+
+    private final String parameters;
+
+    private BlockStatementVisitor visitor;
+
+    /**
+     * Initialize new BlockStatementSplitter.
+     *
+     * @param code a code block that should be rewritten.
+     * @param parameters parameters definition that should be used for extracted methods.
+     */
+    public BlockStatementSplitter(String code, String parameters) {
+        this.code = code;
+        this.parameters = parameters;
+    }
+
+    /**
+     * This method extracts statements from IF, ELSE and WHILE blocks from the block code used during
+     * initialization of this object. Every entry of the returned map can be seen as a new method name (map
+     * key) and method's body. The block names will be prefixed with the provided context.
+     *
+     * @param context prefix for extracted blocks.
+     * @return a map of block name to block statements mappings. The key can be interpreted as the name
+     *     of the extracted block/method and the corresponding List represents individual statements (block's
+     *     lines) for this block.
+     */
+    public Map<String, List<String>> extractBlocks(String context) {
+
+        this.visitor = new BlockStatementVisitor(code, context, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+        return visitor.getAllBlocks();
+    }
+
+    /**
+     * Rewrite the code block that was used for this object's initialization.
+     *
+     * @return a map whose key represents the rewritten block name and whose value represents the rewritten code
+     *     block, including calls to extracted methods
+     */
+    public Map<String, String> rewriteBlock() {
+        visitor.rewrite();
+        Map<String, String> rewriteBlocks = new HashMap<>();
[jira] [Commented] (FLINK-30444) State recovery error not handled correctly and always causes JM failure
[ https://issues.apache.org/jira/browse/FLINK-30444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681133#comment-17681133 ] Gyula Fora commented on FLINK-30444:
---

Hi [~dmvk]! Did you make any progress on this? It would be good to hear what you think. I am also happy to help out with any dev/testing efforts.

> State recovery error not handled correctly and always causes JM failure
> ---
>
>                 Key: FLINK-30444
>                 URL: https://issues.apache.org/jira/browse/FLINK-30444
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.16.0, 1.14.6, 1.15.3
>            Reporter: Gyula Fora
>            Assignee: David Morávek
>            Priority: Critical
>
> When you submit a job in Application mode and you try to restore from an
> incompatible savepoint, there is a very unexpected behaviour.
> Even with the following config:
> {noformat}
> execution.shutdown-on-application-finish: false
> execution.submit-failed-job-on-application-error: true{noformat}
> The job goes into a FAILED state, and the jobmanager fails. In a kubernetes
> environment (when using the native kubernetes integration) this means that
> the JobManager is restarted automatically.
> This will mean that if you have the job result store enabled, after the JM comes
> back you will end up with an empty application cluster.
> I think the correct behaviour would be, depending on the above-mentioned config:
> 1. If there is a job recovery error and you have
> (execution.submit-failed-job-on-application-error) configured, then the job
> should show up as failed, and the JM should not exit (if
> execution.shutdown-on-application-finish is false)
> 2. If (execution.shutdown-on-application-finish is true) then the jobmanager
> should exit cleanly like on a normal job terminal state and thus stop the
> deployment in Kubernetes, preventing a JM restart cycle
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405650899

> Thanks a lot @gyfora!
>
> Regarding:
>
> > We just have to ensure that we handle the Zookeeper dependency as optional
>
> The dependency was actually already there! I didn't have to add anything.
>
> I looked into it and all Curator classes are a transitive dependency of the `flink-clients` package which is listed as a dependency in `pom.xml` [here](https://github.com/antonipp/flink-kubernetes-operator/blob/57b2832d97a4d52d7fcd814d50b94f4e745ce027/flink-kubernetes-operator/pom.xml#L75-L79).
>
> $ mvn dependency:tree -pl 'flink-kubernetes-operator'
> [...]
> [INFO] +- org.apache.flink:flink-clients:jar:1.15.3:compile
> [INFO] |  +- org.apache.flink:flink-runtime:jar:1.15.3:compile
> [INFO] |  |  +- org.apache.flink:flink-rpc-core:jar:1.15.3:compile
> [INFO] |  |  +- [...]
> [INFO] |  |  +- org.apache.flink:flink-shaded-zookeeper-3:jar:3.5.9-15.0:compile

Hm, that's convenient :)
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on code in PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088339662

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:

@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
 LOG.info(
 "Deploying application cluster{}",
 requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+// If Kubernetes or Zookeeper HA are activated, delete the job graph in HA storage so that
+// the newly changed job config (e.g. parallelism) could take effect
 if (FlinkUtils.isKubernetesHAActivated(conf)) {
 final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
 final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-// Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-// parallelism) could take effect
 FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+} else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+try (var curator = getCurator(conf)) {
+ZooKeeperUtils.deleteZNode(

Review Comment: The problem with HA storage access is both dependencies and credentials. Of the two, credentials are the more problematic: it would open a big attack surface if the operator somehow held users' storage credentials.
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on code in PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088338668

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:

@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});

Review Comment:
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on code in PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088338472

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:

@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
 LOG.info(
 "Deploying application cluster{}",
 requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+// If Kubernetes or Zookeeper HA are activated, delete the job graph in HA storage so that
+// the newly changed job config (e.g. parallelism) could take effect
 if (FlinkUtils.isKubernetesHAActivated(conf)) {
 final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
 final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-// Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-// parallelism) could take effect
 FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+} else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+try (var curator = getCurator(conf)) {

Review Comment: I think it's fine to create the client on demand
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on code in PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088337843

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:

@@ -340,6 +341,11 @@ protected Configuration build() {
 // Set cluster config
 effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
 effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+
+if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) {
+effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);

Review Comment: In that case I would suggest using the namespace + name combination itself to get a unique resource identifier. That is basically the same logic as for Kubernetes HA.
[GitHub] [flink] MartijnVisser commented on pull request #20049: [FLINK-28046][connectors] Mark SourceFunction interface as @Deprecated
MartijnVisser commented on PR #20049: URL: https://github.com/apache/flink/pull/20049#issuecomment-1405608732 @afedulov Do you think you can still make it before the feature freeze? (That's the 31st of January).
[jira] [Closed] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2
[ https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30797. -- Resolution: Fixed Fixed in master: eb8a33fb0cb7a07443df835a3142a1a139206ed6 > Bump json5 from 1.0.1 to 1.0.2 > -- > > Key: FLINK-30797 > URL: https://issues.apache.org/jira/browse/FLINK-30797 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Dependabot has created https://github.com/apache/flink/pull/21617 > This is the corresponding Jira ticket
[jira] [Updated] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2
[ https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30797: --- Labels: pull-request-available (was: ) > Bump json5 from 1.0.1 to 1.0.2 > -- > > Key: FLINK-30797 > URL: https://issues.apache.org/jira/browse/FLINK-30797 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Dependabot has created https://github.com/apache/flink/pull/21617 > This is the corresponding Jira ticket
[GitHub] [flink] MartijnVisser merged pull request #21617: [FLINK-30797] Bump json5 from 1.0.1 to 1.0.2 in /flink-runtime-web/web-dashboard
MartijnVisser merged PR #21617: URL: https://github.com/apache/flink/pull/21617
[jira] [Updated] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2
[ https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30797: --- Summary: Bump json5 from 1.0.1 to 1.0.2 (was: Bump json5 from 1.0.1 to 1.0.2 in) > Bump json5 from 1.0.1 to 1.0.2 > -- > > Key: FLINK-30797 > URL: https://issues.apache.org/jira/browse/FLINK-30797 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Fix For: 1.17.0 > > > Dependabot has created https://github.com/apache/flink/pull/21617 > This is the corresponding Jira ticket
[jira] [Created] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2 in
Martijn Visser created FLINK-30797: -- Summary: Bump json5 from 1.0.1 to 1.0.2 in Key: FLINK-30797 URL: https://issues.apache.org/jira/browse/FLINK-30797 Project: Flink Issue Type: Technical Debt Components: Runtime / Web Frontend Reporter: Martijn Visser Assignee: Martijn Visser Fix For: 1.17.0 Dependabot has created https://github.com/apache/flink/pull/21617 This is the corresponding Jira ticket
[GitHub] [flink] MartijnVisser commented on pull request #21671: [BK-1.16][FLINK-30657] Remove Shared and Key_Shared related tests in Pulsar connector
MartijnVisser commented on PR #21671: URL: https://github.com/apache/flink/pull/21671#issuecomment-1405601473 @flinkbot run azure
[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods
kristoffSC commented on PR #21393: URL: https://github.com/apache/flink/pull/21393#issuecomment-1405572025 @flinkbot run azure
[jira] [Comment Edited] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
[ https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681102#comment-17681102 ] Piotr Nowojski edited comment on FLINK-30623 at 1/26/23 8:00 PM: - It seems like not everything has been fixed? [http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED=2] Does anyone has some idea why? was (Author: pnowojski): It seems like everything has been fixed? [http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED=2] Does anyone has some idea why? > Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023 > --- > > Key: FLINK-30623 > URL: https://issues.apache.org/jira/browse/FLINK-30623 > Project: Flink > Issue Type: Bug > Components: Benchmarks, Runtime / Checkpointing >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available > Fix For: 1.17.0 > > > Performance regression > checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005 > checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277 > deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395 > stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919 > tupleKeyBy median=4155.684199 recent_median=3987.5812305 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED_1=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=8=deployAllTasks.STREAMING=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=stateBackends.MEMORY=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=tupleKeyBy=on=on=off=2=200
[jira] [Reopened] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
[ https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reopened FLINK-30623: Assignee: (was: Rui Fan) > Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023 > --- > > Key: FLINK-30623 > URL: https://issues.apache.org/jira/browse/FLINK-30623 > Project: Flink > Issue Type: Bug > Components: Benchmarks, Runtime / Checkpointing >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available > Fix For: 1.17.0 > > > Performance regression > checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005 > checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277 > deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395 > stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919 > tupleKeyBy median=4155.684199 recent_median=3987.5812305 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED_1=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=8=deployAllTasks.STREAMING=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=stateBackends.MEMORY=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=tupleKeyBy=on=on=off=2=200
[jira] [Commented] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
[ https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681102#comment-17681102 ] Piotr Nowojski commented on FLINK-30623: It seems like everything has been fixed? [http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED=2] Does anyone has some idea why? > Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023 > --- > > Key: FLINK-30623 > URL: https://issues.apache.org/jira/browse/FLINK-30623 > Project: Flink > Issue Type: Bug > Components: Benchmarks, Runtime / Checkpointing >Reporter: Martijn Visser >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Fix For: 1.17.0 > > > Performance regression > checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005 > checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277 > deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395 > stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919 > tupleKeyBy median=4155.684199 recent_median=3987.5812305 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED_1=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=8=deployAllTasks.STREAMING=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=stateBackends.MEMORY=on=on=off=2=200 > http://codespeed.dak8s.net:8000/timeline/#/?exe=6=tupleKeyBy=on=on=off=2=200
[GitHub] [flink] flinkbot commented on pull request #21764: [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.
flinkbot commented on PR #21764: URL: https://github.com/apache/flink/pull/21764#issuecomment-1405322143 ## CI report: * 09865792df8a4901f8429b063e09f5ee1b4a75e9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-26082) CancelPartitionRequestTest.testDuplicateCancel failed on azure due to bind failed
[ https://issues.apache.org/jira/browse/FLINK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26082: --- Labels: auto-deprioritized-major pull-request-available test-stability (was: auto-deprioritized-major test-stability) > CancelPartitionRequestTest.testDuplicateCancel failed on azure due to bind > failed > - > > Key: FLINK-26082 > URL: https://issues.apache.org/jira/browse/FLINK-26082 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.13.5, 1.16.0 >Reporter: Yun Gao >Assignee: Anton Kalashnikov >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available, > test-stability > > {code:java} > Feb 10 01:56:01 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 2.273 s <<< FAILURE! - in > org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest > Feb 10 01:56:01 [ERROR] > testDuplicateCancel(org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest) > Time elapsed: 1.877 s <<< ERROR! > Feb 10 01:56:01 > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: > bind(..) failed: Address already in use > Feb 10 01:56:01 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31070=logs=f0ac5c25-1168-55a5-07ff-0e88223afed9=0dbaca5d-7c38-52e6-f4fe-2fb69ccb3ada=6768
[GitHub] [flink] akalash opened a new pull request, #21764: [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.
akalash opened a new pull request, #21764: URL: https://github.com/apache/flink/pull/21764

## What is the purpose of the change

This fix initializes the test netty server in a loop in order to decrease the probability of the `Address already in use` problem.

## Brief change log

- *Initializing the test netty server and client in a loop to reduce the probability of the `Address already in use` problem.*

## Verifying this change

It is a test itself.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
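The actual change targets Flink's netty test harness, but the retry-in-a-loop idea is generic. A hedged sketch with plain JDK sockets (class name and port numbers are invented for illustration; this is not the PR's code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Sketch: retry binding over a range of candidate ports so that a single
// "Address already in use" collision does not fail the test run.
class RetryingBind {

    /** Tries ports firstPort, firstPort+1, ... until one binds or attempts run out. */
    static ServerSocket bindWithRetry(int firstPort, int attempts) throws IOException {
        IOException lastFailure = null;
        for (int i = 0; i < attempts; i++) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress("localhost", firstPort + i));
                return socket; // bound successfully
            } catch (IOException e) {
                socket.close();
                lastFailure = e; // candidate port taken; try the next one
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws IOException {
        // Occupy one port, then show the helper skipping past the collision.
        try (ServerSocket occupied = bindWithRetry(44000, 50);
             ServerSocket next = bindWithRetry(occupied.getLocalPort(), 50)) {
            System.out.println(next.getLocalPort() != occupied.getLocalPort());
        }
    }
}
```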
[GitHub] [flink-connector-kafka] mas-chen commented on pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo
mas-chen commented on PR #1: URL: https://github.com/apache/flink-connector-kafka/pull/1#issuecomment-1405303999 Thanks for your help @zentol @MartijnVisser!
[GitHub] [flink] flinkbot commented on pull request #21763: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.
flinkbot commented on PR #21763: URL: https://github.com/apache/flink/pull/21763#issuecomment-1405300184 ## CI report: * b29a8ee33deeca92104a8375be8b98a4bce7b524 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-28596) Support writing arrays to postgres array columns in Flink SQL JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-28596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681036#comment-17681036 ] Martijn Visser commented on FLINK-28596: [~bobbyrlg] If you have a PR, looking forward to it :) > Support writing arrays to postgres array columns in Flink SQL JDBC connector > > > Key: FLINK-28596 > URL: https://issues.apache.org/jira/browse/FLINK-28596 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.15.0 >Reporter: Bobby Richard >Priority: Major >
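For context on what the ticket requests: with plain JDBC, PostgreSQL array columns are written through `java.sql.Array` via `Connection.createArrayOf` and `PreparedStatement.setArray`. A hedged sketch only — not the connector's implementation; the table and column names are invented, and the literal-building helper is just a fallback illustration of PostgreSQL's `{"a","b"}` array syntax:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

// Sketch: writing a Java String[] into a PostgreSQL text[] column.
class PgArrayWriteSketch {

    // Escapes one element for use inside a PostgreSQL array literal like {"a","b"}.
    static String quoteElement(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }

    // Fallback literal form, useful when a driver-level java.sql.Array is unavailable.
    static String toArrayLiteral(String[] values) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(quoteElement(values[i]));
        }
        return sb.append('}').toString();
    }

    // The standard JDBC path: hand the driver a java.sql.Array.
    static void insertTags(Connection conn, long id, String[] tags) throws Exception {
        try (PreparedStatement ps =
                conn.prepareStatement("INSERT INTO articles (id, tags) VALUES (?, ?)")) {
            ps.setLong(1, id);
            ps.setArray(2, conn.createArrayOf("text", tags)); // "text" is the element type
            ps.executeUpdate();
        }
    }
}
```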
[GitHub] [flink] akalash opened a new pull request, #21763: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.
akalash opened a new pull request, #21763: URL: https://github.com/apache/flink/pull/21763

## What is the purpose of the change

This change fixes a bug in a test where the latch for the current test could be triggered by a remaining thread from the previous test.

## Brief change log

- *The static latches in TaskTest were replaced by latches from invokable objects in order to avoid accidentally sharing latches between multiple tests.*

## Verifying this change

It is a test.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
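The failure mode behind this fix can be reproduced in miniature: a `static` latch is shared by every test instance, so a straggler thread from a previous test can count down the latch a later test is waiting on. A hedged sketch of the before/after pattern (not the actual TaskTest code; all names are invented):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal illustration of why per-instance latches are safer than static ones.
class LatchScopeSketch {

    // Problematic pattern: every test (and every leftover thread from an
    // earlier test) shares this single latch, so counts can leak across tests.
    static final CountDownLatch SHARED_LATCH = new CountDownLatch(1);

    // Fixed pattern: each invokable owns its latch; a countDown() on one
    // instance cannot release a latch another test is waiting on.
    static final class Invokable {
        final CountDownLatch latch = new CountDownLatch(1);

        void finish() {
            latch.countDown();
        }

        boolean awaitFinished(long millis) throws InterruptedException {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        Invokable firstTest = new Invokable();
        Invokable secondTest = new Invokable();

        // A thread left over from the "first test" completes late...
        Thread straggler = new Thread(firstTest::finish);
        straggler.start();
        straggler.join();

        // ...but it has no way to release the second test's latch.
        System.out.println(firstTest.awaitFinished(100));  // true
        System.out.println(secondTest.awaitFinished(100)); // false: unaffected
    }
}
```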
[jira] [Assigned] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Kalashnikov reassigned FLINK-15550: - Assignee: Anton Kalashnikov > testCancelTaskExceptionAfterTaskMarkedFailed failed on azure > > > Key: FLINK-15550 > URL: https://issues.apache.org/jira/browse/FLINK-15550 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0 >Reporter: Yun Tang >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available, stale-assigned > > Instance: > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug > {code:java} > java.lang.AssertionError: expected: but was: > at > org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525) > {code} > {code:java} > expected: but was: > {code}
[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods
kristoffSC commented on PR #21393: URL: https://github.com/apache/flink/pull/21393#issuecomment-1405227888 Hi @tsreaper, I'm currently working on the refactoring to include your suggestions; it does look promising. I should have a new version by the end of this week. I have a question though. Can we assume that variables/fields used in IF and WHILE conditions will always be class member variables? I'm looking at `DeclarationRewriter` and I wonder whether we make that assumption, or whether IF/WHILE condition arguments can come from method parameters. If they can come from method parameters and if they are primitive values, then we should not rewrite such a method. I don't think the original IfStatementRewriter dealt with that, but maybe I don't see it. What do you think?
[jira] [Closed] (FLINK-30619) AdaptiveSchedulerTest.testStatusMetrics is not stable
[ https://issues.apache.org/jira/browse/FLINK-30619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-30619. Fix Version/s: 1.17.0 1.16.2 Resolution: Fixed master: 38d390114e93e9f202dc5ac7fa668dd255637139 1.16: 0f52f62b1891230308282e0b819062eb13eb5644 > AdaptiveSchedulerTest.testStatusMetrics is not stable > - > > Key: FLINK-30619 > URL: https://issues.apache.org/jira/browse/FLINK-30619 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.16.2 > > > We experience a test instability in > {{AdaptiveSchedulerTest.testStatusMetrics}}. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44635=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8475
[jira] [Updated] (FLINK-30619) AdaptiveSchedulerTest.testStatusMetrics is not stable
[ https://issues.apache.org/jira/browse/FLINK-30619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-30619: - Issue Type: Technical Debt (was: Bug) > AdaptiveSchedulerTest.testStatusMetrics is not stable > - > > Key: FLINK-30619 > URL: https://issues.apache.org/jira/browse/FLINK-30619 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.16.2 > > > We experience a test instability in > {{AdaptiveSchedulerTest.testStatusMetrics}}. > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44635=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8475
[GitHub] [flink] zentol merged pull request #21751: [FLINK-30619][tests] Add retry
zentol merged PR #21751: URL: https://github.com/apache/flink/pull/21751
[jira] [Commented] (FLINK-30052) Move existing Kafka connector code from Flink repo to dedicated Kafka repo
[ https://issues.apache.org/jira/browse/FLINK-30052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681024#comment-17681024 ] Chesnay Schepler commented on FLINK-30052: -- kafka-main: 979bdcfafabae392fa6978fb107c4228d68776b7 2fb6ad544c374c76c0e49c1ef11affbd82fe63d0 > Move existing Kafka connector code from Flink repo to dedicated Kafka repo > -- > > Key: FLINK-30052 > URL: https://issues.apache.org/jira/browse/FLINK-30052 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Mason Chen >Priority: Major > Labels: pull-request-available > > Instructions guide can be found at > https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
[GitHub] [flink-connector-kafka] zentol merged pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo
zentol merged PR #1: URL: https://github.com/apache/flink-connector-kafka/pull/1
[GitHub] [flink-connector-kafka] boring-cyborg[bot] commented on pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo
boring-cyborg[bot] commented on PR #1: URL: https://github.com/apache/flink-connector-kafka/pull/1#issuecomment-1405200120 Awesome work, congrats on your first merged pull request!
[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #513: [FLINK-30781] Add completed checkpoint check to cluster health check
gaborgsomogyi commented on code in PR #513: URL: https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1087995977 ## docs/layouts/shortcodes/generated/dynamic_section.html: ## @@ -8,6 +8,18 @@ + + kubernetes.operator.cluster.health-check.completed-checkpoints.enabled +false +Boolean +Whether to enable completed checkpoint health check for clusters. + + + kubernetes.operator.cluster.health-check.completed-checkpoints.window Review Comment: Changed + fixed other issues in the doc.
[GitHub] [flink-kubernetes-operator] antonipp commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support
antonipp commented on PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405163365 Thanks a lot @gyfora! Regarding: > We just have to ensure that we handle the Zookeeper dependency as optional The dependency was actually already there! I didn't have to add anything. I looked into it and all Curator classes are coming from the `flink-clients` package which is listed as a dependency in `pom.xml` [here](https://github.com/antonipp/flink-kubernetes-operator/blob/57b2832d97a4d52d7fcd814d50b94f4e745ce027/flink-kubernetes-operator/pom.xml#L75-L79).
$ mvn dependency:tree -pl 'flink-kubernetes-operator'
[...]
[INFO] +- org.apache.flink:flink-clients:jar:1.15.3:compile
[INFO] |  +- org.apache.flink:flink-runtime:jar:1.15.3:compile
[INFO] |  |  +- org.apache.flink:flink-rpc-core:jar:1.15.3:compile
[INFO] |  |  +- [...]
[INFO] |  |  +- org.apache.flink:flink-shaded-zookeeper-3:jar:3.5.9-15.0:compile
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087957846 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator +implements SplitEnumerator { +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); +private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + +private final SplitEnumeratorContext enumeratorContext; +private final CassandraEnumeratorState state; +private final Cluster cluster; + +public CassandraSplitEnumerator( +SplitEnumeratorContext enumeratorContext, +CassandraEnumeratorState state, +ClusterBuilder clusterBuilder) { +this.enumeratorContext = enumeratorContext; +this.state = state == null ? 
new CassandraEnumeratorState() : state /* snapshot restore*/; +this.cluster = clusterBuilder.getCluster(); +} + +@Override +public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { +assignUnprocessedSplitsToReader(subtaskId); +} + +@Override +public void start() { +// discover the splits and update unprocessed splits and then assign them. +// There is only an initial splits discovery, no periodic discovery. +enumeratorContext.callAsync( +this::discoverSplits, +(splits, throwable) -> { +LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); +state.addNewSplits(splits, enumeratorContext.currentParallelism()); +}); +} + +private List discoverSplits() { +final int numberOfSplits = enumeratorContext.currentParallelism(); +final Metadata clusterMetadata = cluster.getMetadata(); +final String partitioner = clusterMetadata.getPartitioner(); +final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); +if (MURMUR3PARTITIONER.equals(partitioner)) { +LOG.info("Murmur3Partitioner detected, splitting"); +List tokens = +clusterMetadata.getTokenRanges().stream() +.map( +tokenRange -> +new BigInteger( + tokenRange.getEnd().getValue().toString())) +.collect(Collectors.toList()); +return splitsGenerator.generateSplits(numberOfSplits, tokens); +} else { +// Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ +// see +// https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html +LOG.warn( +"The current Cassandra partitioner is {}, only
[jira] [Commented] (FLINK-27273) Support both configMap and zookeeper based HA data clean up
[ https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681010#comment-17681010 ] Anton Ippolitov commented on FLINK-27273: - Hi, FYI I just opened a draft PR for this issue ([https://github.com/apache/flink-kubernetes-operator/pull/514]) and left some comments about the parts of code I wasn't 100% sure about. Thanks a lot [~gyfora] for a super quick review as well > Support both configMap and zookeeper based HA data clean up > --- > > Key: FLINK-27273 > URL: https://issues.apache.org/jira/browse/FLINK-27273 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Aitozi >Assignee: Anton Ippolitov >Priority: Major > Labels: pull-request-available > > As discussed in > [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041] > We only support clean up the ha data based configMap. Considering that > zookeeper is still widely used as ha service when deploy on the kubernetes, I > think we should still take it into account, otherwise, It will come up with > some unexpected behavior when play with zk ha jobs. For example, it will > recover old JobGraph when redeploy the application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087950644 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ## @@ -0,0 +1,98 @@ +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { Review Comment: class removed
[GitHub] [flink-kubernetes-operator] antonipp commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support
antonipp commented on code in PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1087919678 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() { return kubernetesClient; } +public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) { Review Comment: Wrapped ZK client creation in a separate method so that it's easier to mock once I get to unit tests ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java: ## @@ -340,6 +341,11 @@ protected Configuration build() { // Set cluster config effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace); effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId); + +if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) { +effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); Review Comment: By default, `clusterId` is generated [based on the FlinkDeployment resource name](https://github.com/antonipp/flink-kubernetes-operator/blob/6b0d91fe2c1dc41eeba655337c1d99fedabe3ef8/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L96). The value of `HighAvailabilityOptions.HA_CLUSTER_ID` is later [used](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L226) as the namespace for storing Zookeeper ZNodes for each Flink application. This means that if someone creates two `FlinkDeployment`s with the same name in two different Kubernetes namespaces and these two `FlinkDeployment`s use the same ZK cluster for HA metadata storage, there might be a collision. 
So I think we should append a random UUID to `HighAvailabilityOptions.HA_CLUSTER_ID` : ```java effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId + "-RANDOM-UUID-HERE"); ``` Not 100% sure of all the implications of doing that but I think it should be safe enough? ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() { return kubernetesClient; } +public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) { +return ZooKeeperUtils.startCuratorFramework(conf, exception -> {}); Review Comment: Note that by leveraging the [ZooKeeperUtils::startCuratorFramework](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L200) function and by taking the application `conf`, we create the client in exactly the same way as the Flink application itself. This means that we should be able to support all ZK configuration settings that are supported by Flink applications themselves (ex: ACLs). ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() { return kubernetesClient; } +public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) { +return ZooKeeperUtils.startCuratorFramework(conf, exception -> {}); +} + @Override public void submitApplicationCluster( JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception { LOG.info( "Deploying application cluster{}", requireHaMetadata ? " requiring last-state from HA metadata" : ""); + +// If Kubernetes or Zookeeper HA are activated, delete the job graph in HA storage so that +// the newly changed job config (e.g. 
parallelism) could take effect if (FlinkUtils.isKubernetesHAActivated(conf)) { final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); -// Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g. -// parallelism) could take effect FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient); +} else if (FlinkUtils.isZookeeperHAActivated(conf)) { +try (var curator = getCurator(conf)) { Review Comment: Here, the ZK client is created only for one request and then torn down (in `FlinkUtils ::isZookeeperHaMetadataAvailable` it's the same thing). We could somehow cache these clients but I think it's not worth it. First of
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087944134 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); +public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + +private final Cluster cluster; +private final Session session; +private final Set unprocessedSplits; +private final AtomicBoolean wakeup = new AtomicBoolean(false); +private final String query; + +public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { +// need a thread safe set +this.unprocessedSplits = ConcurrentHashMap.newKeySet(); +this.query = query; +cluster = clusterBuilder.getCluster(); +session = cluster.connect(); +} + +@Override +public RecordsWithSplitIds fetch() { +Map> recordsBySplit = new HashMap<>(); +Set finishedSplits = new HashSet<>(); +Metadata clusterMetadata = cluster.getMetadata(); + +String partitionKey = getPartitionKey(clusterMetadata); +String finalQuery = generateRangeQuery(query, partitionKey); +PreparedStatement preparedStatement = session.prepare(finalQuery); +// Set wakeup to false to start consuming. +wakeup.compareAndSet(true, false); +for (CassandraSplitState cassandraSplitState : unprocessedSplits) { +// allow to interrupt the reading of splits as requested in the API +if (wakeup.get()) { +break; +} +if (!cassandraSplitState.isEmpty()) { +try { +final Set ringRanges = +cassandraSplitState.getUnprocessedRingRanges(); +final String cassandraSplitId = cassandraSplitState.getSplitId(); + +for (RingRange ringRange : ringRanges) { +Token startToken = + clusterMetadata.newToken(ringRange.getStart().toString()); +
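As an aside for readers following the quoted reader code: the `SELECT_REGEXP` pattern above captures the keyspace (group 1) and the table (group 2) of the user-specified query. A minimal sketch showing the pattern in action; the regex string is taken verbatim from the quoted code, while the harness and the sample query are illustrative:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueryParseSketch {
    // Pattern quoted from CassandraSplitReader in the PR
    static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$";

    public static void main(String[] args) {
        Matcher m = Pattern.compile(SELECT_REGEXP)
                .matcher("SELECT field1, field2 FROM keyspace1.table1 WHERE field3 = 5;");
        if (m.matches()) {
            // group(1) = keyspace, group(2) = table
            System.out.println(m.group(1) + "." + m.group(2));
        }
    }
}
```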
[GitHub] [flink-connector-cassandra] echauchot commented on pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1405116976 @zentol I'm almost done replacing the whole split mechanism. I could even remove the size estimates, as Flink does not rely on expected split memory size the way Beam does. So I could use a very simple splitting mechanism based only on the Cassandra partitioner's min and max tokens: for example (min, x], (x, max] for 2 splits. I could also consider that a Cassandra split processes only a single ring range and get rid of the RingRange class. It won't change much. I also need to do a final pass before it is ready for another round, otherwise you'll see sketchy things. Don't waste your time reviewing now, I'll ping you to take a look.
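The even min/max-token splitting described in this comment takes only a few lines of BigInteger arithmetic. A hypothetical sketch, not the PR's actual SplitsGenerator; the only external facts assumed are the standard Murmur3Partitioner token bounds of -2^63 and 2^63-1:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: evenly divide the Murmur3 token ring into n contiguous (start, end] ranges. */
public class TokenRingSplitter {
    // Murmur3Partitioner token domain: [-2^63, 2^63 - 1]
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Returns n {start, end} pairs covering (MIN, MAX] without gaps or overlaps. */
    static List<BigInteger[]> split(int n) {
        BigInteger step = MAX.subtract(MIN).divide(BigInteger.valueOf(n));
        List<BigInteger[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 0; i < n; i++) {
            // the last range absorbs the division remainder so the ring is fully covered
            BigInteger end = (i == n - 1) ? MAX : start.add(step);
            ranges.add(new BigInteger[] {start, end});
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<BigInteger[]> ranges = split(2);
        // for 2 splits: (MIN, x] and (x, MAX]
        System.out.println(ranges.size());
        System.out.println(ranges.get(0)[1].equals(ranges.get(1)[0]));
        System.out.println(ranges.get(1)[1].equals(MAX));
    }
}
```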
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087939852 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator +implements SplitEnumerator { +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); +private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + +private final SplitEnumeratorContext enumeratorContext; +private final CassandraEnumeratorState state; +private final Cluster cluster; + +public CassandraSplitEnumerator( +SplitEnumeratorContext enumeratorContext, +CassandraEnumeratorState state, +ClusterBuilder clusterBuilder) { +this.enumeratorContext = enumeratorContext; +this.state = state == null ? 
new CassandraEnumeratorState() : state /* snapshot restore*/; +this.cluster = clusterBuilder.getCluster(); +} + +@Override +public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { +assignUnprocessedSplitsToReader(subtaskId); +} + +@Override +public void start() { +// discover the splits and update unprocessed splits and then assign them. +// There is only an initial splits discovery, no periodic discovery. +enumeratorContext.callAsync( +this::discoverSplits, +(splits, throwable) -> { +LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); +state.addNewSplits(splits, enumeratorContext.currentParallelism()); +}); +} + +private List discoverSplits() { +final int numberOfSplits = enumeratorContext.currentParallelism(); +final Metadata clusterMetadata = cluster.getMetadata(); +final String partitioner = clusterMetadata.getPartitioner(); +final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); +if (MURMUR3PARTITIONER.equals(partitioner)) { +LOG.info("Murmur3Partitioner detected, splitting"); +List tokens = +clusterMetadata.getTokenRanges().stream() +.map( +tokenRange -> +new BigInteger( + tokenRange.getEnd().getValue().toString())) +.collect(Collectors.toList()); +return splitsGenerator.generateSplits(numberOfSplits, tokens); +} else { +// Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ +// see +// https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html +LOG.warn( +"The current Cassandra partitioner is {}, only
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); +public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + +private final Cluster cluster; +private final Session session; +private final Set unprocessedSplits; +private final AtomicBoolean wakeup = new AtomicBoolean(false); +private final String query; + +public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { +// need a thread safe set Review Comment: I've pushed an impl that assigns only a single split to each reader and this removed the need for concurrent set
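The quoted CassandraSplitReader relies on an AtomicBoolean flag that fetch() checks between splits, so that a wakeUp() call from another thread can interrupt reading, as the reviewers discuss above. A self-contained sketch of that pattern; the harness, names, and string "splits" are illustrative, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of the wakeup pattern used in the quoted CassandraSplitReader.fetch(). */
public class WakeupSketch {
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    /** Processes splits until done, or until wakeup is requested. */
    List<String> fetch(List<String> splits) {
        // reset the flag so a previous wakeup does not cancel this fetch
        wakeup.compareAndSet(true, false);
        List<String> processed = new ArrayList<>();
        for (String split : splits) {
            if (wakeup.get()) {
                break; // stop between splits, as the SplitReader contract expects
            }
            processed.add(split);
            if (processed.size() == 2) {
                wakeUp(); // simulate a concurrent wakeup after two splits
            }
        }
        return processed;
    }

    void wakeUp() {
        wakeup.set(true);
    }

    public static void main(String[] args) {
        WakeupSketch reader = new WakeupSketch();
        // the third and fourth splits are skipped because wakeup fires after two
        System.out.println(reader.fetch(List.of("s0", "s1", "s2", "s3")));
    }
}
```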
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087928810 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java: ## @@ -0,0 +1,98 @@ +package org.apache.flink.connector.cassandra.source.split; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import java.math.BigInteger; + +/** + * Represents a portion of Cassandra token ring. It is a range between a start token and an end + * token. + */ +public final class RingRange implements Serializable { Review Comment: No longer needs to be serializable; also applies to other classes.
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support
gyfora commented on PR #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405091162 I think from an initial review this looks pretty good. We just have to ensure that we handle the Zookeeper dependency as optional, as this is the less frequent integration :)
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087926802 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator; +import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReader; +import org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bounded source to read from Cassandra and return a collection of entities as {@code + * DataStream}. 
An entity is built by Cassandra mapper ({@code + * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described + * in https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/;> + * Cassandra object mapper). + * + * To use it, do the following: + * + * {@code + * ClusterBuilder clusterBuilder = new ClusterBuilder() { + * @Override + * protected Cluster buildCluster(Cluster.Builder builder) { + * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT)) + * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL)) + * .withSocketOptions(new SocketOptions() + * .setConnectTimeoutMillis(CONNECT_TIMEOUT) + * .setReadTimeoutMillis(READ_TIMEOUT)) + * .build(); + * } + * }; + * Source cassandraSource = new CassandraSource(clusterBuilder, + * Pojo.class, + * "select ... from KEYSPACE.TABLE ...;", + * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + * + * DataStream stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), + * "CassandraSource"); + * } + */ +@PublicEvolving +public class CassandraSource +implements Source, ResultTypeQueryable { + +public static final String CQL_PROHIBITTED_CLAUSES_REGEXP = +"(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"; +private static final long serialVersionUID = 7773196541275567433L; + +private final ClusterBuilder clusterBuilder; +private final Class pojoClass; +private final String query; +private final MapperOptions mapperOptions; + +public CassandraSource( +ClusterBuilder clusterBuilder, +Class pojoClass, +String query, +MapperOptions mapperOptions) { +checkNotNull(clusterBuilder, "ClusterBuilder required but
[jira] [Assigned] (FLINK-27273) Support both configMap and zookeeper based HA data clean up
[ https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-27273: -- Assignee: Anton Ippolitov > Support both configMap and zookeeper based HA data clean up > --- > > Key: FLINK-27273 > URL: https://issues.apache.org/jira/browse/FLINK-27273 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Aitozi >Assignee: Anton Ippolitov >Priority: Major > Labels: pull-request-available > > As discussed in > [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041] > we currently only support cleaning up HA data stored in ConfigMaps. Considering that > ZooKeeper is still widely used as the HA service when deploying on Kubernetes, we > should take it into account as well; otherwise, unexpected behavior can occur with > ZooKeeper HA jobs. For example, the old JobGraph will be recovered when the > application is redeployed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27273) Support both configMap and zookeeper based HA data clean up
[ https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27273: --- Labels: pull-request-available (was: ) > Support both configMap and zookeeper based HA data clean up > --- > > Key: FLINK-27273 > URL: https://issues.apache.org/jira/browse/FLINK-27273 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Aitozi >Priority: Major > Labels: pull-request-available > > As discussed in > [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041] > we currently only support cleaning up HA data stored in ConfigMaps. Considering that > ZooKeeper is still widely used as the HA service when deploying on Kubernetes, we > should take it into account as well; otherwise, unexpected behavior can occur with > ZooKeeper HA jobs. For example, the old JobGraph will be recovered when the > application is redeployed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] antonipp opened a new pull request, #514: [WIP][FLINK-27273] Zookeeper HA support
antonipp opened a new pull request, #514: URL: https://github.com/apache/flink-kubernetes-operator/pull/514 > ⚠️ This PR is WIP, my goal is to get feedback as early as possible ## What is the purpose of the change https://issues.apache.org/jira/browse/FLINK-27273 This PR makes sure that Flink applications which use Zookeeper-based HA can be deployed via the Kubernetes Operator. It wasn't previously possible mainly due to the lack of proper clean-up of Zookeeper data which could cause unexpected behaviour in Flink applications, as mentioned in the ticket. ## Brief change log - JobGraph-related data is now properly cleaned in Zookeeper (same behaviour as when using Kubernetes HA) - Code has been updated to support both Kubernetes and Zookeeper-based HA settings. All features which worked with Kubernetes HA (such as JM deployment recovery, unhealthy job restarts or rollbacks) should work with Zookeeper HA as well. ## Verifying this change - **TODO: add unit tests!** - Tried some scenarios in our Kubernetes environment: - `upgradeMode: savepoint`: - [x] Successfully deployed a Flink application with `upgradeMode: savepoint` and Zookeeper HA enabled - [x] Updated the `FlinkDeployment` object, verified that the JobGraph data was successfully deleted in Zookeeper and that the new version of the application was successfully rolled out - [x] Manually deleted the JobManager Kubernetes Deployment, verified that the Operator was able to recover it (with `kubernetes.operator.jm-deployment-recovery.enabled` and `kubernetes.operator.job.upgrade.last-state-fallback.enabled` set to `true`) and that the application restarted successfully (this process relies on HA metadata too) - `upgradeMode: last-state`: - [x] Successfully deployed a Flink application with `upgradeMode: last-state` and Zookeeper HA enabled - [x] Updated the `FlinkDeployment` object, verified that the JobGraph data was successfully deleted in Zookeeper and that the new version of the application was
successfully rolled out - [x] Manually deleted the JobManager Kubernetes Deployment, verified that the Operator was able to recover it with `kubernetes.operator.jm-deployment-recovery.enabled` set to `true` and that the application restarted successfully (this process relies on HA metadata too) - [x] Set `kubernetes.operator.cluster.health-check.enabled` to `true` and deployed a Flink application which was continuously failing and restarting. Once the restart threshold (`kubernetes.operator.cluster.health-check.restarts.threshold`) was hit, the Operator was able to successfully validate that the Zookeeper HA metadata exists and restarted the job - [x] Set `kubernetes.operator.deployment.rollback.enabled` to `true` and deployed a Flink application which was in `CrashLoopBackOff`. Once the `kubernetes.operator.deployment.readiness.timeout` passed, the Operator was able to successfully validate that the Zookeeper HA metadata exists and rolled back the job to the previous version. - **TODO: anything else?** ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature?: yes - If yes, how is the feature documented?: Updated the documentation where applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21762: [FLINK-30796][metrics][slf4j] (Partially) skip metrics report
flinkbot commented on PR #21762: URL: https://github.com/apache/flink/pull/21762#issuecomment-1405063430 ## CI report: * 476e8e6495fe8930c7bde3ad1834d79260a23580 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30796) Make Slf4jReporter less noisy when no/few metrics exist
[ https://issues.apache.org/jira/browse/FLINK-30796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30796: --- Labels: pull-request-available (was: ) > Make Slf4jReporter less noisy when no/few metrics exist > --- > > Key: FLINK-30796 > URL: https://issues.apache.org/jira/browse/FLINK-30796 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zentol opened a new pull request, #21762: [FLINK-30796][metrics][slf4j] (Partially) skip metrics report
zentol opened a new pull request, #21762: URL: https://github.com/apache/flink/pull/21762 The e2e logs are noisier than they need to be because the slf4j reporter logs a lot of lines even if no metrics (of a particular type) are registered. Full skip: Before: ``` === Starting metrics report === -- Counters --- -- Gauges - -- Meters - -- Histograms - === Finished metrics report === ``` After: ``` Skipping metrics report because no metrics are registered. ``` Partial skip: Before: ``` === Starting metrics report === -- Counters --- scope.simpleCounter: 0 -- Gauges - -- Meters - -- Histograms - === Finished metrics report === ``` After: ``` === Starting metrics report === -- Counters - scope.simpleCounter: 0 === Finished metrics report === ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
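The full-skip/partial-skip behaviour described in the PR can be sketched in plain Java. This is only an illustration of the logic — not the actual `Slf4jReporter` code — and it covers just two metric types with hypothetical names:

```java
import java.util.Map;

/** Illustrative sketch of the Slf4jReporter skip behaviour (not the actual Flink code). */
public class MetricsReportSketch {

    /** Builds the report text, omitting empty sections; collapses to one line when nothing is registered. */
    static String buildReport(Map<String, ? extends Number> counters, Map<String, ? extends Number> gauges) {
        if (counters.isEmpty() && gauges.isEmpty()) {
            // full skip: nothing registered at all
            return "Skipping metrics report because no metrics are registered.";
        }
        StringBuilder sb = new StringBuilder("=== Starting metrics report ===\n");
        appendSection(sb, "Counters", counters);
        appendSection(sb, "Gauges", gauges);
        return sb.append("=== Finished metrics report ===").toString();
    }

    private static void appendSection(
            StringBuilder sb, String name, Map<String, ? extends Number> metrics) {
        if (metrics.isEmpty()) {
            return; // partial skip: an empty section produces no output at all
        }
        sb.append("-- ").append(name).append(" -\n");
        metrics.forEach((metric, value) -> sb.append(metric).append(": ").append(value).append('\n'));
    }

    public static void main(String[] args) {
        System.out.println(buildReport(Map.of(), Map.of()));
        System.out.println(buildReport(Map.of("scope.simpleCounter", 0), Map.of()));
    }
}
```

The design point is simply to check emptiness before printing a section header, so registered metrics still render exactly as before.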
[GitHub] [flink] TJX2014 commented on pull request #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct
TJX2014 commented on PR #21761: URL: https://github.com/apache/flink/pull/21761#issuecomment-1405040674 Hi @AlanConfluent , could you please help me review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct
flinkbot commented on PR #21761: URL: https://github.com/apache/flink/pull/21761#issuecomment-1405027565 ## CI report: * 6f02da7bce96a3bbd67c4ee868273298fb72f73a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #513: [FLINK-30781] Add completed checkpoint check to cluster health check
gyfora commented on code in PR #513: URL: https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1087871390 ## docs/layouts/shortcodes/generated/dynamic_section.html: ## @@ -8,6 +8,18 @@ + + kubernetes.operator.cluster.health-check.completed-checkpoints.enabled +false +Boolean +Whether to enable completed checkpoint health check for clusters. + + + kubernetes.operator.cluster.health-check.completed-checkpoints.window Review Comment: maybe it would be better to call it `checkpoint-progress`, and make it more explicit in the description that we record unhealthy state if no checkpoints were taken at all in the check window -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
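The `checkpoint-progress` semantics gyfora proposes — record an unhealthy state if no checkpoint completed at all inside the check window — boil down to a simple trailing-window test. A minimal sketch, with hypothetical names rather than the operator's actual implementation:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Illustrative checkpoint-progress health check; names and structure are hypothetical. */
public class CheckpointProgressCheck {

    /** The cluster counts as healthy iff at least one checkpoint completed inside the trailing window. */
    static boolean isHealthy(List<Instant> completedCheckpoints, Instant now, Duration window) {
        Instant windowStart = now.minus(window);
        return completedCheckpoints.stream().anyMatch(t -> !t.isBefore(windowStart));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-01-26T12:00:00Z");
        Duration window = Duration.ofMinutes(30);
        // last checkpoint 10 minutes ago -> progress observed, healthy
        System.out.println(isHealthy(List.of(now.minus(Duration.ofMinutes(10))), now, window)); // true
        // last checkpoint 2 hours ago -> no progress in the window, unhealthy
        System.out.println(isHealthy(List.of(now.minus(Duration.ofHours(2))), now, window)); // false
    }
}
```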
[jira] [Updated] (FLINK-30795) StreamingWithStateTestBase of win not correct
[ https://issues.apache.org/jira/browse/FLINK-30795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30795: --- Labels: pull-request-available (was: ) > StreamingWithStateTestBase of win not correct > - > > Key: FLINK-30795 > URL: https://issues.apache.org/jira/browse/FLINK-30795 > Project: Flink > Issue Type: Bug > Components: Tests > Environment: Windows path such as > " > file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 > " > will throw > > new IllegalArgumentException("Cannot use the root directory for > checkpoints."); >Reporter: JinxinTang >Priority: Major > Labels: pull-request-available > > Windows path such as > " > file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 > " > will throw > > new IllegalArgumentException("Cannot use the root directory for > checkpoints."); -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] TJX2014 opened a new pull request, #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct
TJX2014 opened a new pull request, #21761: URL: https://github.com/apache/flink/pull/21761 ## What is the purpose of the change Fix test case broken in win os. ## Brief change log Change "file://" to LocalFileSystem.getLocalFsURI.toString. ## Verifying this change org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase case work correctly in win os. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
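The underlying parsing problem can be reproduced with plain `java.net.URI`, independently of Flink: in a two-slash `file://C:/...` URI, the drive letter is consumed as the URI authority, so the drive letter disappears from the path and the path effectively starts at the filesystem root. A three-slash URI (empty authority) keeps the full Windows path. A JDK-only sketch (not the PR's code; the concrete path is illustrative):

```java
import java.net.URI;

/** Demonstrates why a two-slash Windows file URI misparses: the drive letter becomes the authority. */
public class WindowsFileUriDemo {
    public static void main(String[] args) throws Exception {
        // Two slashes: "C:" is parsed as the URI authority, so the drive letter vanishes from the path.
        URI broken = new URI("file://C:/Users/xx/AppData/Local/Temp/junit123");
        System.out.println(broken.getAuthority()); // the drive letter lands here, not in the path
        System.out.println(broken.getPath());      // /Users/xx/AppData/Local/Temp/junit123

        // Three slashes (empty authority): the full Windows path survives in the path component.
        URI ok = new URI("file:///C:/Users/xx/AppData/Local/Temp/junit123");
        System.out.println(ok.getPath());          // /C:/Users/xx/AppData/Local/Temp/junit123
    }
}
```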
[jira] [Created] (FLINK-30796) Make Slf4jReporter less noisy when no/few metrics exist
Chesnay Schepler created FLINK-30796: Summary: Make Slf4jReporter less noisy when no/few metrics exist Key: FLINK-30796 URL: https://issues.apache.org/jira/browse/FLINK-30796 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.17.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #509: [FLINK-30653] Trigger resource Events on autoscaler actions
gyfora commented on code in PR #509: URL: https://github.com/apache/flink-kubernetes-operator/pull/509#discussion_r1087868130 ## examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java: ## @@ -18,23 +18,38 @@ package autoscaling; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; /** Autoscaling Example. */ public class AutoscalingExample { public static void main(String[] args) throws Exception { var env = StreamExecutionEnvironment.getExecutionEnvironment(); -DataStream stream = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE); +long numIterations = Long.parseLong(args[0]); +DataStream stream = +env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).filter(i -> System.nanoTime() > 1); stream = -stream.shuffle() +stream.keyBy(s -> "s") Review Comment: I think it may be better to keep to `shuffle()` here, I was just testing some skew scenarios and forgot this in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
1996fanrui commented on PR #21697: URL: https://github.com/apache/flink/pull/21697#issuecomment-1405010319 > Thanks for the improvement. Production code LGTM. Could you add a unit test for the `AbstractStreamTaskNetworkInput` or `StreamTaskNetworkInput` (in `StreamTaskNetworkInputTest.java`?), that will check if the records are batched and that batches can be interrupted? Thanks for your review, I have updated the unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
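The shape of the change in FLINK-30709 — pushing every immediately available record to the output within one `emitNext()` call, rather than returning after each record — can be sketched without Flink's classes. All names below are hypothetical; the real `AbstractStreamTaskNetworkInput` additionally allows the loop to be interrupted so checkpoint barriers and mailbox actions are still processed promptly:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Sketch of batched record emission inside a single emitNext() call (illustrative only). */
public class BatchedEmitSketch {

    enum Status { NOTHING_AVAILABLE }

    private final Queue<String> buffer = new ArrayDeque<>();

    void enqueue(String record) {
        buffer.add(record);
    }

    /** Drains all currently buffered records in a loop instead of emitting one record per call. */
    Status emitNext(Consumer<String> output) {
        String record;
        while ((record = buffer.poll()) != null) {
            output.accept(record);
        }
        return Status.NOTHING_AVAILABLE; // real code would register an availability listener here
    }

    public static void main(String[] args) {
        BatchedEmitSketch input = new BatchedEmitSketch();
        List.of("a", "b", "c").forEach(input::enqueue);
        input.emitNext(System.out::println); // emits a, b, c in one call
    }
}
```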
[GitHub] [flink] flinkbot commented on pull request #21760: fix scala code
flinkbot commented on PR #21760: URL: https://github.com/apache/flink/pull/21760#issuecomment-1405009024 ## CI report: * 03e4e171a576de743ae8ccec2975235327fb4622 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] novakov-alexey opened a new pull request, #21760: fix scala code
novakov-alexey opened a new pull request, #21760: URL: https://github.com/apache/flink/pull/21760 ## What is the purpose of the change documentation fix. ## Brief change log Add missing val keyword ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] dawidwys commented on pull request #603: Flink 1.16.1
dawidwys commented on PR #603: URL: https://github.com/apache/flink-web/pull/603#issuecomment-1405004500 @XComp > I'm not sure about the test instabilities. Do we include them? I remember some document where it stated that we should exclude them. But I couldn't find any documentation anymore. I believe we should exclude them, as the first paragraph of the blog post says > Below you will find a list of all bugfixes and improvements (excluding improvements to the build infrastructure and build stability). For a complete list of all changes see: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30795) StreamingWithStateTestBase of win not correct
[ https://issues.apache.org/jira/browse/FLINK-30795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JinxinTang updated FLINK-30795: --- Description: Windows path such as " file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 " will throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); > StreamingWithStateTestBase of win not correct > - > > Key: FLINK-30795 > URL: https://issues.apache.org/jira/browse/FLINK-30795 > Project: Flink > Issue Type: Bug > Components: Tests > Environment: Windows path such as > " > file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 > " > will throw > > new IllegalArgumentException("Cannot use the root directory for > checkpoints."); >Reporter: JinxinTang >Priority: Major > > Windows path such as > " > file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 > " > will throw > > new IllegalArgumentException("Cannot use the root directory for > checkpoints."); -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] dawidwys commented on a diff in pull request #603: Flink 1.16.1
dawidwys commented on code in PR #603: URL: https://github.com/apache/flink-web/pull/603#discussion_r1087851914 ## _posts/2023-01-26-release-1.16.1.md: ## @@ -0,0 +1,258 @@ +--- +layout: post +title: "Apache Flink 1.16.1 Release Announcement" +date: 2023-01-23T22:00:00.000Z +categories: news +authors: +- MartijnVisser: + name: "Martijn Visser" + twitter: "martijnvisser82" + +excerpt: The Apache Flink Community is pleased to announce a bug fix release for Flink 1.16. + +--- + +The Apache Flink Community is pleased to announce the first bug fix release of the Flink 1.16 series. + +This release includes 84 bug fixes, vulnerability fixes, and minor improvements for Flink 1.16. +Below you will find a list of all bugfixes and improvements (excluding improvements to the build infrastructure and build stability). For a complete list of all changes see: +[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352344). + +We highly recommend all users upgrade to Flink 1.16.1. + +# Release Artifacts + +## Maven Dependencies + +```xml + + org.apache.flink + flink-java + 1.16.1 + + + org.apache.flink + flink-streaming-java + 1.16.1 + + + org.apache.flink + flink-clients + 1.16.1 + +``` + +## Binaries + +You can find the binaries on the updated [Downloads page]({{ site.baseurl }}/downloads.html). + +## Docker Images + +* [library/flink](https://hub.docker.com/_/flink?tab=tags=1=1.16.1) (official images) +* [apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1=1.16.1) (ASF repository) + +## PyPi + +* [apache-flink==1.16.1](https://pypi.org/project/apache-flink/1.16.1/) + +# Upgrade Notes + +[FLINK-28988 - Incorrect result for filter after temporal join](https://issues.apache.org/jira/browse/FLINK-28988) +The filter will not be pushed down into both inputs of the event time temporal join. 
+This may cause incompatible plan changes compared to Flink 1.16.0, e.g., when the left input is an upsert source +(like upsert-kafka connector), the query plan will remove the ChangelogNormalize node in Flink 1.16.1, while it +did appear in 1.16.0. + +[FLINK-29849 - Event time temporal join on an upsert source may produce incorrect execution plan](https://issues.apache.org/jira/browse/FLINK-29849) +This resolves the correctness issue when do event time temporal join with a versioned table backed by an upsert source. Review Comment: ```suggestion This resolves the correctness issue when doing an event time temporal join with a versioned table backed by an upsert source. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-30795) StreamingWithStateTestBase of win not correct
JinxinTang created FLINK-30795: -- Summary: StreamingWithStateTestBase of win not correct Key: FLINK-30795 URL: https://issues.apache.org/jira/browse/FLINK-30795 Project: Flink Issue Type: Bug Components: Tests Environment: Windows path such as " file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439 " will throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); Reporter: JinxinTang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087835183 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ## @@ -115,29 +109,22 @@ private List discoverSplits() { @Override public void addSplitsBack(List splits, int subtaskId) { LOG.info("Add {} splits back to CassandraSplitEnumerator.", splits.size()); -state.addNewSplits(splits, enumeratorContext.currentParallelism()); -assignUnprocessedSplitsToReader(subtaskId); +state.addNewSplits(splits); } @Override public void addReader(int subtaskId) { LOG.info("Adding reader {} to CassandraSplitEnumerator.", subtaskId); -assignUnprocessedSplitsToReader(subtaskId); +assignUnprocessedSplitToReader(subtaskId); } -private void assignUnprocessedSplitsToReader(int readerId) { +private void assignUnprocessedSplitToReader(int readerId) { checkReaderRegistered(readerId); - -final Set splitsForReader = state.getSplitsForReader(readerId); -if (splitsForReader != null && !splitsForReader.isEmpty()) { -Map> assignment = new HashMap<>(); -assignment.put(readerId, Lists.newArrayList(splitsForReader)); -LOG.info("Assigning splits to reader {}", assignment); -enumeratorContext.assignSplits(new SplitsAssignment<>(assignment)); -} - -// periodically partition discovery is disabled, signal NoMoreSplitsEvent to the reader -LOG.debug( +final CassandraSplit cassandraSplit = state.getASplit(); Review Comment: Doesn't this have to handle the case where the split is `null`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
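The null case zentol points at — `state.getASplit()` returning nothing once the enumerator state is drained — can be handled along these lines. This is a sketch with simplified types (strings instead of `CassandraSplit`, return values instead of enumerator-context calls), not the PR's actual code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified sketch of handing out splits one at a time, including the exhausted (null) case. */
public class SplitAssignmentSketch {

    private final Queue<String> unassignedSplits = new ArrayDeque<>();

    void addSplit(String split) {
        unassignedSplits.add(split);
    }

    /** Mirrors assignUnprocessedSplitToReader: assign one split, or signal "no more splits". */
    String assignUnprocessedSplitToReader(int readerId) {
        String split = unassignedSplits.poll(); // null once no further split is left
        if (split == null) {
            // real code would call enumeratorContext.signalNoMoreSplits(readerId) here
            return "noMoreSplits->" + readerId;
        }
        // real code would assign the split to the reader via the enumerator context
        return split;
    }

    public static void main(String[] args) {
        SplitAssignmentSketch enumerator = new SplitAssignmentSketch();
        enumerator.addSplit("split-0");
        System.out.println(enumerator.assignUnprocessedSplitToReader(1)); // split-0
        System.out.println(enumerator.assignUnprocessedSplitToReader(1)); // noMoreSplits->1
    }
}
```

Without the null branch, an exhausted state would surface as a `NullPointerException` (or a hung reader that never receives a no-more-splits signal), which is exactly what the review comment is guarding against.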
[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087821967 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java: ## @@ -48,6 +52,30 @@ public CassandraSplitState toSplitState() { return new CassandraSplitState(new HashSet<>(ringRanges), splitId()); } +public void serialize(ObjectOutputStream objectOutputStream) throws IOException { Review Comment: let's make this package private. Personally I'd move all of it into the serializer, but I can see why it could be useful to have it here. For symmetry I'd prefer this method to be static and accept a CassandraSplit instance. ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java: ## @@ -48,6 +52,30 @@ public CassandraSplitState toSplitState() { return new CassandraSplitState(new HashSet<>(ringRanges), splitId()); } +public void serialize(ObjectOutputStream objectOutputStream) throws IOException { +objectOutputStream.writeInt(ringRanges.size()); +for (RingRange ringRange : ringRanges) { +objectOutputStream.writeObject(ringRange.getStart()); +objectOutputStream.writeObject(ringRange.getEnd()); +} +} + +public static CassandraSplit deserialize(ObjectInputStream objectInputStream) +throws IOException { +try { +final int nbRingRanges = objectInputStream.readInt(); +Set ringRanges = new HashSet<>(nbRingRanges); Review Comment: note: Passing npRingRanges won't have the desired effect of avoiding allocations, since the backing map is resized when the map is filled to a certain load. In fact you've guaranteed that the map will be resized at least once. ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. + */ +public class CassandraSplitReader implements SplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); +public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + +private
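zentol's resizing remark can be made concrete with a small standalone sketch (hypothetical helper, not connector code): `HashSet` rehashes once its size exceeds capacity × load factor (0.75 by default), so `new HashSet<>(n)` is guaranteed to resize at least once while `n` elements are inserted. Oversizing the initial capacity avoids that:

```java
import java.util.HashSet;
import java.util.Set;

public class PresizedSet {
    /**
     * Creates a HashSet large enough to hold expectedSize elements without
     * rehashing. The argument to the HashSet constructor is the initial
     * capacity, not the expected element count, so we divide by the default
     * load factor (0.75) to leave enough headroom.
     */
    static <T> Set<T> withExpectedSize(int expectedSize) {
        return new HashSet<>((int) Math.ceil(expectedSize / 0.75d));
    }
}
```

Guava's shaded `Sets.newHashSetWithExpectedSize` (already on Flink's classpath) applies the same formula.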
[GitHub] [flink] XComp commented on pull request #21756: [FLINK-30787][debug] Adds dmesg call to test_controller to check whether it would succeed on Alibaba instances.
XComp commented on PR #21756: URL: https://github.com/apache/flink/pull/21756#issuecomment-1404966145 I wanted to verify it because of [Robert's comment](https://issues.apache.org/jira/browse/FLINK-30787?focusedCommentId=17680583=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17680583). He sounded confident that it worked at some point in time. But you're right; it looks like the Azure pipeline change didn't cause that. I actually verified the order of the commits beforehand, but must have mixed up the commit order. So, I see two options here: 1. We remove this part of the code again because nobody complained in the past. 2. We enable kernel log access in the Docker container as explained in [this StackExchange thread](https://unix.stackexchange.com/questions/390184/dmesg-read-kernel-buffer-failed-permission-denied) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #21756: [FLINK-30787][debug] Adds dmesg call to test_controller to check whether it would succeed on Alibaba instances.
zentol commented on PR #21756: URL: https://github.com/apache/flink/pull/21756#issuecomment-1404950896 I mean, it also fails in your personal azure, and the whole dmesg business was added _after_ FLINK-13978. Chances are this just fails because the tests are run in docker :shrug: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a diff in pull request #21751: [FLINK-30619][tests] Add retry
XComp commented on code in PR #21751: URL: https://github.com/apache/flink/pull/21751#discussion_r1087810726 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -585,7 +586,7 @@ public void testStatusMetrics() throws Exception { // sleep a bit to ensure uptime is > 0 Thread.sleep(10L); -assertThat(upTimeGauge.getValue()).isGreaterThan(0L); +CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L); Review Comment: The `Thread.sleep(10L)` can be removed if we add the waitUntil condition. Additionally, the same pattern with `Thread.sleep` and `isGreaterThan` is applied a few lines further down in the test code. We might want to update that as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
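The `waitUntilCondition` pattern discussed above boils down to a bounded poll loop. A minimal self-contained sketch (an assumption of how such a utility works; Flink's actual `CommonTestUtils.waitUntilCondition` may have a different signature):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class WaitUtil {
    /** Polls the condition until it returns true or the timeout elapses. */
    static void waitUntilCondition(BooleanSupplier condition, Duration timeout)
            throws InterruptedException, TimeoutException {
        Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(10L); // retry interval between polls
        }
    }
}
```

Replacing a fixed `Thread.sleep` plus one-shot assertion with such a loop removes the flakiness on slow CI machines: the test still fails fast on real bugs (via the timeout) but no longer breaks when the gauge needs more than the hard-coded sleep to advance.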
[GitHub] [flink] zentol commented on pull request #21751: [FLINK-30619][tests] Add retry
zentol commented on PR #21751: URL: https://github.com/apache/flink/pull/21751#issuecomment-1404938724 I doubt that a thread dump is gonna help us much since we know that the job state was changed from the existing logs. But in the end waiting for it is a de-facto retry so why not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #509: [FLINK-30653] Trigger resource Events on autoscaler actions
mxm commented on code in PR #509: URL: https://github.com/apache/flink-kubernetes-operator/pull/509#discussion_r1087710879 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java: ## @@ -99,14 +103,27 @@ public boolean scaleResource( return false; } +var scalingReport = scalingReport(scalingSummaries); +eventRecorder.triggerEvent( +resource, +EventRecorder.Type.Normal, +EventRecorder.Reason.ScalingReport, +EventRecorder.Component.Operator, +scalingReport); + +if (!conf.get(SCALING_ENABLED)) { +return false; +} Review Comment: That is awesome! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-30717) Migrate Travis CI to Github Actions
[ https://issues.apache.org/jira/browse/FLINK-30717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678635#comment-17678635 ] Chesnay Schepler edited comment on FLINK-30717 at 1/26/23 10:59 AM: benchmarks: 9fa36c41b055b807ff7fc25cc548ff04f9ace28d trainings: 188f865c62599ad92d538e5fbf7b1dc9a94d8f41 was (Author: zentol): benchmarks: 9fa36c41b055b807ff7fc25cc548ff04f9ace28d > Migrate Travis CI to Github Actions > --- > > Key: FLINK-30717 > URL: https://issues.apache.org/jira/browse/FLINK-30717 > Project: Flink > Issue Type: Technical Debt > Components: Benchmarks, Documentation / Training >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > Infra decided to no longer support Travis in the near future so we need to > migrate some repos. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-30717) Migrate Travis CI to Github Actions
[ https://issues.apache.org/jira/browse/FLINK-30717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-30717. Resolution: Fixed > Migrate Travis CI to Github Actions > --- > > Key: FLINK-30717 > URL: https://issues.apache.org/jira/browse/FLINK-30717 > Project: Flink > Issue Type: Technical Debt > Components: Benchmarks, Documentation / Training >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > Infra decided to no longer support Travis in the near future so we need to > migrate some repos. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-training] zentol merged pull request #51: [FLINK-30717][ci] Migrate to GitHub Actions
zentol merged PR #51: URL: https://github.com/apache/flink-training/pull/51 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-cassandra] echauchot commented on pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1404820642 > > I don't think it is an ASF rule but fair enough, > > FYI: By and large it should be viewed as a legal requirement. By copying code from Cassandra you have to adhere to their licensing, which among other things states that you must have prominent notices for changes to a file. You're right, cf. ASF License v2 §4(d): `If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.` Anyway, as we decided to replace the split code with something that was never merged to Beam, there is no need anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-30754) Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
[ https://issues.apache.org/jira/browse/FLINK-30754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi closed FLINK-30754. -- Fix Version/s: 1.17 Resolution: Fixed 3a64648 in master > Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test > issues > > > Key: FLINK-30754 > URL: https://issues.apache.org/jira/browse/FLINK-30754 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.17.0 >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > Fix For: 1.17 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] mbalassi merged pull request #21732: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
mbalassi merged PR #21732: URL: https://github.com/apache/flink/pull/21732 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java: ## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitState; +import org.apache.flink.connector.cassandra.source.split.RingRange; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}. For that, it executes a range query (query that outputs records belonging + * to a {@link RingRange}) based on the user specified query. This class manages the Cassandra + * cluster and session. 
+ */ +public class CassandraSplitReader implements SplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); +public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + +private final Cluster cluster; +private final Session session; +private final Set unprocessedSplits; +private final AtomicBoolean wakeup = new AtomicBoolean(false); +private final String query; + +public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) { +// need a thread safe set Review Comment: I pushed an impl that assigns only a single split to each reader. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source
echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087605427 ## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator +implements SplitEnumerator { +private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); +private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + +private final SplitEnumeratorContext enumeratorContext; +private final CassandraEnumeratorState state; +private final Cluster cluster; + +public CassandraSplitEnumerator( +SplitEnumeratorContext enumeratorContext, +CassandraEnumeratorState state, +ClusterBuilder clusterBuilder) { +this.enumeratorContext = enumeratorContext; +this.state = state == null ? 
new CassandraEnumeratorState() : state /* snapshot restore*/; +this.cluster = clusterBuilder.getCluster(); +} + +@Override +public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { +assignUnprocessedSplitsToReader(subtaskId); +} + +@Override +public void start() { +// discover the splits and update unprocessed splits and then assign them. +// There is only an initial splits discovery, no periodic discovery. +enumeratorContext.callAsync( +this::discoverSplits, +(splits, throwable) -> { +LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); +state.addNewSplits(splits, enumeratorContext.currentParallelism()); +}); +} + +private List discoverSplits() { +final int numberOfSplits = enumeratorContext.currentParallelism(); +final Metadata clusterMetadata = cluster.getMetadata(); +final String partitioner = clusterMetadata.getPartitioner(); +final SplitsGenerator splitsGenerator = new SplitsGenerator(partitioner); +if (MURMUR3PARTITIONER.equals(partitioner)) { +LOG.info("Murmur3Partitioner detected, splitting"); +List tokens = +clusterMetadata.getTokenRanges().stream() +.map( +tokenRange -> +new BigInteger( + tokenRange.getEnd().getValue().toString())) +.collect(Collectors.toList()); +return splitsGenerator.generateSplits(numberOfSplits, tokens); +} else { +// Murmur3Partitioner is the default and recommended partitioner for Cassandra 1.2+ +// see +// https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html +LOG.warn( +"The current Cassandra partitioner is {}, only
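The `discoverSplits` logic quoted above divides the Murmur3 token ring among readers. As a rough standalone illustration (a hypothetical class for this digest, not the connector's actual `SplitsGenerator`, which works from the cluster's real token ranges), cutting the Murmur3 token space [-2^63, 2^63 - 1] into n contiguous ranges looks like this:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenRingSplitter {
    // Murmur3Partitioner token bounds: the full signed 64-bit range.
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Splits the token ring into n contiguous [start, end] ranges. */
    static List<BigInteger[]> split(int n) {
        BigInteger total = MAX.subtract(MIN).add(BigInteger.ONE); // 2^64 tokens
        BigInteger step = total.divide(BigInteger.valueOf(n));
        List<BigInteger[]> ranges = new ArrayList<>(n);
        BigInteger start = MIN;
        for (int i = 0; i < n; i++) {
            // Last range absorbs the division remainder so the ring is fully covered.
            BigInteger end = (i == n - 1) ? MAX : start.add(step).subtract(BigInteger.ONE);
            ranges.add(new BigInteger[] {start, end});
            start = end.add(BigInteger.ONE);
        }
        return ranges;
    }
}
```

Each range then backs one split, which a reader turns into a token-bounded variant of the user's query; `BigInteger` is needed because the full ring size (2^64) overflows `long`.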
[jira] [Created] (FLINK-30794) Disable dependency convergence check for nightly connector builds
Martijn Visser created FLINK-30794: -- Summary: Disable dependency convergence check for nightly connector builds Key: FLINK-30794 URL: https://issues.apache.org/jira/browse/FLINK-30794 Project: Flink Issue Type: Technical Debt Components: Connectors / Common Reporter: Martijn Visser Assignee: Martijn Visser -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #21759: [FLINK-29237][table] Remove RexSimplify from Flink code, SearchOperator code generation for RexUnknown.nullAs
flinkbot commented on PR #21759: URL: https://github.com/apache/flink/pull/21759#issuecomment-1404704386 ## CI report: * 8d766c9246e2178ebc237d93171e918c74e38050 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29237) RexSimplify can not be removed after update to calcite 1.27
[ https://issues.apache.org/jira/browse/FLINK-29237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29237: --- Labels: pull-request-available (was: ) > RexSimplify can not be removed after update to calcite 1.27 > --- > > Key: FLINK-29237 > URL: https://issues.apache.org/jira/browse/FLINK-29237 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Reporter: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > It seems there is some work should be done to make it happen > Currently removal of RexSimplify from Flink repo leads to failure of several > tests like > {{IntervalJoinTest#testFallbackToRegularJoin}} > {{CalcITCase#testOrWithIsNullInIf}} > {{CalcITCase#testOrWithIsNullPredicate}} > example of failure > {noformat} > Sep 07 11:25:08 java.lang.AssertionError: > Sep 07 11:25:08 > Sep 07 11:25:08 Results do not match for query: > Sep 07 11:25:08 > Sep 07 11:25:08 SELECT * FROM NullTable3 AS T > Sep 07 11:25:08 WHERE T.a = 1 OR T.a = 3 OR T.a IS NULL > Sep 07 11:25:08 > Sep 07 11:25:08 > Sep 07 11:25:08 Results > Sep 07 11:25:08 == Correct Result - 4 == == Actual Result - 2 == > Sep 07 11:25:08 +I[1, 1, Hi] +I[1, 1, Hi] > Sep 07 11:25:08 +I[3, 2, Hello world] +I[3, 2, Hello world] > Sep 07 11:25:08 !+I[null, 999, NullTuple] > Sep 07 11:25:08 !+I[null, 999, NullTuple] > Sep 07 11:25:08 > Sep 07 11:25:08 Plan: > Sep 07 11:25:08 == Abstract Syntax Tree == > Sep 07 11:25:08 LogicalProject(inputs=[0..2]) > Sep 07 11:25:08 +- LogicalFilter(condition=[OR(=($0, 1), =($0, 3), IS > NULL($0))]) > Sep 07 11:25:08+- LogicalTableScan(table=[[default_catalog, > default_database, NullTable3]]) > Sep 07 11:25:08 > Sep 07 11:25:08 == Optimized Logical Plan == > Sep 07 11:25:08 Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 3; NULL AS > TRUE])]) > Sep 07 11:25:08 +- BoundedStreamScan(table=[[default_catalog, > default_database, NullTable3]], fields=[a, b, c]) > Sep 07 11:25:08 > 
Sep 07 11:25:08 > Sep 07 11:25:08 at org.junit.Assert.fail(Assert.java:89) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1(BatchTestBase.scala:154) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1$adapted(BatchTestBase.scala:147) > Sep 07 11:25:08 at scala.Option.foreach(Option.scala:257) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:147) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) > Sep 07 11:25:08 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)