[jira] [Commented] (FLINK-29405) InputFormatCacheLoaderTest is unstable

2023-01-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681211#comment-17681211
 ] 

Matthias Pohl commented on FLINK-29405:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45230&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=10958

> InputFormatCacheLoaderTest is unstable
> --
>
> Key: FLINK-29405
> URL: https://issues.apache.org/jira/browse/FLINK-29405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Chesnay Schepler
>Assignee: Alexander Smirnov
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
>
> #testExceptionDuringReload/#testCloseAndInterruptDuringReload fail reliably 
> when run in a loop.
> {code}
> java.lang.AssertionError: 
> Expecting AtomicInteger(0) to have value:
>   0
> but did not.
>   at 
> org.apache.flink.table.runtime.functions.table.fullcache.inputformat.InputFormatCacheLoaderTest.testCloseAndInterruptDuringReload(InputFormatCacheLoaderTest.java:161)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30727) JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException

2023-01-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681210#comment-17681210
 ] 

Matthias Pohl commented on FLINK-30727:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45229&view=logs&j=f2c100be-250b-5e85-7bbe-176f68fcddc5&t=05efd11e-5400-54a4-0d27-a4663be008a9&l=12514

> JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException
> 
>
> Key: FLINK-30727
> URL: https://issues.apache.org/jira/browse/FLINK-30727
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Yunhong Zheng
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> IOException due to timeout occurring while requesting exclusive NetworkBuffer 
> caused JoinReorderITCase.testBushyTreeJoinReorder to fail:
> {code}
> [...]
> Jan 18 01:11:27 Caused by: java.io.IOException: Timeout triggered when 
> requesting exclusive buffers: The total number of network buffers is 
> currently set to 2048 of 32768 bytes each. You can increase this number by 
> setting the configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:256)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:179)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:262)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:517)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:277)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:962)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:648)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:556)
> Jan 18 01:11:27   at java.lang.Thread.run(Thread.java:748)
> {code}
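> For reference, these are the configuration keys named in the exception (the values 
> below are illustrative assumptions only, not tuning recommendations):
> {noformat}
> taskmanager.memory.network.fraction: 0.2
> taskmanager.memory.network.min: 128mb
> taskmanager.memory.network.max: 1gb
> taskmanager.network.memory.exclusive-buffers-request-timeout-ms: 30000
> {noformat}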
> Same build, 2 failures:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=14300
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987&view=logs&j=ce3801ad-3bd5-5f06-d165-34d37e757d90&t=5e4d9387-1dcc-5885-a901-90469b7e6d2f&l=14362



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29427) LookupJoinITCase failed with classloader problem

2023-01-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681209#comment-17681209
 ] 

Matthias Pohl commented on FLINK-29427:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45220&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=16759

> LookupJoinITCase failed with classloader problem
> 
>
> Key: FLINK-29427
> URL: https://issues.apache.org/jira/browse/FLINK-29427
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Alexander Smirnov
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
>
> {code:java}
> 2022-09-27T02:49:20.9501313Z Sep 27 02:49:20 Caused by: 
> org.codehaus.janino.InternalCompilerException: Compiling 
> "KeyProjection$108341": Trying to access closed classloader. Please check if 
> you store classloaders directly or indirectly in static fields. If the 
> stacktrace suggests that the leak occurs in a third party library and cannot 
> be fixed immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9502654Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
> 2022-09-27T02:49:20.9503366Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> 2022-09-27T02:49:20.9504044Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> 2022-09-27T02:49:20.9504704Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> 2022-09-27T02:49:20.9505341Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> 2022-09-27T02:49:20.9505965Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> 2022-09-27T02:49:20.9506584Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> 2022-09-27T02:49:20.9507261Z Sep 27 02:49:20  at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> 2022-09-27T02:49:20.9507883Z Sep 27 02:49:20  ... 30 more
> 2022-09-27T02:49:20.9509266Z Sep 27 02:49:20 Caused by: 
> java.lang.IllegalStateException: Trying to access closed classloader. Please 
> check if you store classloaders directly or indirectly in static fields. If 
> the stacktrace suggests that the leak occurs in a third party library and 
> cannot be fixed immediately, you can disable this check with the 
> configuration 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9510835Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
> 2022-09-27T02:49:20.9511760Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
> 2022-09-27T02:49:20.9512456Z Sep 27 02:49:20  at 
> java.lang.Class.forName0(Native Method)
> 2022-09-27T02:49:20.9513014Z Sep 27 02:49:20  at 
> java.lang.Class.forName(Class.java:348)
> 2022-09-27T02:49:20.9513649Z Sep 27 02:49:20  at 
> org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:89)
> 2022-09-27T02:49:20.9514339Z Sep 27 02:49:20  at 
> org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:312)
> 2022-09-27T02:49:20.9514990Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8556)
> 2022-09-27T02:49:20.9515659Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6749)
> 2022-09-27T02:49:20.9516337Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
> 2022-09-27T02:49:20.9516989Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
> 2022-09-27T02:49:20.9517632Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
> 2022-09-27T02:49:20.9518319Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
> 2022-09-27T02:49:20.9519018Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9519680Z Sep 27 02:49:20  at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
> 2022-09-27T02:49:20.9520386Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9521042Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
> 
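> As the exception message itself states, the leak check can be disabled while the 
> root cause is investigated (a workaround only, not a fix):
> {noformat}
> classloader.check-leaked-classloader: false
> {noformat}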

[jira] [Commented] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023

2023-01-26 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681196#comment-17681196
 ] 

Rui Fan commented on FLINK-30623:
-

FLINK-26806 was merged on 23.01.2023; I'm not sure whether it will affect the 
benchmark. I can check it today.

> Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
> ---
>
> Key: FLINK-30623
> URL: https://issues.apache.org/jira/browse/FLINK-30623
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Checkpointing
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Performance regression
> checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005
> checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277
> deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395
> stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919
> tupleKeyBy median=4155.684199 recent_median=3987.5812305
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED_1&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=8&ben=deployAllTasks.STREAMING&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=stateBackends.MEMORY&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=tupleKeyBy&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fredia commented on pull request #21747: [FLINK-24932][state] Bump frocksdbjni to 6.20.3-ververica-2.0

2023-01-26 Thread via GitHub


fredia commented on PR #21747:
URL: https://github.com/apache/flink/pull/21747#issuecomment-1406049554

   @MartijnVisser Thanks for the review and verification.
   
   I compared the benchmark results based on the latest 
master (f6c7c30118ef26f98a7d422831fa8047f1fd9f98); below are the results. 
   As we can see, the performance behaves the same as in @Myasuka's 
[result](https://issues.apache.org/jira/browse/FLINK-24932?focusedCommentId=17569889&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17569889):
 the performance of version 2.0 is almost the same as that of version 1.0.
   
   In addition to supporting M1, 6.20.3-ververica-2.0 also updates the 
[dependencies](https://github.com/ververica/frocksdb/pull/56) of zlib and adds the 
`periodic_compaction_seconds` 
[option](https://github.com/ververica/frocksdb/pull/57) to RocksJava.
   
   Benchmark | 6.20.3-ververica-1.0 | 6.20.3-ververica-2.0 | ratio
   -- | -- | -- | --
   org.apache.flink.state.benchmark.ListStateBenchmark.listAdd | 389.803 | 386.465338 | -0.008562433
   org.apache.flink.state.benchmark.ListStateBenchmark.listAddAll | 311.726 | 314.086747 | 0.007573148
   org.apache.flink.state.benchmark.ListStateBenchmark.listAppend | 387.375 | 385.4015 | -0.005094547
   org.apache.flink.state.benchmark.ListStateBenchmark.listGet | 152.5 | 154.95988 | 0.016130361
   org.apache.flink.state.benchmark.ListStateBenchmark.listGetAndIterate | 155.356 | 155.927967 | 0.003681654
   org.apache.flink.state.benchmark.ListStateBenchmark.listUpdate | 391.685 | 388.147242 | -0.009032151
   org.apache.flink.state.benchmark.MapStateBenchmark.mapAdd | 320.924 | 314.778435 | -0.019149596
   org.apache.flink.state.benchmark.MapStateBenchmark.mapContains | 66.928 | 67.744243 | 0.012195837
   org.apache.flink.state.benchmark.MapStateBenchmark.mapEntries | 439.316 | 438.016991 | -0.00295689
   org.apache.flink.state.benchmark.MapStateBenchmark.mapGet | 66.511 | 66.365644 | -0.002185443
   org.apache.flink.state.benchmark.MapStateBenchmark.mapIsEmpty | 59.914 | 60.307518 | 0.006568048
   org.apache.flink.state.benchmark.MapStateBenchmark.mapIterator | 441.623 | 439.512541 | -0.00477887
   org.apache.flink.state.benchmark.MapStateBenchmark.mapKeys | 439.221 | 440.065508 | 0.00192274
   org.apache.flink.state.benchmark.MapStateBenchmark.mapPutAll | 85.741 | 85.65495 | -0.001003604
   org.apache.flink.state.benchmark.MapStateBenchmark.mapRemove | 335.227 | 358.395208 | 0.069111999
   org.apache.flink.state.benchmark.MapStateBenchmark.mapUpdate | 310.086 | 339.343382 | 0.094352476
   org.apache.flink.state.benchmark.MapStateBenchmark.mapValues | 443.319 | 446.092917 | 0.006257158
   org.apache.flink.state.benchmark.ValueStateBenchmark.valueAdd | 322.669 | 306.841597 | -0.049051514
   org.apache.flink.state.benchmark.ValueStateBenchmark.valueGet | 437.556 | 443.970351 | 0.014659497
   org.apache.flink.state.benchmark.ValueStateBenchmark.valueUpdate | 331.987 | 336.718447 | 0.014251904
   org.apache.flink.state.benchmark.RocksdbStateBackendRescalingBenchmarkExecutor.rescaleRocksDB | 18845.28 | 18719.78056 | -0.006659463
   org.apache.flink.state.benchmark.RocksdbStateBackendRescalingBenchmarkExecutor.rescaleRocksDB | 1746.154 | 1848.152231 | 0.058413079
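   
   (Reading the ratio column: it appears to be (value on 2.0 − value on 1.0) / value on 1.0, 
i.e. the relative change from 6.20.3-ververica-1.0 to 6.20.3-ververica-2.0. Spot-checking 
listAdd: (386.465338 − 389.803) / 389.803 ≈ -0.00856, matching the table.)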


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] kmozaid commented on pull request #504: [FLINK-30669] update recent job status in flinkdeployment object

2023-01-26 Thread via GitHub


kmozaid commented on PR #504:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/504#issuecomment-1406021738

   > I agree @kmozaid if this doesn't affect existing jobs and improves some 
others then the change makes sense :)
   > 
   > Could you please add a unit test to guard this change against possible 
future regressions?
   
   sure, let me add a test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30799) Make SinkFunction support speculative execution for batch jobs

2023-01-26 Thread Biao Liu (Jira)
Biao Liu created FLINK-30799:


 Summary: Make SinkFunction support speculative execution for batch 
jobs
 Key: FLINK-30799
 URL: https://issues.apache.org/jira/browse/FLINK-30799
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


This ticket will make SinkFunction-based sinks run with speculative 
execution for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30798) Make OutputFormat support speculative execution for batch jobs

2023-01-26 Thread Biao Liu (Jira)
Biao Liu created FLINK-30798:


 Summary: Make OutputFormat support speculative execution for batch 
jobs
 Key: FLINK-30798
 URL: https://issues.apache.org/jira/browse/FLINK-30798
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


This issue will make OutputFormat-based sinks run with speculative execution 
for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21765: [Flink-30755][runtime] Make SinkV2 support speculative execution for batch jobs

2023-01-26 Thread via GitHub


flinkbot commented on PR #21765:
URL: https://github.com/apache/flink/pull/21765#issuecomment-1405950702

   
   ## CI report:
   
   * 74cdbff981b35993ef2546d508f0ad6c2e4c2ba0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ifndef-SleePy opened a new pull request, #21765: [Flink-30755][runtime] Make SinkV2 support speculative execution for batch jobs

2023-01-26 Thread via GitHub


ifndef-SleePy opened a new pull request, #21765:
URL: https://github.com/apache/flink/pull/21765

   ## What is the purpose of the change
   
   *This pull request introduces a way to allow SinkV2 to run with speculative 
execution*
   
   ## Brief change log
   
 - *Introduce the SupportsConcurrentExecutionAttempts interface*
 - *Add a supportsConcurrentExecutionAttempts property to Transformation, 
StreamGraph, and JobGraph*
 - *Make SinkV2 support speculative execution by implementing the 
SupportsConcurrentExecutionAttempts interface (see the sketch at the end of this 
description)*
 - *Use supportsConcurrentExecutionAttempts to exclude sinks from 
speculative execution*
   
   ## Verifying this change
   
   This change added tests:
 - *Add new test case in StreamingJobGraphGeneratorTest*
 - *Add new test case in SpeculativeSchedulerITCase*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
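   
   For illustration, a minimal sketch of the marker-interface pattern described above. 
Apart from `SupportsConcurrentExecutionAttempts` itself, all names here are hypothetical 
and the real Flink signatures may differ:
   
   ```java
   /** Sketch only; the actual Flink interface may carry methods or annotations. */
   interface SupportsConcurrentExecutionAttempts {}
   
   interface Sink<T> {
       void write(T element);
   }
   
   // A sink opts in to speculative execution by implementing the marker interface.
   class ConsoleSink implements Sink<String>, SupportsConcurrentExecutionAttempts {
       @Override
       public void write(String element) {
           System.out.println(element);
       }
   }
   
   class SchedulerSketch {
       // The scheduler checks the marker to decide whether concurrent attempts are allowed.
       static boolean supportsSpeculation(Sink<?> sink) {
           return sink instanceof SupportsConcurrentExecutionAttempts;
       }
   }
   ```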
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop

2023-01-26 Thread via GitHub


1996fanrui commented on PR #21697:
URL: https://github.com/apache/flink/pull/21697#issuecomment-1405922443

   Thanks for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30709) NetworkInput#emitNext() should push records to DataOutput in a while loop

2023-01-26 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin closed FLINK-30709.

Resolution: Fixed

> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -
>
> Key: FLINK-30709
> URL: https://issues.apache.org/jira/browse/FLINK-30709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> It's similar to FLINK-30533: FLINK-30533 focuses on the source operator, while this JIRA 
> focuses on network input.
>  
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes 
> at most one record to the given DataOutput. This unnecessarily increases the 
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this 
> program, the Flink runtime needs to include these 3 function calls in the call 
> stack:
>  * StreamTask#processInput()
>  * StreamOneInputProcessor#processInput()
>  * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to 
> push records to DataOutput in a while loop. It improves Flink performance by 
> removing an average of 3 function calls from the call stack needed to produce a 
> record.
> Here are the benchmark results obtained by running 
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
>  with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>  
> {code:java}
> Before the proposed change, the avg is 12429.0605 ops/ms; here are the detailed 
> results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12339.771 ± 414.649  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12687.872 ± 320.084  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12256.445 ± 512.219  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12432.154 ± 405.083  ops/ms
> After the proposed change, the avg is 13836.845 ops/ms; here are the detailed 
> results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13092.451 ± 490.886  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13881.138 ± 370.249  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13960.280 ± 389.505  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  14413.511 ± 727.844  
> ops/ms{code}
>  
> The proposed change increases throughput by 11.3%.
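> For illustration, a toy sketch of the pattern behind the change (not the actual Flink 
> code): the single-record variant returns after each record, while the loop variant 
> drains everything currently available per call, saving call-stack frames per record:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
> import java.util.function.Consumer;
> 
> public class EmitLoopSketch {
>     private final Queue<String> buffer = new ArrayDeque<>();
> 
>     // Old style: at most one record per invocation, so every record pays for
>     // the full processInput() -> processInput() -> emitNext() call chain.
>     boolean emitNextSingle(Consumer<String> output) {
>         String record = buffer.poll();
>         if (record == null) {
>             return false;
>         }
>         output.accept(record);
>         return true;
>     }
> 
>     // New style: push all currently available records within one invocation.
>     boolean emitNextLoop(Consumer<String> output) {
>         boolean emitted = false;
>         String record;
>         while ((record = buffer.poll()) != null) {
>             output.accept(record);
>             emitted = true;
>         }
>         return emitted;
>     }
> }
> {code}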



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lindong28 closed pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop

2023-01-26 Thread via GitHub


lindong28 closed pull request #21697: [FLINK-30709][runtime] 
NetworkInput#emitNext() should push records to DataOutput within a while loop
URL: https://github.com/apache/flink/pull/21697


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods

2023-01-26 Thread via GitHub


kristoffSC commented on PR #21393:
URL: https://github.com/apache/flink/pull/21393#issuecomment-1405755647

   @tsreaper 
   I've refactored both `BlockStatementSplitter` and `BlockStatementGrouper` so 
they use one `for` loop and one `visitor` instance, as you suggested in your 
comments. There is no merging or adding of blocks now.
   
   I would appreciate it if you could take a look and let me know what you think.
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] kristoffSC commented on a diff in pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods

2023-01-26 Thread via GitHub


kristoffSC commented on code in PR #21393:
URL: https://github.com/apache/flink/pull/21393#discussion_r1088399790


##
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java:
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Extracts statements from IF, ELSE and WHILE blocks, making them smaller.
+ *
+ * BlockStatementSplitter does not recognize whether a statement operates on a local or a class
+ * member variable. Because of that, the code must be preprocessed by {@link DeclarationRewriter},
+ * which converts all extracted local variables to member variables.
+ *
+ * Before
+ *
+ * 
+ * while (counter > 0) {
+ * int localA = a + 1000;
+ * System.out.println(localA);
+ * if (a > 0) {
+ * b = a * 2;
+ * c = b * 2;
+ * System.out.println(b);
+ * } else {
+ * b = a * 3;
+ * System.out.println(b);
+ * }
+ * counter--;
+ * }
+ *
+ * 
+ *
+ * After
+ *
+ * 
+ * while (counter > 0) {
+ * myFun_whileBody0_0(int a);
+ * if (a > 0) {
+ * myFun_whileBody0_0_ifBody0(int a);
+ * } else {
+ * myFun_whileBody0_0_ifBody1(int a);
+ * }
+ * counter--;
+ * }
+ * 
+ *
+ * Where bodies of extracted "methods" are:
+ *
+ * 
+ * myFun_whileBody0_0(int a) ->
+ * int localA = a + 1000;
+ * System.out.println(localA);
+ * 
+ *
+ * 
+ * myFun_whileBody0_0_ifBody0(int a) ->
+ * b = a * 2;
+ * c = b * 2;
+ * System.out.println(b);
+ * 
+ *
+ * 
+ * myFun_whileBody0_0_ifBody1(int a) ->
+ * b = a * 3;
+ * System.out.println(b);
+ * 
+ */
+@Internal
+public class BlockStatementSplitter {
+
+private final String code;
+
+private final String parameters;
+
+private BlockStatementVisitor visitor;
+
+/**
+ * Initialize new BlockStatementSplitter.
+ *
+ * @param code a code block that should be rewritten.
+ * @param parameters parameters definition that should be used for extracted methods.
+ */
+public BlockStatementSplitter(String code, String parameters) {
+this.code = code;
+this.parameters = parameters;
+}
+
+/**
+ * This method extracts statements from IF, ELSE and WHILE blocks from the code block used
+ * during initialization of this object. Every entry of the returned map can be seen as a new
+ * method name (map key) and the method's body (map value). The block names will be prefixed
+ * with the provided context.
+ *
+ * @param context prefix for extracted blocks.
+ * @return a map of block name to block statements mappings. The key can be interpreted as the
+ *     name of the extracted block/method and the corresponding List represents the individual
+ *     statements (block's lines) of this block.
+ */
+public Map<String, List<String>> extractBlocks(String context) {
+
+this.visitor = new BlockStatementVisitor(code, context, parameters);
+JavaParser javaParser = new JavaParser(visitor.tokenStream);
+javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+visitor.visitStatement(javaParser.statement());
+return visitor.getAllBlocks();
+}
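+// Illustrative usage only (not part of this PR), based on the javadoc example above:
+//   BlockStatementSplitter splitter = new BlockStatementSplitter(whileCode, "int a");
+//   Map<String, List<String>> blocks = splitter.extractBlocks("myFun");
+// yields keys such as "myFun_whileBody0_0" and "myFun_whileBody0_0_ifBody0".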
+
+/**
+ * Rewrites the code block that was used for this object's initialization.
+ *
+ * @return a map whose key represents the rewritten block name and whose value represents the
+ *     rewritten code block, including calls to extracted methods
+ */
+public Map<String, String> rewriteBlock() {
+visitor.rewrite();
+Map<String, String> rewriteBlocks = new HashMap<>();
+

[jira] [Commented] (FLINK-30444) State recovery error not handled correctly and always causes JM failure

2023-01-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681133#comment-17681133
 ] 

Gyula Fora commented on FLINK-30444:


Hi [~dmvk] !
Did you make any progress on this? It would be good to hear what you think. I am 
also happy to help out with any dev/testing efforts.

> State recovery error not handled correctly and always causes JM failure
> ---
>
> Key: FLINK-30444
> URL: https://issues.apache.org/jira/browse/FLINK-30444
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: Gyula Fora
>Assignee: David Morávek
>Priority: Critical
>
> When you submit a job in Application mode and you try to restore from an 
> incompatible savepoint, there is a very unexpected behaviour.
> Even with the following config:
> {noformat}
> execution.shutdown-on-application-finish: false
> execution.submit-failed-job-on-application-error: true{noformat}
> The job goes into a FAILED state, and the jobmanager fails. In a kubernetes 
> environment (when using the native kubernetes integration) this means that 
> the JobManager is restarted automatically.
> This will mean that if you have the job result store enabled, after the JM comes 
> back you will end up with an empty application cluster.
> I think the correct behaviour would be, depending on the above-mentioned config:
> 1. If there is a job recovery error and you have 
> (execution.submit-failed-job-on-application-error) configured, then the job 
> should show up as failed, and the JM should not exit (if 
> execution.shutdown-on-application-finish is false)
> 2. If (execution.shutdown-on-application-finish is true) then the jobmanager 
> should exit cleanly as on a normal job terminal state and thus stop the 
> deployment in Kubernetes, preventing a JM restart cycle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405650899

   > Thanks a lot @gyfora!
   > 
   > Regarding:
   > 
   > > We just have to ensure that we handle the Zookeeper dependency as 
optional
   > 
   > The dependency was actually already there! I didn't have to add anything.
   > 
   > I looked into it and all Curator classes are a transitive dependency of 
the `flink-clients` package which is listed as a dependency in `pom.xml` 
[here](https://github.com/antonipp/flink-kubernetes-operator/blob/57b2832d97a4d52d7fcd814d50b94f4e745ce027/flink-kubernetes-operator/pom.xml#L75-L79).
   > 
   > $ mvn dependency:tree -pl 'flink-kubernetes-operator'
   > [...]
   > [INFO] +- org.apache.flink:flink-clients:jar:1.15.3:compile
   > [INFO] |  +- org.apache.flink:flink-runtime:jar:1.15.3:compile
   > [INFO] |  |  +- org.apache.flink:flink-rpc-core:jar:1.15.3:compile
   > [INFO] |  |  +- [...]
   > [INFO] |  |  +- 
org.apache.flink:flink-shaded-zookeeper-3:jar:3.5.9-15.0:compile
   
   Hm, that's convenient :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on code in PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088339662


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) 
throws Exception {
 LOG.info(
 "Deploying application cluster{}",
 requireHaMetadata ? " requiring last-state from HA metadata" : 
"");
+
+// If Kubernetes or Zookeeper HA are activated, delete the job graph 
in HA storage so that
+// the newly changed job config (e.g. parallelism) could take effect
 if (FlinkUtils.isKubernetesHAActivated(conf)) {
 final String clusterId = 
conf.get(KubernetesConfigOptions.CLUSTER_ID);
 final String namespace = 
conf.get(KubernetesConfigOptions.NAMESPACE);
-// Delete the job graph in the HA ConfigMaps so that the newly 
changed job config(e.g.
-// parallelism) could take effect
 FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, 
kubernetesClient);
+} else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+try (var curator = getCurator(conf)) {
+ZooKeeperUtils.deleteZNode(

Review Comment:
   The problem with HA storage access is both dependencies and credentials. Of 
the two, credentials are more problematic: it would open a big attack surface if 
the operator somehow held users' storage credentials.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on code in PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088338668


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});

Review Comment:
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on code in PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088338472


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) 
throws Exception {
 LOG.info(
 "Deploying application cluster{}",
 requireHaMetadata ? " requiring last-state from HA metadata" : 
"");
+
+// If Kubernetes or Zookeeper HA are activated, delete the job graph 
in HA storage so that
+// the newly changed job config (e.g. parallelism) could take effect
 if (FlinkUtils.isKubernetesHAActivated(conf)) {
 final String clusterId = 
conf.get(KubernetesConfigOptions.CLUSTER_ID);
 final String namespace = 
conf.get(KubernetesConfigOptions.NAMESPACE);
-// Delete the job graph in the HA ConfigMaps so that the newly 
changed job config(e.g.
-// parallelism) could take effect
 FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, 
kubernetesClient);
+} else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+try (var curator = getCurator(conf)) {

Review Comment:
   I think it's fine to create the client on demand



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on code in PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088337843


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##
@@ -340,6 +341,11 @@ protected Configuration build() {
 // Set cluster config
 effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, 
namespace);
 effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, 
clusterId);
+
+if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) {
+effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
clusterId);

Review Comment:
   In that case I would suggest using the namespace + name combination itself 
to get a unique resource identifier. That is basically the same logic as for 
Kubernetes HA.
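   
   For context, a sketch of the standard Flink options that activate ZooKeeper HA 
(the cluster-id value below is an illustrative assumption following the 
namespace + name idea):
   
   ```
   high-availability: zookeeper
   high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
   high-availability.storageDir: s3://flink/ha
   high-availability.cluster-id: /my-namespace/my-deployment
   ```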



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser commented on pull request #20049: [FLINK-28046][connectors] Mark SourceFunction interface as @Deprecated

2023-01-26 Thread via GitHub


MartijnVisser commented on PR #20049:
URL: https://github.com/apache/flink/pull/20049#issuecomment-1405608732

   @afedulov Do you think you can still make it before the feature freeze? 
(That's the 31st of January). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2

2023-01-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-30797.
--
Resolution: Fixed

Fixed in master: eb8a33fb0cb7a07443df835a3142a1a139206ed6

> Bump json5 from 1.0.1 to 1.0.2
> --
>
> Key: FLINK-30797
> URL: https://issues.apache.org/jira/browse/FLINK-30797
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Dependabot has created https://github.com/apache/flink/pull/21617
> This is the corresponding Jira ticket



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30797:
---
Labels: pull-request-available  (was: )

> Bump json5 from 1.0.1 to 1.0.2
> --
>
> Key: FLINK-30797
> URL: https://issues.apache.org/jira/browse/FLINK-30797
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Dependabot has created https://github.com/apache/flink/pull/21617
> This is the corresponding Jira ticket



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser merged pull request #21617: [FLINK-30797] Bump json5 from 1.0.1 to 1.0.2 in /flink-runtime-web/web-dashboard

2023-01-26 Thread via GitHub


MartijnVisser merged PR #21617:
URL: https://github.com/apache/flink/pull/21617


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2

2023-01-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30797:
---
Summary: Bump json5 from 1.0.1 to 1.0.2  (was: Bump json5 from 1.0.1 to 
1.0.2 in)

> Bump json5 from 1.0.1 to 1.0.2
> --
>
> Key: FLINK-30797
> URL: https://issues.apache.org/jira/browse/FLINK-30797
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
> Fix For: 1.17.0
>
>
> Dependabot has created https://github.com/apache/flink/pull/21617
> This is the corresponding Jira ticket



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30797) Bump json5 from 1.0.1 to 1.0.2 in

2023-01-26 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30797:
--

 Summary: Bump json5 from 1.0.1 to 1.0.2 in
 Key: FLINK-30797
 URL: https://issues.apache.org/jira/browse/FLINK-30797
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Web Frontend
Reporter: Martijn Visser
Assignee: Martijn Visser
 Fix For: 1.17.0


Dependabot has created https://github.com/apache/flink/pull/21617

This is the corresponding Jira ticket



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #21671: [BK-1.16][FLINK-30657] Remove Shared and Key_Shared related tests in Pulsar connector

2023-01-26 Thread via GitHub


MartijnVisser commented on PR #21671:
URL: https://github.com/apache/flink/pull/21671#issuecomment-1405601473

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods

2023-01-26 Thread via GitHub


kristoffSC commented on PR #21393:
URL: https://github.com/apache/flink/pull/21393#issuecomment-1405572025

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023

2023-01-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681102#comment-17681102
 ] 

Piotr Nowojski edited comment on FLINK-30623 at 1/26/23 8:00 PM:
-

It seems like not everything has been fixed?

[http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED&env=2]

Does anyone have any idea why?


was (Author: pnowojski):
It seems like everything has been fixed?

[http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED&env=2]

Does anyone have any idea why?

> Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
> ---
>
> Key: FLINK-30623
> URL: https://issues.apache.org/jira/browse/FLINK-30623
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Checkpointing
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Performance regression
> checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005
> checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277
> deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395
> stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919
> tupleKeyBy median=4155.684199 recent_median=3987.5812305
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED_1&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=8&ben=deployAllTasks.STREAMING&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=stateBackends.MEMORY&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=tupleKeyBy&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023

2023-01-26 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-30623:

  Assignee: (was: Rui Fan)

> Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
> ---
>
> Key: FLINK-30623
> URL: https://issues.apache.org/jira/browse/FLINK-30623
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Checkpointing
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Performance regression
> checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005
> checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277
> deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395
> stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919
> tupleKeyBy median=4155.684199 recent_median=3987.5812305
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED_1&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=8&ben=deployAllTasks.STREAMING&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=stateBackends.MEMORY&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=tupleKeyBy&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30623) Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023

2023-01-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681102#comment-17681102
 ] 

Piotr Nowojski commented on FLINK-30623:


It seems like everything has been fixed?

[http://codespeed.dak8s.net:8000/timeline/?ben=checkpointSingleInput.UNALIGNED&env=2]

Does anyone have any idea why?

> Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023
> ---
>
> Key: FLINK-30623
> URL: https://issues.apache.org/jira/browse/FLINK-30623
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Checkpointing
>Reporter: Martijn Visser
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Performance regression
> checkpointSingleInput.UNALIGNED median=338.1445195 recent_median=67.6453005
> checkpointSingleInput.UNALIGNED_1 median=213.230041 recent_median=39.830277
> deployAllTasks.STREAMING median=168.533106 recent_median=159.8534395
> stateBackends.MEMORY median=3229.0248875 recent_median=2985.782919
> tupleKeyBy median=4155.684199 recent_median=3987.5812305
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=checkpointSingleInput.UNALIGNED_1&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=8&ben=deployAllTasks.STREAMING&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=stateBackends.MEMORY&extr=on&quarts=on&equid=off&env=2&revs=200
> http://codespeed.dak8s.net:8000/timeline/#/?exe=6&ben=tupleKeyBy&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21764: [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.

2023-01-26 Thread via GitHub


flinkbot commented on PR #21764:
URL: https://github.com/apache/flink/pull/21764#issuecomment-1405322143

   
   ## CI report:
   
   * 09865792df8a4901f8429b063e09f5ee1b4a75e9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26082) CancelPartitionRequestTest.testDuplicateCancel failed on azure due to bind failed

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-26082:
---
Labels: auto-deprioritized-major pull-request-available test-stability  
(was: auto-deprioritized-major test-stability)

> CancelPartitionRequestTest.testDuplicateCancel failed on azure due to bind 
> failed
> -
>
> Key: FLINK-26082
> URL: https://issues.apache.org/jira/browse/FLINK-26082
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.13.5, 1.16.0
>Reporter: Yun Gao
>Assignee: Anton Kalashnikov
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Feb 10 01:56:01 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 2.273 s <<< FAILURE! - in 
> org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest
> Feb 10 01:56:01 [ERROR] 
> testDuplicateCancel(org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest)
>   Time elapsed: 1.877 s  <<< ERROR!
> Feb 10 01:56:01 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  bind(..) failed: Address already in use
> Feb 10 01:56:01 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31070&view=logs&j=f0ac5c25-1168-55a5-07ff-0e88223afed9&t=0dbaca5d-7c38-52e6-f4fe-2fb69ccb3ada&l=6768



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] akalash opened a new pull request, #21764: [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.

2023-01-26 Thread via GitHub


akalash opened a new pull request, #21764:
URL: https://github.com/apache/flink/pull/21764

   ## What is the purpose of the change
   
   This fix initializes the test netty server in a loop in order to decrease the 
probability of the `Address already in use` problem.
   
   ## Brief change log
   
 - *Initialize the test netty server and client in a loop to reduce the 
probability of the `Address already in use` problem (see the sketch below).*
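   
   For illustration, a minimal sketch of the retry-in-a-loop idea using a plain 
`ServerSocket` (the actual change targets the netty test setup; names here are 
illustrative):
   
   ```java
   import java.io.IOException;
   import java.net.InetSocketAddress;
   import java.net.ServerSocket;
   
   class BindRetrySketch {
       static ServerSocket bindWithRetry(int port, int attempts)
               throws IOException, InterruptedException {
           IOException last = new IOException("attempts must be > 0");
           for (int i = 0; i < attempts; i++) {
               try {
                   ServerSocket socket = new ServerSocket();
                   socket.bind(new InetSocketAddress(port));
                   return socket;
               } catch (IOException e) {
                   last = e;          // typically "Address already in use"
                   Thread.sleep(100); // brief back-off before the next attempt
               }
           }
           throw last;
       }
   }
   ```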
   
   
   ## Verifying this change
   
   It is a test itself
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] mas-chen commented on pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-26 Thread via GitHub


mas-chen commented on PR #1:
URL: 
https://github.com/apache/flink-connector-kafka/pull/1#issuecomment-1405303999

   Thanks for your help @zentol @MartijnVisser!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21763: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.

2023-01-26 Thread via GitHub


flinkbot commented on PR #21763:
URL: https://github.com/apache/flink/pull/21763#issuecomment-1405300184

   
   ## CI report:
   
   * b29a8ee33deeca92104a8375be8b98a4bce7b524 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28596) Support writing arrays to postgres array columns in Flink SQL JDBC connector

2023-01-26 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681036#comment-17681036
 ] 

Martijn Visser commented on FLINK-28596:


[~bobbyrlg] If you have a PR, looking forward to it :)

> Support writing arrays to postgres array columns in Flink SQL JDBC connector
> 
>
> Key: FLINK-28596
> URL: https://issues.apache.org/jira/browse/FLINK-28596
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.15.0
>Reporter: Bobby Richard
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] akalash opened a new pull request, #21763: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.

2023-01-26 Thread via GitHub


akalash opened a new pull request, #21763:
URL: https://github.com/apache/flink/pull/21763

   ## What is the purpose of the change
   
   This change fixes a bug in a test where a leftover thread from the previous 
test could trigger the latch of the current test.
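   
   For illustration, a minimal sketch of the race and the fix (invented names, 
not the actual TaskTest code):
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Illustrative only. With a static latch, a stale thread left over from the
   // previous test may still call invoke() and count down the latch that the
   // current test is waiting on:
   class StaticLatchInvokable {
       static CountDownLatch trigger = new CountDownLatch(1); // shared across tests
       public void invoke() { trigger.countDown(); }
   }
   
   // With a per-instance latch, each test observes only its own invokable:
   class InstanceLatchInvokable {
       final CountDownLatch trigger = new CountDownLatch(1); // owned by one test
       public void invoke() { trigger.countDown(); }
   }
   ```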
   
   
   ## Brief change log
   
 - *The static latches in TaskTest were replaced by latches from invokable 
objects in order to avoid accidentally sharing latches between multiple tests*
   
   
   ## Verifying this change
   It is a test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2023-01-26 Thread Anton Kalashnikov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov reassigned FLINK-15550:
-

Assignee: Anton Kalashnikov

> testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
> 
>
> Key: FLINK-15550
> URL: https://issues.apache.org/jira/browse/FLINK-15550
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0
>Reporter: Yun Tang
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Instance: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at 
> org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
> {code}
> {code:java}
> expected: but was:
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] kristoffSC commented on pull request #21393: [FLINK-27246][TableSQL/Runtime][master] - Include WHILE blocks in split generated java methods

2023-01-26 Thread via GitHub


kristoffSC commented on PR #21393:
URL: https://github.com/apache/flink/pull/21393#issuecomment-1405227888

   Hi @tsreaper 
   I'm currently working on the refactoring to include your suggestions; it 
does look promising. I should have a new version by the end of this week.
   
   I have a question though.
   
   Can we assume that variables/fields used in IF and WHILE conditions will 
always be class member variables? I'm looking at `DeclarationRewriter` and I 
wonder whether we make that assumption, or whether IF/WHILE condition arguments 
can also come from method parameters.
   
   If they can come from method parameters and they are primitive values, then 
we should not rewrite such a method, because Java passes primitives by value 
and a split-out method could not update them (see the sketch below).
   I don't think the original IfStatementRewriter deals with that, but maybe I 
am missing it.
   
   What do you think?
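   
   A hypothetical illustration of the concern (invented code, not from the PR): 
if a WHILE condition reads a primitive method parameter, extracting the loop 
body into a split method breaks the loop, because Java passes primitives by 
value.
   
   ```java
   // Hypothetical generated code, for illustration only.
   public class SplitIllustration {
       int memberCounter; // condition on a field: a split method can mutate it
   
       void loopOnMember() {
           while (memberCounter < 100) {
               memberCounter++;
           }
       }
   
       // Condition on a primitive parameter: a rewrite like the commented one
       // below would loop forever, since the caller's 'counter' never changes.
       void loopOnParameter(int counter) {
           while (counter < 100) {
               counter++;
           }
       }
   
       // void loopOnParameter(int counter) {
       //     while (counter < 100) {
       //         loopBody(counter); // increments its own copy only
       //     }
       // }
       // void loopBody(int counter) { counter++; }
   }
   ```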
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30619) AdaptiveSchedulerTest.testStatusMetrics is not stable

2023-01-26 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-30619.

Fix Version/s: 1.17.0
   1.16.2
   Resolution: Fixed

master: 38d390114e93e9f202dc5ac7fa668dd255637139
1.16: 0f52f62b1891230308282e0b819062eb13eb5644

> AdaptiveSchedulerTest.testStatusMetrics is not stable
> -
>
> Key: FLINK-30619
> URL: https://issues.apache.org/jira/browse/FLINK-30619
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.2
>
>
> We experience a test instability in 
> {{AdaptiveSchedulerTest.testStatusMetrics}}.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44635=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8475



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30619) AdaptiveSchedulerTest.testStatusMetrics is not stable

2023-01-26 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-30619:
-
Issue Type: Technical Debt  (was: Bug)

> AdaptiveSchedulerTest.testStatusMetrics is not stable
> -
>
> Key: FLINK-30619
> URL: https://issues.apache.org/jira/browse/FLINK-30619
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.2
>
>
> We experience a test instability in 
> {{AdaptiveSchedulerTest.testStatusMetrics}}.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44635=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8475



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #21751: [FLINK-30619][tests] Add retry

2023-01-26 Thread via GitHub


zentol merged PR #21751:
URL: https://github.com/apache/flink/pull/21751


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30052) Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-26 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681024#comment-17681024
 ] 

Chesnay Schepler commented on FLINK-30052:
--

kafka-main:
979bdcfafabae392fa6978fb107c4228d68776b7
2fb6ad544c374c76c0e49c1ef11affbd82fe63d0

> Move existing Kafka connector code from Flink repo to dedicated Kafka repo
> --
>
> Key: FLINK-30052
> URL: https://issues.apache.org/jira/browse/FLINK-30052
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> Instructions guide can be found at 
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] zentol merged pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-26 Thread via GitHub


zentol merged PR #1:
URL: https://github.com/apache/flink-connector-kafka/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] boring-cyborg[bot] commented on pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-26 Thread boring-cyborg


boring-cyborg[bot] commented on PR #1:
URL: 
https://github.com/apache/flink-connector-kafka/pull/1#issuecomment-1405200120

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #513: [FLINK-30781] Add completed checkpoint check to cluster health check

2023-01-26 Thread via GitHub


gaborgsomogyi commented on code in PR #513:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1087995977


##
docs/layouts/shortcodes/generated/dynamic_section.html:
##
@@ -8,6 +8,18 @@
 
 
 
+<tr>
+<td><h5>kubernetes.operator.cluster.health-check.completed-checkpoints.enabled</h5></td>
+<td style="word-wrap: break-word;">false</td>
+<td>Boolean</td>
+<td>Whether to enable completed checkpoint health check for clusters.</td>
+</tr>
+<tr>
+<td><h5>kubernetes.operator.cluster.health-check.completed-checkpoints.window</h5></td>

Review Comment:
   Changed + fixed other issues in the doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] antonipp commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


antonipp commented on PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405163365

   Thanks a lot @gyfora!
   
   Regarding:
   > We just have to ensure that we handle the Zookeeper dependency as optional
   
   The dependency was actually already there! I didn't have to add anything.
   
   I looked into it and all Curator classes are coming from the `flink-clients` 
package which is listed as a dependency in `pom.xml` 
[here](https://github.com/antonipp/flink-kubernetes-operator/blob/57b2832d97a4d52d7fcd814d50b94f4e745ce027/flink-kubernetes-operator/pom.xml#L75-L79).
   
   $ mvn dependency:tree -pl 'flink-kubernetes-operator'
   [...]
   [INFO] +- org.apache.flink:flink-clients:jar:1.15.3:compile
   [INFO] |  +- org.apache.flink:flink-runtime:jar:1.15.3:compile
   [INFO] |  |  +- org.apache.flink:flink-rpc-core:jar:1.15.3:compile
   [INFO] |  |  +- [...]
   [INFO] |  |  +- 
org.apache.flink:flink-shaded-zookeeper-3:jar:3.5.9-15.0:compile
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087957846


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SplitEnumerator} that splits Cassandra cluster into {@link 
CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+private static final String MURMUR3PARTITIONER = 
"org.apache.cassandra.dht.Murmur3Partitioner";
+
+private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+private final CassandraEnumeratorState state;
+private final Cluster cluster;
+
+public CassandraSplitEnumerator(
+SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+CassandraEnumeratorState state,
+ClusterBuilder clusterBuilder) {
+this.enumeratorContext = enumeratorContext;
+this.state = state == null ? new CassandraEnumeratorState() : state /* 
snapshot restore*/;
+this.cluster = clusterBuilder.getCluster();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+assignUnprocessedSplitsToReader(subtaskId);
+}
+
+@Override
+public void start() {
+// discover the splits and update unprocessed splits and then assign 
them.
+// There is only an initial splits discovery, no periodic discovery.
+enumeratorContext.callAsync(
+this::discoverSplits,
+(splits, throwable) -> {
+LOG.info("Add {} splits to CassandraSplitEnumerator.", 
splits.size());
+state.addNewSplits(splits, 
enumeratorContext.currentParallelism());
+});
+}
+
+private List<CassandraSplit> discoverSplits() {
+final int numberOfSplits = enumeratorContext.currentParallelism();
+final Metadata clusterMetadata = cluster.getMetadata();
+final String partitioner = clusterMetadata.getPartitioner();
+final SplitsGenerator splitsGenerator = new 
SplitsGenerator(partitioner);
+if (MURMUR3PARTITIONER.equals(partitioner)) {
+LOG.info("Murmur3Partitioner detected, splitting");
+List<BigInteger> tokens =
+clusterMetadata.getTokenRanges().stream()
+.map(
+tokenRange ->
+new BigInteger(
+
tokenRange.getEnd().getValue().toString()))
+.collect(Collectors.toList());
+return splitsGenerator.generateSplits(numberOfSplits, tokens);
+} else {
+// Murmur3Partitioner is the default and recommended partitioner 
for Cassandra 1.2+
+// see
+// 
https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html
+LOG.warn(
+"The current Cassandra partitioner is {}, only 

[jira] [Commented] (FLINK-27273) Support both configMap and zookeeper based HA data clean up

2023-01-26 Thread Anton Ippolitov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681010#comment-17681010
 ] 

Anton Ippolitov commented on FLINK-27273:
-

Hi, FYI I just opened a draft PR for this issue 
(https://github.com/apache/flink-kubernetes-operator/pull/514) and left some 
comments about the parts of the code I wasn't 100% sure about.

Thanks a lot [~gyfora] for a super quick review as well 

> Support both configMap and zookeeper based HA data clean up
> ---
>
> Key: FLINK-27273
> URL: https://issues.apache.org/jira/browse/FLINK-27273
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Aitozi
>Assignee: Anton Ippolitov
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in 
> [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041],
> we only support cleaning up the HA data stored in ConfigMaps. Considering that 
> ZooKeeper is still widely used as an HA service when deploying on Kubernetes, I 
> think we should still take it into account; otherwise, we will run into 
> unexpected behavior with ZK HA jobs. For example, the old JobGraph will be 
> recovered when the application is redeployed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087950644


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+
+/**
+ * Represents a portion of Cassandra token ring. It is a range between a start 
token and an end
+ * token.
+ */
+public final class RingRange implements Serializable {

Review Comment:
   class removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] antonipp commented on a diff in pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


antonipp commented on code in PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1087919678


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {

Review Comment:
   Wrapped ZK client creation in a separate method so that it's easier to mock 
once I get to unit tests



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##
@@ -340,6 +341,11 @@ protected Configuration build() {
 // Set cluster config
 effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, 
namespace);
 effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, 
clusterId);
+
+if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) {
+effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
clusterId);

Review Comment:
By default, `clusterId` is generated [based on the FlinkDeployment resource 
name](https://github.com/antonipp/flink-kubernetes-operator/blob/6b0d91fe2c1dc41eeba655337c1d99fedabe3ef8/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L96).
   
   The value of `HighAvailabilityOptions.HA_CLUSTER_ID` is later 
[used](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L226)
 as the namespace for storing Zookeeper ZNodes for each Flink application.
   
   This means that if someone creates two `FlinkDeployment`s with the same name 
in two different Kubernetes namespaces and these two `FlinkDeployment`s use the 
same ZK cluster for HA metadata storage, there might be a collision. So I think 
we should append a random UUID to `HighAvailabilityOptions.HA_CLUSTER_ID`:
   ```java
   effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId + 
"-RANDOM-UUID-HERE");
   ```
   Not 100% sure of all the implications of doing that but I think it should be 
safe enough?



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});

Review Comment:
   Note that by leveraging the 
[ZooKeeperUtils::startCuratorFramework](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L200)
 function and by taking the application `conf`, we create the client in exactly 
the same way as the Flink application itself. This means that we should be able 
to support all ZK configuration settings that are supported by Flink 
applications themselves (ex: ACLs).
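   
   As a usage sketch of that pattern (illustrative; the class and import 
locations are assumptions based on the linked Flink code):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
   import org.apache.flink.runtime.util.ZooKeeperUtils;
   
   // Illustrative only: the client is built from the application's own Flink
   // configuration, so quorum, root path, and ACL settings match the job's.
   final class CuratorUsageSketch {
       static void touchZk(Configuration appConf) throws Exception {
           try (CuratorFrameworkWithUnhandledErrorListener curator =
                   ZooKeeperUtils.startCuratorFramework(appConf, exception -> {})) {
               curator.asCuratorFramework().checkExists().forPath("/");
           }
       }
   }
   ```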



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
 return kubernetesClient;
 }
 
+public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration 
conf) {
+return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) 
throws Exception {
 LOG.info(
 "Deploying application cluster{}",
 requireHaMetadata ? " requiring last-state from HA metadata" : 
"");
+
+// If Kubernetes or Zookeeper HA are activated, delete the job graph 
in HA storage so that
+// the newly changed job config (e.g. parallelism) could take effect
 if (FlinkUtils.isKubernetesHAActivated(conf)) {
 final String clusterId = 
conf.get(KubernetesConfigOptions.CLUSTER_ID);
 final String namespace = 
conf.get(KubernetesConfigOptions.NAMESPACE);
-// Delete the job graph in the HA ConfigMaps so that the newly 
changed job config(e.g.
-// parallelism) could take effect
 FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, 
kubernetesClient);
+} else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+try (var curator = getCurator(conf)) {

Review Comment:
   Here, the ZK client is created only for one request and then torn down (it's 
the same in `FlinkUtils::isZookeeperHaMetadataAvailable`). We could cache these 
clients somehow, but I think it's not worth it. 
   
   First of 

[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087944134


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set<CassandraSplitState> unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+// need a thread safe set
+this.unprocessedSplits = ConcurrentHashMap.newKeySet();
+this.query = query;
+cluster = clusterBuilder.getCluster();
+session = cluster.connect();
+}
+
+@Override
+public RecordsWithSplitIds<CassandraRow> fetch() {
+Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
+Set<String> finishedSplits = new HashSet<>();
+Metadata clusterMetadata = cluster.getMetadata();
+
+String partitionKey = getPartitionKey(clusterMetadata);
+String finalQuery = generateRangeQuery(query, partitionKey);
+PreparedStatement preparedStatement = session.prepare(finalQuery);
+// Set wakeup to false to start consuming.
+wakeup.compareAndSet(true, false);
+for (CassandraSplitState cassandraSplitState : unprocessedSplits) {
+// allow to interrupt the reading of splits as requested in the API
+if (wakeup.get()) {
+break;
+}
+if (!cassandraSplitState.isEmpty()) {
+try {
+final Set<RingRange> ringRanges =
+cassandraSplitState.getUnprocessedRingRanges();
+final String cassandraSplitId = 
cassandraSplitState.getSplitId();
+
+for (RingRange ringRange : ringRanges) {
+Token startToken =
+
clusterMetadata.newToken(ringRange.getStart().toString());
+

[GitHub] [flink-connector-cassandra] echauchot commented on pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1405116976

   @zentol I'm almost done replacing the whole split mechanism. I could even 
remove the size estimates, as Flink does not rely on expected split memory size 
the way Beam does. So I could use a very simple splitting mechanism based only 
on the partitioner's min and max tokens, e.g. (min, x] and (x, max] for 2 
splits (see the sketch below). I could also consider that a Cassandra split 
processes only a single ring range and get rid of the RingRange class. It will 
change a lot. I also need to do a final pass before it is ready for another 
round; otherwise you'll see sketchy things. Don't waste your time reviewing 
now, I'll ping you to take a look.
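   
   For illustration, a minimal sketch of such a splitting scheme (invented 
names; `min` and `max` stand for the partitioner's token bounds, e.g. -2^63 
and 2^63 - 1 for Murmur3):
   
   ```java
   import java.math.BigInteger;
   import java.util.ArrayList;
   import java.util.List;
   
   // Illustrative only: divide the token interval (min, max] into n contiguous
   // ranges; each BigInteger[] holds one (start, end] range.
   final class SimpleTokenSplitter {
       static List<BigInteger[]> split(BigInteger min, BigInteger max, int n) {
           List<BigInteger[]> ranges = new ArrayList<>();
           BigInteger step = max.subtract(min).divide(BigInteger.valueOf(n));
           BigInteger start = min;
           for (int i = 0; i < n; i++) {
               // the last range absorbs the division remainder so the whole
               // ring is covered
               BigInteger end = (i == n - 1) ? max : start.add(step);
               ranges.add(new BigInteger[] {start, end});
               start = end;
           }
           return ranges;
       }
   }
   ```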


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087939852


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SplitEnumerator} that splits Cassandra cluster into {@link 
CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+private static final String MURMUR3PARTITIONER = 
"org.apache.cassandra.dht.Murmur3Partitioner";
+
+private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+private final CassandraEnumeratorState state;
+private final Cluster cluster;
+
+public CassandraSplitEnumerator(
+SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+CassandraEnumeratorState state,
+ClusterBuilder clusterBuilder) {
+this.enumeratorContext = enumeratorContext;
+this.state = state == null ? new CassandraEnumeratorState() : state /* 
snapshot restore*/;
+this.cluster = clusterBuilder.getCluster();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+assignUnprocessedSplitsToReader(subtaskId);
+}
+
+@Override
+public void start() {
+// discover the splits and update unprocessed splits and then assign 
them.
+// There is only an initial splits discovery, no periodic discovery.
+enumeratorContext.callAsync(
+this::discoverSplits,
+(splits, throwable) -> {
+LOG.info("Add {} splits to CassandraSplitEnumerator.", 
splits.size());
+state.addNewSplits(splits, 
enumeratorContext.currentParallelism());
+});
+}
+
+private List<CassandraSplit> discoverSplits() {
+final int numberOfSplits = enumeratorContext.currentParallelism();
+final Metadata clusterMetadata = cluster.getMetadata();
+final String partitioner = clusterMetadata.getPartitioner();
+final SplitsGenerator splitsGenerator = new 
SplitsGenerator(partitioner);
+if (MURMUR3PARTITIONER.equals(partitioner)) {
+LOG.info("Murmur3Partitioner detected, splitting");
+List<BigInteger> tokens =
+clusterMetadata.getTokenRanges().stream()
+.map(
+tokenRange ->
+new BigInteger(
+
tokenRange.getEnd().getValue().toString()))
+.collect(Collectors.toList());
+return splitsGenerator.generateSplits(numberOfSplits, tokens);
+} else {
+// Murmur3Partitioner is the default and recommended partitioner 
for Cassandra 1.2+
+// see
+// 
https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html
+LOG.warn(
+"The current Cassandra partitioner is {}, only 

[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set<CassandraSplitState> unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+// need a thread safe set

Review Comment:
   I've pushed an implementation that assigns only a single split to each 
reader, which removed the need for a concurrent set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087928810


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/RingRange.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+
+/**
+ * Represents a portion of Cassandra token ring. It is a range between a start 
token and an end
+ * token.
+ */
+public final class RingRange implements Serializable {

Review Comment:
   No longer needs to be serializable; also applies to other classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


gyfora commented on PR #514:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/514#issuecomment-1405091162

   I think from an initial review this looks pretty good. We just have to 
ensure that we handle the Zookeeper dependency as optional, since it is the 
less common integration :) 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087926802


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
+import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReader;
+import org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bounded source to read from Cassandra and return a collection of entities 
as {@code
+ * DataStream}. An entity is built by Cassandra mapper ({@code
+ * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
+ * in <a href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/">
+ * Cassandra object mapper</a>).
+ *
+ * To use it, do the following:
+ *
+ * {@code
+ * ClusterBuilder clusterBuilder = new ClusterBuilder() {
+ *   @Override
+ *   protected Cluster buildCluster(Cluster.Builder builder) {
+ * return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
+ *   .withSocketOptions(new SocketOptions()
+ *   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+ *   .setReadTimeoutMillis(READ_TIMEOUT))
+ *   .build();
+ *   }
+ * };
+ * Source<Pojo, CassandraSplit, CassandraEnumeratorState> cassandraSource = new CassandraSource(clusterBuilder,
+ *  Pojo.class,
+ *  "select ... from 
KEYSPACE.TABLE ...;",
+ *  () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
+ *
+ * DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),
+ * "CassandraSource");
+ * }
+ */
+@PublicEvolving
+public class CassandraSource<OUT>
+implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, 
ResultTypeQueryable<OUT> {
+
+public static final String CQL_PROHIBITTED_CLAUSES_REGEXP =
+"(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*";
+private static final long serialVersionUID = 7773196541275567433L;
+
+private final ClusterBuilder clusterBuilder;
+private final Class<OUT> pojoClass;
+private final String query;
+private final MapperOptions mapperOptions;
+
+public CassandraSource(
+ClusterBuilder clusterBuilder,
+Class<OUT> pojoClass,
+String query,
+MapperOptions mapperOptions) {
+checkNotNull(clusterBuilder, "ClusterBuilder required but 

[jira] [Assigned] (FLINK-27273) Support both configMap and zookeeper based HA data clean up

2023-01-26 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-27273:
--

Assignee: Anton Ippolitov

> Support both configMap and zookeeper based HA data clean up
> ---
>
> Key: FLINK-27273
> URL: https://issues.apache.org/jira/browse/FLINK-27273
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Aitozi
>Assignee: Anton Ippolitov
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in 
> [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041],
> we only support cleaning up the HA data stored in ConfigMaps. Considering that 
> ZooKeeper is still widely used as an HA service when deploying on Kubernetes, I 
> think we should still take it into account; otherwise, we will run into 
> unexpected behavior with ZK HA jobs. For example, the old JobGraph will be 
> recovered when the application is redeployed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27273) Support both configMap and zookeeper based HA data clean up

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27273:
---
Labels: pull-request-available  (was: )

> Support both configMap and zookeeper based HA data clean up
> ---
>
> Key: FLINK-27273
> URL: https://issues.apache.org/jira/browse/FLINK-27273
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in 
> [comments|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815695041],
> we only support cleaning up the HA data stored in ConfigMaps. Considering that 
> ZooKeeper is still widely used as an HA service when deploying on Kubernetes, I 
> think we should still take it into account; otherwise, we will run into 
> unexpected behavior with ZK HA jobs. For example, the old JobGraph will be 
> recovered when the application is redeployed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] antonipp opened a new pull request, #514: [WIP][FLINK-27273] Zookeeper HA support

2023-01-26 Thread via GitHub


antonipp opened a new pull request, #514:
URL: https://github.com/apache/flink-kubernetes-operator/pull/514

   > ⚠️ This PR is WIP, my goal is to get feedback as early as possible
   
   ## What is the purpose of the change
   https://issues.apache.org/jira/browse/FLINK-27273
   This PR makes sure that Flink applications which use Zookeeper-based HA can 
be deployed via the Kubernetes Operator.
   It wasn't previously possible mainly due to the lack of proper clean-up of 
Zookeeper data which could cause unexpected behaviour in Flink applications, as 
mentioned in the ticket.
   
   ## Brief change log
   - JobGraph-related data is now properly cleaned in Zookeeper (same behaviour 
as when using Kubernetes HA)
   - Code has been updated to support both Kubernetes and Zookeeper-based HA 
settings. All features that worked with Kubernetes HA (such as JM deployment 
recovery, unhealthy job restarts, or rollbacks) should work with Zookeeper HA 
as well.
   
   ## Verifying this change
   - **TODO: add unit tests!**
   - Tried some scenarios in our Kubernetes environment:
 - `upgradeMode: savepoint`:
   - [x] Successfully deployed a Flink application with `upgradeMode: 
savepoint` and Zookeeper HA enabled
   - [x] Updated the `FlinkDeployment` object, verified that the JobGraph 
data was successfully deleted in Zookeeper and that the new version of the 
application was successfully rolled out
   - [x] Manually deleted the JobManager Kubernetes Deployment, verified 
that the Operator was able to recover it (with 
`kubernetes.operator.jm-deployment-recovery.enabled` and 
`kubernetes.operator.job.upgrade.last-state-fallback.enabled` set to `true`) 
and that the application restarted successfully (this process relies on HA 
metadata too)
 - `upgradeMode: last-state`:
   - [x] Successfully deployed a Flink application with `upgradeMode: 
last-state` and Zookeeper HA enabled
   - [x] Updated the `FlinkDeployment` object, verified that the JobGraph 
data was successfully deleted in Zookeeper and that the new version of the 
application was successfully rolled out
   - [x] Manually deleted the JobManager Kubernetes Deployment, verified 
that the Operator was able to recover it with 
`kubernetes.operator.jm-deployment-recovery.enabled` set to `true` and that the 
application restarted successfully (this process relies on HA metadata too)
 - [x] Set `kubernetes.operator.cluster.health-check.enabled` to `true` and 
deployed a Flink application which was continuously failing and restarting. 
Once the restart threshold 
(`kubernetes.operator.cluster.health-check.restarts.threshold`) was hit, the 
Operator was able to successfully validate that the Zookeeper HA metadata 
exists and restarted the job  
 - [x] Set `kubernetes.operator.deployment.rollback.enabled` to `true` and 
deployed a Flink application which was in `CrashLoopBackOff`. Once the 
`kubernetes.operator.deployment.readiness.timeout` passed, the Operator was 
able to successfully validate that the Zookeeper HA metadata exists and rolled 
back the job to the previous version. 
 - **TODO: anything else?**
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes 
   
   ## Documentation
 - Does this pull request introduce a new feature?: yes
 - If yes, how is the feature documented?: Updated the documentation where 
applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21762: [FLINK-30796][metrics][slf4j] (Partially) skip metrics report

2023-01-26 Thread via GitHub


flinkbot commented on PR #21762:
URL: https://github.com/apache/flink/pull/21762#issuecomment-1405063430

   
   ## CI report:
   
   * 476e8e6495fe8930c7bde3ad1834d79260a23580 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30796) Make Slf4jReporter less noisy when no/few metrics exist

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30796:
---
Labels: pull-request-available  (was: )

> Make Slf4jReporter less noisy when no/few metrics exist
> ---
>
> Key: FLINK-30796
> URL: https://issues.apache.org/jira/browse/FLINK-30796
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol opened a new pull request, #21762: [FLINK-30796][metrics][slf4j] (Partially) skip metrics report

2023-01-26 Thread via GitHub


zentol opened a new pull request, #21762:
URL: https://github.com/apache/flink/pull/21762

   The e2e logs are noisier than they need to be because the slf4j reporter 
logs a lot of lines even if no metrics (of a particular type) are registered.
   
   Full skip:
   Before:
   ```
   === Starting metrics report 
===
   
   -- Counters 
---
   
   -- Gauges 
-
   
   -- Meters 
-
   
   -- Histograms 
-
   
   === Finished metrics report 
===
   ```
   After:
   ```
   Skipping metrics report because no metrics are registered.
   ```
   
   
   Partial skip:
   Before:
   ```
   === Starting metrics report 
===
   
   -- Counters 
---
   scope.simpleCounter: 0
   
   -- Gauges 
-
   
   -- Meters 
-
   
   -- Histograms 
-
   
   === Finished metrics report 
===
   ```
   After:
   ```
   === Starting metrics report 
===
   
   -- Counters 
-
   scope.simpleCounter: 0
   
   === Finished metrics report 
===
   ```
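   
   A minimal sketch of the skip logic (illustrative names only, not the actual 
Slf4jReporter internals):
   
   ```java
   import java.util.Map;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Illustrative only: skip the whole report when nothing is registered,
   // and skip empty per-type sections otherwise.
   final class ReportSketch {
       private static final Logger LOG = LoggerFactory.getLogger(ReportSketch.class);
   
       static void report(Map<String, Long> counters, Map<String, Object> gauges) {
           if (counters.isEmpty() && gauges.isEmpty()) {
               LOG.info("Skipping metrics report because no metrics are registered.");
               return; // full skip
           }
           StringBuilder sb = new StringBuilder("=== Starting metrics report ===\n");
           if (!counters.isEmpty()) { // partial skip: omit empty sections
               sb.append("-- Counters --\n");
               counters.forEach((name, c) -> sb.append(name).append(": ").append(c).append('\n'));
           }
           if (!gauges.isEmpty()) {
               sb.append("-- Gauges --\n");
               gauges.forEach((name, g) -> sb.append(name).append(": ").append(g).append('\n'));
           }
           sb.append("=== Finished metrics report ===");
           LOG.info(sb.toString());
       }
   }
   ```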
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TJX2014 commented on pull request #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct

2023-01-26 Thread via GitHub


TJX2014 commented on PR #21761:
URL: https://github.com/apache/flink/pull/21761#issuecomment-1405040674

   Hi @AlanConfluent, could you please help me review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct

2023-01-26 Thread via GitHub


flinkbot commented on PR #21761:
URL: https://github.com/apache/flink/pull/21761#issuecomment-1405027565

   
   ## CI report:
   
   * 6f02da7bce96a3bbd67c4ee868273298fb72f73a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #513: [FLINK-30781] Add completed checkpoint check to cluster health check

2023-01-26 Thread via GitHub


gyfora commented on code in PR #513:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1087871390


##
docs/layouts/shortcodes/generated/dynamic_section.html:
##
@@ -8,6 +8,18 @@
 
 
 
+<tr>
+    <td><h5>kubernetes.operator.cluster.health-check.completed-checkpoints.enabled</h5></td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>Whether to enable completed checkpoint health check for clusters.</td>
+</tr>
+<tr>
+    <td><h5>kubernetes.operator.cluster.health-check.completed-checkpoints.window</h5></td>

Review Comment:
   maybe it would be better to call it `checkpoint-progress`, and make it more 
explicit in the description that we record unhealthy state if no checkpoints 
were taken at all in the check window
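
   A hedged sketch of how the options from this diff would appear in the operator configuration; the `5m` window value is illustrative only, since the diff does not show that option's type or default:

   ```
   kubernetes.operator.cluster.health-check.completed-checkpoints.enabled: true
   kubernetes.operator.cluster.health-check.completed-checkpoints.window: 5m
   ```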



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30795) StreamingWithStateTestBase of win not correct

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30795:
---
Labels: pull-request-available  (was: )

> StreamingWithStateTestBase of win not correct
> -
>
> Key: FLINK-30795
> URL: https://issues.apache.org/jira/browse/FLINK-30795
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows path such as
> "
> file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439
> "
> will throw 
>  
> new IllegalArgumentException("Cannot use the root directory for 
> checkpoints.");
>Reporter: JinxinTang
>Priority: Major
>  Labels: pull-request-available
>
> Windows path such as
> "
> [file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439|file:///C:/UsersxxAppDataLocalTempjunit373749850266957074junit7014045318909690439]
> "
> will throw 
>  
> new IllegalArgumentException("Cannot use the root directory for 
> checkpoints.");



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TJX2014 opened a new pull request, #21761: [FLINK-30795][tests] StreamingWithStateTestBase of win not correct

2023-01-26 Thread via GitHub


TJX2014 opened a new pull request, #21761:
URL: https://github.com/apache/flink/pull/21761

   ## What is the purpose of the change
   Fix a test case that is broken on Windows OS.
   
   
   ## Brief change log
   Change "file://" to LocalFileSystem.getLocalFsURI.toString.
   
   
   ## Verifying this change
   
   org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase now 
works correctly on Windows OS.
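
   For illustration, a minimal JDK-only sketch (not the PR's actual fix, which goes through LocalFileSystem.getLocalFsURI) of why the hard-coded "file://" prefix breaks on Windows; it assumes it is run on a Windows host:

   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   // Sketch: naive string concatenation vs. a proper file URI on Windows.
   public class WinUriSketch {
       public static void main(String[] args) {
           String tmp = "C:\\Users\\xx\\AppData\\Local\\Temp\\junit123";
           // "file://C:\Users\..." parses "C:" as the URI authority and leaves
           // an (effectively root) path, which trips the checkpoint directory check.
           System.out.println("file://" + tmp);
           // Path#toUri yields the valid triple-slash form file:///C:/Users/...
           Path p = Paths.get(tmp);
           System.out.println(p.toUri());
       }
   }
   ```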
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30796) Make Slf4jReporter less noisy when no/few metrics exist

2023-01-26 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-30796:


 Summary: Make Slf4jReporter less noisy when no/few metrics exist
 Key: FLINK-30796
 URL: https://issues.apache.org/jira/browse/FLINK-30796
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #509: [FLINK-30653] Trigger resource Events on autoscaler actions

2023-01-26 Thread via GitHub


gyfora commented on code in PR #509:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/509#discussion_r1087868130


##
examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java:
##
@@ -18,23 +18,38 @@
 
 package autoscaling;
 
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
 /** Autoscaling Example. */
 public class AutoscalingExample {
 public static void main(String[] args) throws Exception {
 var env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Long> stream = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE);
+long numIterations = Long.parseLong(args[0]);
+DataStream<Long> stream =
+env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).filter(i -> System.nanoTime() > 1);
 stream =
-stream.shuffle()
+stream.keyBy(s -> "s")

Review Comment:
   I think it may be better to keep `shuffle()` here; I was just testing 
some skew scenarios and forgot to take this out.
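
   For readers skimming the thread, a sketch of the difference under discussion (the class and pipeline are illustrative, not part of the PR):

   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   // keyBy with a constant key routes every record to one subtask (useful for
   // provoking skew in tests); shuffle() redistributes records randomly.
   public class PartitioningSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           DataStream<Long> src = env.fromSequence(0, 1_000);
           src.keyBy(i -> "constant").print("skewed");   // all records on one subtask
           src.shuffle().print("balanced");              // spread across subtasks
           env.execute("partitioning-sketch");
       }
   }
   ```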



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #21697: [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop

2023-01-26 Thread via GitHub


1996fanrui commented on PR #21697:
URL: https://github.com/apache/flink/pull/21697#issuecomment-1405010319

   > Thanks for the improvement. Production code LGTM. Could you add a unit 
test for the `AbstractStreamTaskNetworkInput` or `StreamTaskNetworkInput` (in 
`StreamTaskNetworkInputTest.java`?), that will check if the records are batched 
and that batches can be interrupted?
   
   Thanks for your review, I have updated the unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21760: fix scala code

2023-01-26 Thread via GitHub


flinkbot commented on PR #21760:
URL: https://github.com/apache/flink/pull/21760#issuecomment-1405009024

   
   ## CI report:
   
   * 03e4e171a576de743ae8ccec2975235327fb4622 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] novakov-alexey opened a new pull request, #21760: fix scala code

2023-01-26 Thread via GitHub


novakov-alexey opened a new pull request, #21760:
URL: https://github.com/apache/flink/pull/21760

   
   
   ## What is the purpose of the change
   
   documentation fix.
   
   
   ## Brief change log
   
   Add missing val keyword
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] dawidwys commented on pull request #603: Flink 1.16.1

2023-01-26 Thread via GitHub


dawidwys commented on PR #603:
URL: https://github.com/apache/flink-web/pull/603#issuecomment-1405004500

   @XComp 
   
   > I'm not sure about the test instabilities. Do we include them? I remember 
some document where it stated that we should exclude them. But I couldn't find 
any documentation anymore.
   
   I believe we should exclude them, as the first paragraph of the blog post 
says:
   
   > Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30795) StreamingWithStateTestBase of win not correct

2023-01-26 Thread JinxinTang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JinxinTang updated FLINK-30795:
---
Description: 
Windows path such as

"

[file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439|file:///C:/UsersxxAppDataLocalTempjunit373749850266957074junit7014045318909690439]

"

will throw 

 
new IllegalArgumentException("Cannot use the root directory for checkpoints.");

> StreamingWithStateTestBase of win not correct
> -
>
> Key: FLINK-30795
> URL: https://issues.apache.org/jira/browse/FLINK-30795
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows path such as
> "
> file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439
> "
> will throw 
>  
> new IllegalArgumentException("Cannot use the root directory for 
> checkpoints.");
>Reporter: JinxinTang
>Priority: Major
>
> Windows path such as
> "
> [file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439|file:///C:/UsersxxAppDataLocalTempjunit373749850266957074junit7014045318909690439]
> "
> will throw 
>  
> new IllegalArgumentException("Cannot use the root directory for 
> checkpoints.");



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] dawidwys commented on a diff in pull request #603: Flink 1.16.1

2023-01-26 Thread via GitHub


dawidwys commented on code in PR #603:
URL: https://github.com/apache/flink-web/pull/603#discussion_r1087851914


##
_posts/2023-01-26-release-1.16.1.md:
##
@@ -0,0 +1,258 @@
+---
+layout: post
+title:  "Apache Flink 1.16.1 Release Announcement"
+date: 2023-01-23T22:00:00.000Z
+categories: news
+authors:
+- MartijnVisser:
+  name: "Martijn Visser"
+  twitter: "martijnvisser82"
+
+excerpt: The Apache Flink Community is pleased to announce a bug fix release 
for Flink 1.16.
+
+---
+
+The Apache Flink Community is pleased to announce the first bug fix release of 
the Flink 1.16 series.
+
+This release includes 84 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.16.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:
+[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352344).
+
+We highly recommend all users upgrade to Flink 1.16.1.
+
+# Release Artifacts
+
+## Maven Dependencies
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.16.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>1.16.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>1.16.1</version>
+</dependency>
+```
+
+## Binaries
+
+You can find the binaries on the updated [Downloads page]({{ site.baseurl 
}}/downloads.html).
+
+## Docker Images
+
+* [library/flink](https://hub.docker.com/_/flink?tab=tags&page=1&name=1.16.1) (official images)
+* [apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1&name=1.16.1) (ASF repository)
+
+## PyPi
+
+* [apache-flink==1.16.1](https://pypi.org/project/apache-flink/1.16.1/)
+
+# Upgrade Notes
+
+[FLINK-28988 - Incorrect result for filter after temporal 
join](https://issues.apache.org/jira/browse/FLINK-28988)
+The filter will not be pushed down into both inputs of the event time temporal 
join.
+This may cause incompatible plan changes compared to Flink 1.16.0, e.g., when 
the left input is an upsert source 
+(like upsert-kafka connector), the query plan will remove the 
ChangelogNormalize node in Flink 1.16.1, while it
+did appear in 1.16.0.
+
+[FLINK-29849 - Event time temporal join on an upsert source may produce 
incorrect execution plan](https://issues.apache.org/jira/browse/FLINK-29849)
+This resolves the correctness issue when do event time temporal join with a 
versioned table backed by an upsert source. 

Review Comment:
   ```suggestion
   This resolves the correctness issue when doing an event time temporal join 
with a versioned table backed by an upsert source. 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30795) StreamingWithStateTestBase of win not correct

2023-01-26 Thread JinxinTang (Jira)
JinxinTang created FLINK-30795:
--

 Summary: StreamingWithStateTestBase of win not correct
 Key: FLINK-30795
 URL: https://issues.apache.org/jira/browse/FLINK-30795
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows path such as

"

file://C:\Users\xx\AppData\Local\Temp\junit373749850266957074\junit7014045318909690439

"

will throw 

 
new IllegalArgumentException("Cannot use the root directory for checkpoints.");
Reporter: JinxinTang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087835183


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java:
##
@@ -115,29 +109,22 @@ private List<CassandraSplit> discoverSplits() {
 @Override
 public void addSplitsBack(List<CassandraSplit> splits, int subtaskId) {
 LOG.info("Add {} splits back to CassandraSplitEnumerator.", 
splits.size());
-state.addNewSplits(splits, enumeratorContext.currentParallelism());
-assignUnprocessedSplitsToReader(subtaskId);
+state.addNewSplits(splits);
 }
 
 @Override
 public void addReader(int subtaskId) {
 LOG.info("Adding reader {} to CassandraSplitEnumerator.", subtaskId);
-assignUnprocessedSplitsToReader(subtaskId);
+assignUnprocessedSplitToReader(subtaskId);
 }
 
-private void assignUnprocessedSplitsToReader(int readerId) {
+private void assignUnprocessedSplitToReader(int readerId) {
 checkReaderRegistered(readerId);
-
-final Set<CassandraSplit> splitsForReader = state.getSplitsForReader(readerId);
-if (splitsForReader != null && !splitsForReader.isEmpty()) {
-Map<Integer, List<CassandraSplit>> assignment = new HashMap<>();
-assignment.put(readerId, Lists.newArrayList(splitsForReader));
-LOG.info("Assigning splits to reader {}", assignment);
-enumeratorContext.assignSplits(new SplitsAssignment<>(assignment));
-}
-
-// periodically partition discovery is disabled, signal 
NoMoreSplitsEvent to the reader
-LOG.debug(
+final CassandraSplit cassandraSplit = state.getASplit();

Review Comment:
   Doesn't this have to handle the case where the split is `null`?
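
   One way the `null` case could be handled, sketched against the fields shown in the diff (`signalNoMoreSplits` and `assignSplit` are standard SplitEnumeratorContext methods; the body below is an assumption, not the PR's code):

   ```java
   // Hypothetical null handling for the question above.
   private void assignUnprocessedSplitToReader(int readerId) {
       checkReaderRegistered(readerId);
       final CassandraSplit split = state.getASplit();
       if (split == null) {
           // nothing left to assign: tell the reader it can finish
           enumeratorContext.signalNoMoreSplits(readerId);
           return;
       }
       enumeratorContext.assignSplit(split, readerId);
   }
   ```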



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087821967


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java:
##
@@ -48,6 +52,30 @@ public CassandraSplitState toSplitState() {
 return new CassandraSplitState(new HashSet<>(ringRanges), splitId());
 }
 
+public void serialize(ObjectOutputStream objectOutputStream) throws 
IOException {

Review Comment:
   let's make this package private. Personally I'd move all of it into the 
serializer, but I can see why it could be useful to have it here.
   For symmetry I'd prefer this method to be static and accept a CassandraSplit 
instance.



##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java:
##
@@ -48,6 +52,30 @@ public CassandraSplitState toSplitState() {
 return new CassandraSplitState(new HashSet<>(ringRanges), splitId());
 }
 
+public void serialize(ObjectOutputStream objectOutputStream) throws 
IOException {
+objectOutputStream.writeInt(ringRanges.size());
+for (RingRange ringRange : ringRanges) {
+objectOutputStream.writeObject(ringRange.getStart());
+objectOutputStream.writeObject(ringRange.getEnd());
+}
+}
+
+public static CassandraSplit deserialize(ObjectInputStream 
objectInputStream)
+throws IOException {
+try {
+final int nbRingRanges = objectInputStream.readInt();
+Set<RingRange> ringRanges = new HashSet<>(nbRingRanges);

Review Comment:
   note: Passing nbRingRanges won't have the desired effect of avoiding 
allocations, since the backing map is resized once it fills past its load 
factor (0.75 by default). In fact you've guaranteed that the map will be 
resized at least once.
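
   A small JDK illustration of the sizing rule behind this note (names are mine):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // HashSet(int) sets the initial *capacity*, not the expected element count.
   // With the default load factor of 0.75, inserting n elements into
   // new HashSet<>(n) forces a resize once 0.75 * n entries are present.
   public class HashSetSizingSketch {
       public static void main(String[] args) {
           int n = 16;
           Set<Integer> resizesOnce = new HashSet<>(n);                   // resizes at 12 inserts
           Set<Integer> sizedForN = new HashSet<>((int) (n / 0.75f) + 1); // fits n without resizing
           for (int i = 0; i < n; i++) {
               resizesOnce.add(i);
               sizedForN.add(i);
           }
           System.out.println(resizesOnce.size() + " / " + sizedForN.size());
       }
   }
   ```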



##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private 

[GitHub] [flink] XComp commented on pull request #21756: [FLINK-30787][debug] Adds dmesg call to test_controller to check whether it would succeed on Alibaba instances.

2023-01-26 Thread via GitHub


XComp commented on PR #21756:
URL: https://github.com/apache/flink/pull/21756#issuecomment-1404966145

   I wanted to verify it because of [Robert's 
comment](https://issues.apache.org/jira/browse/FLINK-30787?focusedCommentId=17680583&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17680583).
 He sounded confident that it worked at some point in time. But you're right; 
it looks like the azure pipeline change didn't cause that. I actually verified 
the order of the commits beforehand, but must have mixed up the commit order.
   
   So, I see two options here:
   1. We remove this part of the code again because nobody complained in the past.
   2. We enable kernel log access in the docker container as explained in [this 
StackExchange 
thread](https://unix.stackexchange.com/questions/390184/dmesg-read-kernel-buffer-failed-permission-denied)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #21756: [FLINK-30787][debug] Adds dmesg call to test_controller to check whether it would succeed on Alibaba instances.

2023-01-26 Thread via GitHub


zentol commented on PR #21756:
URL: https://github.com/apache/flink/pull/21756#issuecomment-1404950896

   I mean, it also fails in your personal azure, and the whole dmesg business 
was added _after_ FLINK-13978.
   
   Chances are this just fails because the tests are run in docker :shrug: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21751: [FLINK-30619][tests] Add retry

2023-01-26 Thread via GitHub


XComp commented on code in PR #21751:
URL: https://github.com/apache/flink/pull/21751#discussion_r1087810726


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -585,7 +586,7 @@ public void testStatusMetrics() throws Exception {
 // sleep a bit to ensure uptime is > 0
 Thread.sleep(10L);
 
-assertThat(upTimeGauge.getValue()).isGreaterThan(0L);
+CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L);

Review Comment:
   The `Thread.sleep(10L)` can be removed if we add the waitUntil condition. 
Additionally, the same pattern with `Thread.sleep` and `isGreaterThan` is 
applied a few lines further down in the test code. We might want to update that 
as well.
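
   For context, the suggestion replaces a fixed sleep with polling until the condition holds. A minimal sketch of the pattern (Flink's actual helper is CommonTestUtils.waitUntilCondition; the class below only illustrates the idea):

   ```java
   import java.time.Duration;
   import java.util.function.Supplier;

   // Poll a condition until it holds or a deadline passes, instead of one blind sleep.
   public final class WaitSketch {
       public static void waitUntil(Supplier<Boolean> condition, Duration timeout)
               throws InterruptedException {
           long deadline = System.nanoTime() + timeout.toNanos();
           while (!condition.get()) {
               if (System.nanoTime() > deadline) {
                   throw new AssertionError("Condition not met within " + timeout);
               }
               Thread.sleep(10L); // short poll interval
           }
       }

       public static void main(String[] args) throws InterruptedException {
           long start = System.currentTimeMillis();
           waitUntil(() -> System.currentTimeMillis() - start > 50, Duration.ofSeconds(5));
           System.out.println("condition met");
       }
   }
   ```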



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #21751: [FLINK-30619][tests] Add retry

2023-01-26 Thread via GitHub


zentol commented on PR #21751:
URL: https://github.com/apache/flink/pull/21751#issuecomment-1404938724

   I doubt that a thread dump is gonna help us much, since we already know 
from the existing logs that the job state was changed.
   But in the end, waiting for it is a de-facto retry, so why not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #509: [FLINK-30653] Trigger resource Events on autoscaler actions

2023-01-26 Thread via GitHub


mxm commented on code in PR #509:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/509#discussion_r1087710879


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##
@@ -99,14 +103,27 @@ public boolean scaleResource(
 return false;
 }
 
+var scalingReport = scalingReport(scalingSummaries);
+eventRecorder.triggerEvent(
+resource,
+EventRecorder.Type.Normal,
+EventRecorder.Reason.ScalingReport,
+EventRecorder.Component.Operator,
+scalingReport);
+
+if (!conf.get(SCALING_ENABLED)) {
+return false;
+}

Review Comment:
   That is awesome! :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30717) Migrate Travis CI to Github Actions

2023-01-26 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678635#comment-17678635
 ] 

Chesnay Schepler edited comment on FLINK-30717 at 1/26/23 10:59 AM:


benchmarks: 9fa36c41b055b807ff7fc25cc548ff04f9ace28d
trainings: 188f865c62599ad92d538e5fbf7b1dc9a94d8f41


was (Author: zentol):
benchmarks: 9fa36c41b055b807ff7fc25cc548ff04f9ace28d

> Migrate Travis CI to Github Actions
> ---
>
> Key: FLINK-30717
> URL: https://issues.apache.org/jira/browse/FLINK-30717
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Benchmarks, Documentation / Training
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> Infra decided to no longer support Travis in the near future so we need to 
> migrate some repos.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30717) Migrate Travis CI to Github Actions

2023-01-26 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-30717.

Resolution: Fixed

> Migrate Travis CI to Github Actions
> ---
>
> Key: FLINK-30717
> URL: https://issues.apache.org/jira/browse/FLINK-30717
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Benchmarks, Documentation / Training
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> Infra decided to no longer support Travis in the near future so we need to 
> migrate some repos.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-training] zentol merged pull request #51: [FLINK-30717][ci] Migrate to GitHub Actions

2023-01-26 Thread via GitHub


zentol merged PR #51:
URL: https://github.com/apache/flink-training/pull/51


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1404820642

   > > I don't think it is an ASF rule but fair enough,
   > 
   > FYI: by and large it should be viewed as a legal requirement. By copying 
code from Cassandra you have to adhere to their licensing, which among other 
things states that you must have prominent notices for changes to a file.
   
   You're right, cf. ASF License v2, section 4(d):
   `If the Work includes a "NOTICE" text file as part of its distribution, then 
any Derivative Works that You distribute must include a readable copy of the 
attribution notices contained within such NOTICE file, excluding those notices 
that do not pertain to any part of the Derivative Works, in at least one of the 
following places: within a NOTICE text file distributed as part of the 
Derivative Works; within the Source form or documentation, if provided along 
with the Derivative Works; or, within a display generated by the Derivative 
Works, if and wherever such third-party notices normally appear. The contents 
of the NOTICE file are for informational purposes only and do not modify the 
License. You may add Your own attribution notices within Derivative Works that 
You distribute, alongside or as an addendum to the NOTICE text from the Work, 
provided that such additional attribution notices cannot be construed as 
modifying the License.`
   
   Anyway, as we decided to replace the split code with something that was 
never merged to Beam, there is no need anymore.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30754) Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues

2023-01-26 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-30754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-30754.
--
Fix Version/s: 1.17
   Resolution: Fixed

3a64648 in master

> Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test 
> issues
> 
>
> Key: FLINK-30754
> URL: https://issues.apache.org/jira/browse/FLINK-30754
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.17.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] mbalassi merged pull request #21732: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues

2023-01-26 Thread via GitHub


mbalassi merged PR #21732:
URL: https://github.com/apache/flink/pull/21732


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set<CassandraSplit> unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+// need a thread safe set

Review Comment:
   I've pushed an impl that assigns only a single split to each reader, and 
this removed the need for a concurrent set.



##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 

[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set<CassandraSplit> unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+// need a thread safe set

Review Comment:
   I'll push an impl that assigns only a single split to each reader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087607875


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set<CassandraSplit> unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+// need a thread safe set

Review Comment:
   I pushed an impl that assigns only a single split to each reader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-26 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1087605427


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SplitEnumerator} that splits Cassandra cluster into {@link 
CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+private static final String MURMUR3PARTITIONER = 
"org.apache.cassandra.dht.Murmur3Partitioner";
+
+private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+private final CassandraEnumeratorState state;
+private final Cluster cluster;
+
+public CassandraSplitEnumerator(
+SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+CassandraEnumeratorState state,
+ClusterBuilder clusterBuilder) {
+this.enumeratorContext = enumeratorContext;
+this.state = state == null ? new CassandraEnumeratorState() : state /* 
snapshot restore*/;
+this.cluster = clusterBuilder.getCluster();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+assignUnprocessedSplitsToReader(subtaskId);
+}
+
+@Override
+public void start() {
+// discover the splits and update unprocessed splits and then assign 
them.
+// There is only an initial splits discovery, no periodic discovery.
+enumeratorContext.callAsync(
+this::discoverSplits,
+(splits, throwable) -> {
+LOG.info("Add {} splits to CassandraSplitEnumerator.", 
splits.size());
+state.addNewSplits(splits, 
enumeratorContext.currentParallelism());
+});
+}
+
+private List<CassandraSplit> discoverSplits() {
+final int numberOfSplits = enumeratorContext.currentParallelism();
+final Metadata clusterMetadata = cluster.getMetadata();
+final String partitioner = clusterMetadata.getPartitioner();
+final SplitsGenerator splitsGenerator = new 
SplitsGenerator(partitioner);
+if (MURMUR3PARTITIONER.equals(partitioner)) {
+LOG.info("Murmur3Partitioner detected, splitting");
+List<BigInteger> tokens =
+clusterMetadata.getTokenRanges().stream()
+.map(
+tokenRange ->
+new BigInteger(
+
tokenRange.getEnd().getValue().toString()))
+.collect(Collectors.toList());
+return splitsGenerator.generateSplits(numberOfSplits, tokens);
+} else {
+// Murmur3Partitioner is the default and recommended partitioner 
for Cassandra 1.2+
+// see
+// 
https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archPartitionerAbout.html
+LOG.warn(
+"The current Cassandra partitioner is {}, only 

[jira] [Created] (FLINK-30794) Disable dependency convergence check for nightly connector builds

2023-01-26 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30794:
--

 Summary: Disable dependency convergence check for nightly connector 
builds
 Key: FLINK-30794
 URL: https://issues.apache.org/jira/browse/FLINK-30794
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Common
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21759: [FLINK-29237][table] Remove RexSimplify from Flink code, SearchOperator code generation for RexUnknown.nullAs

2023-01-26 Thread via GitHub


flinkbot commented on PR #21759:
URL: https://github.com/apache/flink/pull/21759#issuecomment-1404704386

   
   ## CI report:
   
   * 8d766c9246e2178ebc237d93171e918c74e38050 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29237) RexSimplify can not be removed after update to calcite 1.27

2023-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29237:
---
Labels: pull-request-available  (was: )

> RexSimplify can not be removed after update to calcite 1.27
> ---
>
> Key: FLINK-29237
> URL: https://issues.apache.org/jira/browse/FLINK-29237
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> It seems some work should be done to make it happen.
> Currently, removing RexSimplify from the Flink repo leads to failures in several 
> tests, such as
> {{IntervalJoinTest#testFallbackToRegularJoin}}
> {{CalcITCase#testOrWithIsNullInIf}}
> {{CalcITCase#testOrWithIsNullPredicate}}
> example of failure
> {noformat}
> Sep 07 11:25:08 java.lang.AssertionError: 
> Sep 07 11:25:08 
> Sep 07 11:25:08 Results do not match for query:
> Sep 07 11:25:08   
> Sep 07 11:25:08 SELECT * FROM NullTable3 AS T
> Sep 07 11:25:08 WHERE T.a = 1 OR T.a = 3 OR T.a IS NULL
> Sep 07 11:25:08 
> Sep 07 11:25:08 
> Sep 07 11:25:08 Results
> Sep 07 11:25:08  == Correct Result - 4 ==   == Actual Result - 2 ==
> Sep 07 11:25:08  +I[1, 1, Hi]   +I[1, 1, Hi]
> Sep 07 11:25:08  +I[3, 2, Hello world]  +I[3, 2, Hello world]
> Sep 07 11:25:08 !+I[null, 999, NullTuple]   
> Sep 07 11:25:08 !+I[null, 999, NullTuple]   
> Sep 07 11:25:08 
> Sep 07 11:25:08 Plan:
> Sep 07 11:25:08   == Abstract Syntax Tree ==
> Sep 07 11:25:08 LogicalProject(inputs=[0..2])
> Sep 07 11:25:08 +- LogicalFilter(condition=[OR(=($0, 1), =($0, 3), IS 
> NULL($0))])
> Sep 07 11:25:08+- LogicalTableScan(table=[[default_catalog, 
> default_database, NullTable3]])
> Sep 07 11:25:08 
> Sep 07 11:25:08 == Optimized Logical Plan ==
> Sep 07 11:25:08 Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 3; NULL AS 
> TRUE])])
> Sep 07 11:25:08 +- BoundedStreamScan(table=[[default_catalog, 
> default_database, NullTable3]], fields=[a, b, c])
> Sep 07 11:25:08 
> Sep 07 11:25:08
> Sep 07 11:25:08   at org.junit.Assert.fail(Assert.java:89)
> Sep 07 11:25:08   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1(BatchTestBase.scala:154)
> Sep 07 11:25:08   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1$adapted(BatchTestBase.scala:147)
> Sep 07 11:25:08   at scala.Option.foreach(Option.scala:257)
> Sep 07 11:25:08   at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:147)
> Sep 07 11:25:08   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> Sep 07 11:25:08   at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> Sep 07 11:25:08   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Sep 07 11:25:08 
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

