[jira] [Closed] (FLINK-27411) Move lookup table source cache logic to flink-table-runtime module
[ https://issues.apache.org/jira/browse/FLINK-27411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Smirnov closed FLINK-27411. - Resolution: Workaround > Move lookup table source cache logic to flink-table-runtime module > -- > > Key: FLINK-27411 > URL: https://issues.apache.org/jira/browse/FLINK-27411 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase, Connectors / JDBC, Table SQL / API, > Table SQL / Planner, Table SQL / Runtime >Affects Versions: 1.16.0 >Reporter: Alexander Smirnov >Priority: Major > Attachments: LookupJoin(2).png > > > The idea was inspired by > [FLIP-221|https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric]. > [~renqs] and Yuan Zhu have done great work on this, but I suggest implementing it in a > slightly different way that allows applying optimizations to caching and requires > fewer dependencies in connectors. > The point is to move the caching logic to a lower level: the level of the > flink-table-runtime operators. The architecture of the lookup join looks like this > (red text is a schematic representation of the proposed changes): > !LookupJoin(2).png|width=589,height=264! > LookupConfig is named like this (not CacheConfig) because it can also > contain non-cache options for the lookup join (for example, > 'lookup.max-retries', 'lookup.async'...). > Changes in connectors: remove their own logic for configs, caching, and retrying > queries. > Changes in the public "Table SQL / API": a new class LookupConfig, new > ConfigOptions for lookup connectors, and a new method 'getLookupConfig' in > LookupTableSource. > {code:java} > @PublicEvolving > public interface LookupTableSource extends DynamicTableSource { > ... > /** @return configurations for planning the lookup join and executing it at runtime. */ > default LookupConfig getLookupConfig() { > return null; > } > ... > }{code} > Changes in "Table SQL / Planner": the class CommonPhysicalLookupJoin and its > subclasses. > Changes in "Table SQL / Runtime": the classes LookupJoinCachingRunner, > LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner, and > AsyncLookupJoinCachingRunnerWithCalc. We could probably use the 'decorator' pattern > here to avoid code duplication and a large number of classes, but in our > private version the design is like this (maybe not so elegant). > With such an architecture we can apply further optimizations to caching: > 1) Caching after calculations. LookupJoinRunnerWithCalc and > AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc and > AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc function > contains calculations on fields of the lookup table, and most of the time these > calculations are filters and projections. > For example, if we join table A with lookup table B using the condition > 'JOIN ... ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' > function will contain the filters 'A.age = B.age + 10 AND B.salary > 1000'. > If we apply this function before storing records in the cache, the size of the cache > will be significantly reduced: filters avoid storing useless records in the cache, and > projections reduce the records' size. So the initial maximum number of records in the > cache can be increased by the user. > 2) Constant keys optimization. If the join condition contains constants, for > example, 'JOIN ... ON A.name = B.name AND B.age = 10', we don't need to store > '10' in the cache. Currently the TableFunction's 'eval' method is called with the values of > 'A.name' and 10, so we store '10' in the cache for every row, which is pretty > useless. > Note that this change doesn't mention the Hive lookup connector, because > it stores all data in memory. That logic can be replaced in the future by the 'ALL' > cache strategy mentioned in the original FLIP.
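The "caching after calculations" idea above can be sketched outside of Flink: a minimal, hypothetical cache wrapper that applies the calc's filter and projection before inserting rows, so the cache stores fewer and smaller entries. Class, field, and row-layout names below are illustrative assumptions, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: apply the 'calc' (filter + projection) before caching. */
public class CalcBeforeCacheSketch {
    // cache: lookup key -> already-filtered-and-projected rows
    private final Map<String, List<String>> cache = new HashMap<>();
    private final Predicate<int[]> filter;          // e.g. B.salary > 1000
    private final Function<int[], String> project;  // keep only the needed fields

    CalcBeforeCacheSketch(Predicate<int[]> filter, Function<int[], String> project) {
        this.filter = filter;
        this.project = project;
    }

    /** Store only the rows that survive the calc, in projected form. */
    void put(String key, List<int[]> lookedUpRows) {
        List<String> reduced = new ArrayList<>();
        for (int[] row : lookedUpRows) {
            if (filter.test(row)) {
                reduced.add(project.apply(row));
            }
        }
        cache.put(key, reduced);
    }

    List<String> get(String key) {
        return cache.get(key);
    }

    public static void main(String[] args) {
        // assumed row layout: [age, salary]; filter: salary > 1000; projection: keep age only
        CalcBeforeCacheSketch c =
                new CalcBeforeCacheSketch(r -> r[1] > 1000, r -> "age=" + r[0]);
        c.put("id-1", List.of(new int[] {30, 500}, new int[] {40, 2000}));
        System.out.println(c.get("id-1")); // prints [age=40]
    }
}
```

Only the filtered, projected row is cached: the filter drops useless records, and the projection shrinks the ones that remain, which is exactly why the user could then raise the cache's maximum record count.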
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22808: [FLINK-32316] [SourceAlignment] Scheduling announceCombinedWatermark period task during SourceCoordinator::start not in constructor
1996fanrui commented on code in PR #22808: URL: https://github.com/apache/flink/pull/22808#discussion_r1237971048 ## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ## @@ -131,6 +136,73 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception { } } +/** + * When the JobManager fails over and automatically recovers the job, the SourceCoordinator will be reset twice: 1. Create + * JobMaster --> Create Scheduler --> Create DefaultExecutionGraph --> Init + * SourceCoordinator (but do not start it) 2. The JobMaster calls + * restoreLatestCheckpointedStateInternal, which will call {@link + * org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator#resetToCheckpoint(long,byte[])} + * and reset the SourceCoordinator. Because the first SourceCoordinator was never started, the + * periodic task can't be stopped. + */ +@Test +void testSourceCoordinatorReset() throws Exception { Review Comment: In general, this test is checking that `announceCombinedWatermark` shouldn't be called without `SourceCoordinator#start`; the reset is just one case of this test (currently there is only one case), right? If so, I think the method name isn't general. How about `testAnnounceCombinedWatermarkWithoutStart`? ## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ## @@ -131,6 +136,73 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception { } } +/** + * When the JobManager fails over and automatically recovers the job, the SourceCoordinator will be reset twice: 1. Create + * JobMaster --> Create Scheduler --> Create DefaultExecutionGraph --> Init + * SourceCoordinator (but do not start it) 2. The JobMaster calls + * restoreLatestCheckpointedStateInternal, which will call {@link + * org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator#resetToCheckpoint(long,byte[])} + * and reset the SourceCoordinator.
Because the first SourceCoordinator was never started, the + * periodic task can't be stopped. + */ +@Test +void testSourceCoordinatorReset() throws Exception { +long maxDrift = 1000L; +WatermarkAlignmentParams params = +new WatermarkAlignmentParams(maxDrift, "group1", maxDrift); + +final Source> mockSource = +createMockSource(); + +// First, init a SourceCoordinator to simulate the JobMaster initializing the SourceCoordinator +AtomicInteger counter1 = new AtomicInteger(0); +sourceCoordinator = +new SourceCoordinator>( +OPERATOR_NAME, +mockSource, +getNewSourceCoordinatorContext(), +new CoordinatorStoreImpl(), +params, +null) { +@Override +void announceCombinedWatermark() { +counter1.incrementAndGet(); +} +}; + +// Second, call SourceCoordinator::close and re-init the SourceCoordinator to simulate +// RecreateOnResetOperatorCoordinator::resetToCheckpoint +sourceCoordinator.close(); +AtomicInteger counter2 = new AtomicInteger(0); +sourceCoordinator = +new SourceCoordinator>( +OPERATOR_NAME, +mockSource, +getNewSourceCoordinatorContext(), +new CoordinatorStoreImpl(), +params, +null) { +@Override +void announceCombinedWatermark() { +counter2.incrementAndGet(); +} +}; + +final int subtask = 0; +int attemptNumber = 0; +sourceCoordinator.start(); +setReaderTaskReady(sourceCoordinator, subtask, attemptNumber); + +sourceCoordinator.handleEventFromOperator( +subtask, attemptNumber, new ReportedWatermarkEvent(1000)); Review Comment: As I understand it, these lines are not necessary, right? ```suggestion ``` ## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ## @@ -131,6 +136,73 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception { } } +/** + * When the JobManager fails over and automatically recovers the job, the SourceCoordinator will be reset twice: 1. Create + * JobMaster --> Create Scheduler --> Create DefaultExecutionGraph --> Init + * SourceCoordinator (but do not start it) 2.
The JobMaster calls + * restoreLatestCheckpointedStateInternal, which will call {@link + * org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator#resetToCheckp
[GitHub] [flink] flinkbot commented on pull request #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks
flinkbot commented on PR #22845: URL: https://github.com/apache/flink/pull/22845#issuecomment-1601970488 ## CI report: * 3e53f0587d57073a6db12b1381aaa9ed4fba644d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on pull request #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks
1996fanrui commented on PR #22845: URL: https://github.com/apache/flink/pull/22845#issuecomment-1601969939 Hi @becketqin @pnowojski @LoveHeat , would you mind taking a look at this PR in your free time? Thanks~ I have described this bug in detail in FLINK-32411.
[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32411: --- Labels: pull-request-available (was: ) > SourceCoordinator thread leaks when job recovers from checkpoint > > > Key: FLINK-32411 > URL: https://issues.apache.org/jira/browse/FLINK-32411 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.16.2, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2023-06-22-11-12-35-747.png > > > SourceCoordinator threads leak when a job recovers from a checkpoint. From the > following figure, we can see: > * 2 SourceCoordinator threads for the SlowNumberSequenceSource > * 2 SourceCoordinator threads for the FastNumberSequenceSource > !image-2023-06-22-11-12-35-747.png|width=889,height=225! > h1. Root cause: > # When initializing the ExecutionJobVertex of the source, > RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code > link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60] > # When the job recovers from a checkpoint, > [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120] > will close the old coordinator and create a new one. > # > [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271] > only closes the SourceCoordinatorContext after the coordinator has been started, so the > SourceCoordinatorContext of the old coordinator won't be closed.
> # The SourceCoordinatorContext creates some threads in its > [constructor|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L118], > so it should be closed even if the SourceCoordinator isn't started. > > The call stack of creating the SourceCoordinator: > {code:java} > // Create the first SourceCoordinator > "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable > java.lang.Thread.State: RUNNABLE > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) > - locked <0x1f02> (a > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497) > at >
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850) > at > org.apache.flink.runtime.executiongraph.DefaultE
[GitHub] [flink] 1996fanrui opened a new pull request, #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks
1996fanrui opened a new pull request, #22845: URL: https://github.com/apache/flink/pull/22845 ## What is the purpose of the change Fix the bug about SourceCoordinator thread leaks. You can get more detailed background from FLINK-32411. ## Brief change log Close the SourceCoordinatorContext inside SourceCoordinator#close even if the SourceCoordinator wasn't started. ## Verifying this change This change added tests and can be verified as follows: - *Added SourceCoordinatorTest#testClosedWithoutStart* - *Improved SourceCoordinatorTest#testClosed* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
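The fix described in the change log can be illustrated with a minimal, self-contained sketch (class names are illustrative, not Flink's actual classes): close() must release the context unconditionally, because the context's threads are created in the constructor, not in start().

```java
/**
 * Minimal sketch of the FLINK-32411 fix, with hypothetical names.
 * The real SourceCoordinatorContext starts worker threads in its constructor,
 * so close() must run even when the coordinator was never started.
 */
public class CoordinatorCloseSketch {
    static class Context implements AutoCloseable {
        boolean closed;
        Context() { /* real code starts worker threads here */ }
        @Override public void close() { closed = true; }
    }

    static class Coordinator implements AutoCloseable {
        final Context context = new Context();
        boolean started;
        void start() { started = true; }
        @Override
        public void close() {
            // Buggy version: if (started) { context.close(); }
            // Fixed version: always close the context, since its threads
            // exist from construction time onward.
            context.close();
        }
    }

    public static void main(String[] args) {
        // Recovery path: the coordinator is created, then reset before start().
        Coordinator c = new Coordinator();
        c.close();
        System.out.println(c.context.closed); // prints true: no leaked context
    }
}
```

With the buggy variant, the first coordinator created during recovery would be closed before start(), its context would survive, and its threads would leak, which is exactly the symptom shown in the Jira thread dump.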
[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-32411: Description: SourceCoordinator threads leak when a job recovers from a checkpoint. From the following figure, we can see: * 2 SourceCoordinator threads for the SlowNumberSequenceSource * 2 SourceCoordinator threads for the FastNumberSequenceSource !image-2023-06-22-11-12-35-747.png|width=889,height=225! h1. Root cause: # When initializing the ExecutionJobVertex of the source, RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60] # When the job recovers from a checkpoint, [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120] will close the old coordinator and create a new one. # [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271] only closes the SourceCoordinatorContext after the coordinator has been started, so the SourceCoordinatorContext of the old coordinator won't be closed. # The SourceCoordinatorContext creates some threads in its [constructor|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L118], so it should be closed even if the SourceCoordinator isn't started.
The call stack of creating the SourceCoordinator: {code:java} // Create the first SourceCoordinator "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912) at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218) at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207) at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoo
[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-32411: Description: SourceCoordinator threads leak when a job recovers from a checkpoint. From the following figure, we can see: * 2 SourceCoordinator threads for the SlowNumberSequenceSource * 2 SourceCoordinator threads for the FastNumberSequenceSource !image-2023-06-22-11-12-35-747.png|width=889,height=225! h1. Root cause: # When initializing the ExecutionJobVertex of the source, RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60] # When the job recovers from a checkpoint, [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120] will close the old coordinator and create a new one. # [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271] only closes the SourceCoordinatorContext after the coordinator has been started, so the SourceCoordinatorContext of the old coordinator won't be closed. # The SourceCoordinatorContext creates some threads in its constructor, so it should be closed even if the SourceCoordinator isn't started.
The call stack of creating the SourceCoordinator: {code:java} // Create the first SourceCoordinator "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912) at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218) at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207) at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMast
[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-32411: Attachment: (was: image-2023-06-22-11-00-25-446.png) > SourceCoordinator thread leaks when job recovers from checkpoint > > > Key: FLINK-32411 > URL: https://issues.apache.org/jira/browse/FLINK-32411 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.16.2, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Attachments: image-2023-06-22-11-12-35-747.png > > > SourceCoordinator threads leak when a job recovers from a checkpoint. From the > following figure, we can see: > * 2 SourceCoordinator threads for the SlowNumberSequenceSource > * 2 SourceCoordinator threads for the FastNumberSequenceSource > !image-2023-06-22-11-12-35-747.png|width=889,height=225! > h1. Root cause: > # When initializing the ExecutionJobVertex of the source, > RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code > link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60] > # When the job recovers from a checkpoint, > [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120] > will close the old coordinator and create a new one. > # > [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271] > only closes the SourceCoordinatorContext after the coordinator has been started, so the > SourceCoordinatorContext of the old coordinator won't be closed.
> # The SourceCoordinatorContext creates some threads in its constructor, so it > should be closed even if the SourceCoordinator isn't started. > > The call stack of creating the SourceCoordinator: > {code:java} > // Create the first SourceCoordinator > "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable > java.lang.Thread.State: RUNNABLE > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) > - locked <0x1f02> (a > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) > at >
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207) > at > org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163) > at >
[jira] [Created] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint
Rui Fan created FLINK-32411: --- Summary: SourceCoordinator thread leaks when job recovers from checkpoint Key: FLINK-32411 URL: https://issues.apache.org/jira/browse/FLINK-32411 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.17.1, 1.16.2 Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-06-22-11-00-25-446.png, image-2023-06-22-11-12-35-747.png SourceCoordinator threads leak when a job recovers from a checkpoint. From the following figure, we can see: * 2 SourceCoordinator threads for the SlowNumberSequenceSource * 2 SourceCoordinator threads for the FastNumberSequenceSource !image-2023-06-22-11-12-35-747.png|width=889,height=225! h1. Root cause: # When initializing the ExecutionJobVertex of the source, RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60] # When the job recovers from a checkpoint, [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120] will close the old coordinator and create a new one. # [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271] only closes the SourceCoordinatorContext after the coordinator has been started, so the SourceCoordinatorContext of the old coordinator won't be closed. # The SourceCoordinatorContext creates some threads in its constructor, so it should be closed even if the SourceCoordinator isn't started.
The call stack for creating the SourceCoordinator: {code:java}
// Create the first SourceCoordinator
"jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
    - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(De
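The close-before-start problem described in the root-cause analysis can be sketched as follows. This is a minimal illustration of the bug pattern only, not the actual Flink source; the class and field names are simplified assumptions, and an `ExecutorService` stands in for the thread resources the real SourceCoordinatorContext creates in its constructor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the leak: the context's thread resources exist as soon as
// the coordinator is constructed, but close() only releases them when start()
// was called first.
class LeakySourceCoordinator {
    // Stands in for SourceCoordinatorContext's eagerly created threads.
    final ExecutorService contextThreads = Executors.newSingleThreadExecutor();
    private boolean started = false;

    void start() {
        started = true;
    }

    void close() {
        if (started) {                 // BUG: skipped when the coordinator was
            contextThreads.shutdown(); // created but never started, so the
        }                              // context threads leak
    }

    // The fix suggested in the report: always close the context.
    void closeFixed() {
        contextThreads.shutdown();
    }
}
```

Calling `close()` on a never-started coordinator leaves `contextThreads` running, which is exactly the leak observed after `resetToCheckpoint` discards old coordinators.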
[GitHub] [flink] afedulov commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
afedulov commented on PR #21028: URL: https://github.com/apache/flink/pull/21028#issuecomment-1601564665 @zentol One concern regarding the idea to migrate the existing methods in `StreamExecutionEnvironment` to `DataGeneratorSource`: adding the `flink-streaming-java` dependency on `flink-connector-datagen` introduces a somewhat unexpected cycle between `flink-architecture-tests-test` and `flink-clients`. How can we address this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields
[ https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735812#comment-17735812 ] padavan commented on FLINK-31958: - Please vote for this issue if you often work with models and hit this problem. > Table to DataStream allow partial fields > > > Key: FLINK-31958 > URL: https://issues.apache.org/jira/browse/FLINK-31958 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Table SQL / API >Reporter: padavan >Priority: Major > > Hello, I have a model with many fields, for example: > {code:java} > public class UserModel { public int userId; public int count; public int zip; > public LocalDateTime dt; public LocalDateTime wStart; public LocalDateTime > wEnd; }{code} > I work with the Table API, select fields, and convert the Table to a DataStream of the model. The problem is that *I have to select all fields even when I don't need them*, or I get the exception: > {quote}Column types of query result and sink for do not match. Cause: > Different number of columns. > {quote} > So I end up substituting fake data as placeholders... > > I would like to work with only the fields I have selected, like: > {code:java} > .select($("userId"), $("count").sum().as("count")); > DataStream dataStream = te.toDataStream(win, > UserModel.class);{code} > > *Expected:* > Relax the "Different number of columns." validation rule: if a column is not selected, initialize it with default(T) / null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
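The workaround the reporter alludes to (substituting fake data as placeholders) can be sketched generically: pad the unselected model fields with NULL placeholder expressions so the column counts of the query and the sink match. This is a hypothetical helper for illustration, not a Flink API; the field names come from the example model above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class SelectionPadding {
    // Builds a select list covering every model field, substituting a NULL
    // placeholder expression for each field the user did not select, so the
    // column count matches the POJO's full field count.
    static List<String> padSelection(List<String> allFields, Set<String> selected) {
        List<String> out = new ArrayList<>();
        for (String field : allFields) {
            out.add(selected.contains(field) ? field : "NULL AS " + field);
        }
        return out;
    }
}
```

For the example model, selecting only `userId` and `count` would yield a select list like `userId, count, NULL AS zip, ...`, which is precisely the boilerplate the improvement request wants to eliminate.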
[GitHub] [flink] flinkbot commented on pull request #22844: [WIP][FLINK-31784][runtime] Adds multi-component support to DefaultLeaderElectionService
flinkbot commented on PR #22844: URL: https://github.com/apache/flink/pull/22844#issuecomment-1601072224 ## CI report: * 506a625457f00f4c99d8ccc6a8bef17aeb4a5749 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30141) MinioTestContainerTest failed due to IllegalStateException in container startup
[ https://issues.apache.org/jira/browse/FLINK-30141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735782#comment-17735782 ] Ryan Skraba commented on FLINK-30141: - I took a look at this and the related issue FLINK-26402 -- it looks like the *503 Service Unavailable* statuses are not rare: they occur in about 1 in a couple hundred API calls to Minio on container startup. On the other hand, the retry mechanism built into Amazon API clients _usually_ retries correctly until it succeeds. Sometimes the Minio container doesn't move into the correct state to serve API calls quickly enough, the default retry strategy eventually fails, and we see the error here. I can reproduce this pretty reliably by running a unit test somewhere between 1K and 10K times. At first I assumed it occurred when the system was under load while running the test, but that doesn't appear to be the case. Attempting to start the container more than once might be the right thing to do here. If the call to Minio fails while creating the default bucket, the container should be discarded and a new one tried. This should add no overhead to the daily CI runs. > MinioTestContainerTest failed due to IllegalStateException in container > startup > --- > > Key: FLINK-30141 > URL: https://issues.apache.org/jira/browse/FLINK-30141 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.17.0, 1.18.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43182&view=logs&j=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc&t=3a8f44aa-4415-5b14-37d5-5fecc568b139&l=15531] > failed due to an {{IllegalStateException}} during container startup: > {code:java} > Nov 15 02:34:04 [ERROR] > org.apache.flink.fs.s3.common.MinioTestContainerTest.testBucketCreation Time > elapsed: 120.874 s <<< ERROR! 
> Nov 15 02:34:04 org.testcontainers.containers.ContainerLaunchException: > Container startup failed > Nov 15 02:34:04 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:345) > Nov 15 02:34:04 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:326) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.TestContainerExtension.instantiateTestContainer(TestContainerExtension.java:59) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.TestContainerExtension.before(TestContainerExtension.java:70) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.EachCallbackWrapper.beforeEach(EachCallbackWrapper.java:45) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$2(TestMethodTestDescriptor.java:166) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:202) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:202) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:165) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:132) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.N
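The "discard the container and try again" idea from the comment above can be sketched as a generic startup-retry loop. This is a hypothetical helper for illustration, not the Testcontainers API (Testcontainers handles startup retries itself); the method and parameter names are assumptions:

```java
import java.util.function.Supplier;

class StartupRetry {
    // Retries a startup action up to `attempts` times, discarding each failed
    // attempt and rethrowing the last failure if every attempt fails.
    // Assumes attempts >= 1.
    static <T> T withRetries(int attempts, Supplier<T> start) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return start.get();
            } catch (RuntimeException e) {
                last = e; // discard this attempt and try again
            }
        }
        throw last;
    }
}
```

When failures are rare and confined to startup, a small fixed attempt count (the PR below uses 3) adds no cost to the common case: the loop returns on the first successful attempt.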
[jira] [Updated] (FLINK-31784) Add multiple-component support to DefaultLeaderElectionService
[ https://issues.apache.org/jira/browse/FLINK-31784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31784: --- Labels: pull-request-available (was: ) > Add multiple-component support to DefaultLeaderElectionService > -- > > Key: FLINK-31784 > URL: https://issues.apache.org/jira/browse/FLINK-31784 > Project: Flink > Issue Type: Sub-task >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp opened a new pull request, #22844: [WIP][FLINK-31784][runtime] Adds multi-component support to DefaultLeaderElectionService
XComp opened a new pull request, #22844: URL: https://github.com/apache/flink/pull/22844 * https://github.com/apache/flink/pull/21742 * https://github.com/apache/flink/pull/22379 * https://github.com/apache/flink/pull/22422 * https://github.com/apache/flink/pull/22380 * https://github.com/apache/flink/pull/22623 * https://github.com/apache/flink/pull/22384 * https://github.com/apache/flink/pull/22390 * https://github.com/apache/flink/pull/22404 * https://github.com/apache/flink/pull/22601 * https://github.com/apache/flink/pull/22642 * https://github.com/apache/flink/pull/22640 * https://github.com/apache/flink/pull/22656 * https://github.com/apache/flink/pull/22828 * https://github.com/apache/flink/pull/22830 * https://github.com/apache/flink/pull/22829 * https://github.com/apache/flink/pull/22661 * === THIS PR === FLINK-31784 ## What is the purpose of the change Replacing the `leaderContender` and `contenderID` fields in `DefaultLeaderElectionService` with a `leaderContenderRegistry` that allows adding multiple contenders. 
## Brief change log
* Removes `DefaultLeaderElectionService` fields `leaderContender` and `contenderID`
* Adds `DefaultLeaderElectionService#leaderContenderRegistry`
* Makes `MultipleComponentLeaderElectionDriver.Listener` interface methods first-class citizens and lets the `LeaderElectionEventHandler` interface call those
* Introduces `contenderID` in `EmbeddedLeaderElection` to allow logging
* Removes `LeaderContender#getDescription()`
## Verifying this change
* Added multi-component support test scenarios to `DefaultLeaderElectionServiceTest`
* Copied relevant tests over from `DefaultMultipleComponentLeaderElectionServiceTest`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22843: [FLINK-30141][s3][filesystem] Increase retries on MinIO container startup
flinkbot commented on PR #22843: URL: https://github.com/apache/flink/pull/22843#issuecomment-1601050461 ## CI report: * ab40ed309eb39c427278658d47dbc2d6dc3139b4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated FLINK-32261:
Description:
This is an implementation of MAP_UNION. It returns a map created by merging two maps, 'map1' and 'map2'. The two maps should have the same data structure. If there are overlapping keys, the value from 'map2' overwrites the value from 'map1'. If either map is null, null is returned.
Syntax:
{code:java}
MAP_UNION(map1, map2){code}
Arguments:
* map1: the first map to be merged.
* map2: the second map to be merged.
Returns: a new map that contains the combined key-value pairs from {{map1}} and {{map2}}. If there are any overlapping keys, the value from {{map2}} overwrites the value from {{map1}}.
Examples:
Merging maps with unique keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['c': 3, 'd': 4]
MAP_UNION(map1, map2)
Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
Merging maps with overlapping keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['b': 3, 'c': 4]
MAP_UNION(map1, map2)
Output: ['a': 1, 'b': 3, 'c': 4]{code}
See also: prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

was: Description: This is an implementation of MAP_UNION. It returns a map created by merging two maps, 'map1' and 'map2'. The two maps should have the same data structure. If there are overlapping keys, the value from 'map2' overwrites the value from 'map1'. If either map is null, null is returned. Syntax: `MAP_UNION(map1, map2)` Arguments: * map1: the first map to be merged. * map2: the second map to be merged. Returns: a new map that contains the combined key-value pairs from {{map1}} and {{map2}}. If there are any overlapping keys, the value from {{map2}} overwrites the value from {{map1}}. Examples: Merging maps with unique keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] MAP_UNION(map1, map2) Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} Merging maps with overlapping keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] MAP_UNION(map1, map2) Output: ['a': 1, 'b': 3, 'c': 4]{code} See also: prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

> Add MAP_UNION support in SQL & Table API > > > Key: FLINK-32261 > URL: https://issues.apache.org/jira/browse/FLINK-32261 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Bonnie Varghese >Assignee: Hanyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Description: > This is an implementation of MAP_UNION. > It returns a map created by merging two maps, 'map1' and 'map2'. The two maps > should have the same data structure. If there are overlapping keys, the value > from 'map2' overwrites the value from 'map1'. If either map is null, null is returned. > Syntax: > {code:java} > MAP_UNION(map1, map2){code} > Arguments: > * map1: the first map to be merged. > * map2: the second map to be merged. > Returns: a new map that contains the combined key-value pairs from {{map1}} > and {{map2}}. If there are any overlapping keys, the value from {{map2}} > overwrites the value from {{map1}}. > Examples: > Merging maps with unique keys: > {code:java} > map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] > MAP_UNION(map1, map2) > Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} > Merging maps with overlapping keys: > {code:java} > map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] > MAP_UNION(map1, map2) > Output: ['a': 1, 'b': 3, 'c': 4]{code} > See also: > prestodb: [https://prestodb.io/docs/current/functions/aggregate.html] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
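The MAP_UNION semantics described above (map2 wins on overlapping keys, null in means null out) can be sketched in plain Java. This is an illustration of the described behavior under those assumptions, not the Flink implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MapUnionSketch {
    // Merges map2 into a copy of map1. Overlapping keys take map2's value,
    // and a null input yields a null result, matching the semantics above.
    static <K, V> Map<K, V> mapUnion(Map<K, V> map1, Map<K, V> map2) {
        if (map1 == null || map2 == null) {
            return null;
        }
        Map<K, V> result = new LinkedHashMap<>(map1);
        result.putAll(map2); // map2 overwrites overlapping keys
        return result;
    }
}
```

For example, merging `{'a': 1, 'b': 2}` with `{'b': 3, 'c': 4}` yields `{'a': 1, 'b': 3, 'c': 4}`, as in the second example above.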
[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated FLINK-32261:
Description:
This is an implementation of MAP_UNION. It returns a map created by merging two maps, 'map1' and 'map2'. The two maps should have the same data structure. If there are overlapping keys, the value from 'map2' overwrites the value from 'map1'. If either map is null, null is returned. Syntax: `MAP_UNION(map1, map2)` Arguments: * map1: the first map to be merged. * map2: the second map to be merged. Returns: a new map that contains the combined key-value pairs from {{map1}} and {{map2}}. If there are any overlapping keys, the value from {{map2}} overwrites the value from {{map1}}. Examples: Merging maps with unique keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] MAP_UNION(map1, map2) Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} Merging maps with overlapping keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] MAP_UNION(map1, map2) Output: ['a': 1, 'b': 3, 'c': 4]{code} See also: prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

was: Description: The current implementation of the {{map_union}} method in the Flink library does not provide a way to combine two maps into a single map. This enhancement aims to add this functionality, allowing users to merge maps efficiently. Syntax: {code:java} map_union(map1: map, map2: map) -> map{code} Arguments: * map1: the first map to be merged. * map2: the second map to be merged. Returns: a new map that contains the combined key-value pairs from {{map1}} and {{map2}}. If there are any overlapping keys, the value from {{map2}} overwrites the value from {{map1}}. Examples: Merging maps with unique keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] result = map_union(map1, map2) Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} Merging maps with overlapping keys: {code:java} map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] result = map_union(map1, map2) Output: ['a': 1, 'b': 3, 'c': 4]{code} See also: prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

> Add MAP_UNION support in SQL & Table API > > > Key: FLINK-32261 > URL: https://issues.apache.org/jira/browse/FLINK-32261 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Bonnie Varghese >Assignee: Hanyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Description: > This is an implementation of MAP_UNION. > It returns a map created by merging two maps, 'map1' and 'map2'. The two maps > should have the same data structure. If there are overlapping keys, the value > from 'map2' overwrites the value from 'map1'. If either map is null, null is returned. > Syntax: > `MAP_UNION(map1, map2)` > Arguments: > * map1: the first map to be merged. > * map2: the second map to be merged. > Returns: a new map that contains the combined key-value pairs from {{map1}} > and {{map2}}. If there are any overlapping keys, the value from {{map2}} > overwrites the value from {{map1}}. > Examples: > Merging maps with unique keys: > {code:java} > map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] > MAP_UNION(map1, map2) > Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} > Merging maps with overlapping keys: > {code:java} > map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] > MAP_UNION(map1, map2) > Output: ['a': 1, 'b': 3, 'c': 4]{code} > See also: > prestodb: [https://prestodb.io/docs/current/functions/aggregate.html] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30141) MinioTestContainerTest failed due to IllegalStateException in container startup
[ https://issues.apache.org/jira/browse/FLINK-30141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30141: --- Labels: pull-request-available test-stability (was: test-stability) > MinioTestContainerTest failed due to IllegalStateException in container > startup > --- > > Key: FLINK-30141 > URL: https://issues.apache.org/jira/browse/FLINK-30141 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Tests >Affects Versions: 1.17.0, 1.18.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43182&view=logs&j=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc&t=3a8f44aa-4415-5b14-37d5-5fecc568b139&l=15531] > failed due to an {{IllegalStateException}} during container startup: > {code:java} > Nov 15 02:34:04 [ERROR] > org.apache.flink.fs.s3.common.MinioTestContainerTest.testBucketCreation Time > elapsed: 120.874 s <<< ERROR! 
> Nov 15 02:34:04 org.testcontainers.containers.ContainerLaunchException: > Container startup failed > Nov 15 02:34:04 at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:345) > Nov 15 02:34:04 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:326) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.TestContainerExtension.instantiateTestContainer(TestContainerExtension.java:59) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.TestContainerExtension.before(TestContainerExtension.java:70) > Nov 15 02:34:04 at > org.apache.flink.core.testutils.EachCallbackWrapper.beforeEach(EachCallbackWrapper.java:45) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$2(TestMethodTestDescriptor.java:166) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:202) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:202) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:165) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:132) > Nov 15 02:34:04 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.executeNonConcurrentTasks(ForkJoinPoolHierarchicalTestExecutorService.java:155) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:135) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > Nov 15 02:34:04 at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > Nov 15 02:34:04 at > org.junit.platform.engine.support.
[GitHub] [flink] RyanSkraba opened a new pull request, #22843: [FLINK-30141][s3][filesystem] Increase retries on MinIO container startup
RyanSkraba opened a new pull request, #22843: URL: https://github.com/apache/flink/pull/22843
## What is the purpose of the change
We observe flaky tests using MinIO about once per month, and it appears to be due to a container responding to API requests with enough **503 Service Unavailable** statuses that the default retry mechanism on the `AmazonS3` client fails.
## Brief change log
Increase the number of TestContainer startup attempts from 1 to 3. I chose to do this at the TestContainer level instead of making the S3 client retry more often, since it is (1) a sufficiently rare event and (2) specifically related to container start-up.
## Verifying this change
This change is already covered by existing tests, such as MinioTestContainerTest.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
- The S3 file system connector: **yes** (indirectly)
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **no**
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function
flinkbot commented on PR #22842: URL: https://github.com/apache/flink/pull/22842#issuecomment-1601024848 ## CI report: * 4f26f9375e99db7765ce6a4bf6d161bc8ef4899e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32261: --- Labels: pull-request-available (was: ) > Add MAP_UNION support in SQL & Table API > > > Key: FLINK-32261 > URL: https://issues.apache.org/jira/browse/FLINK-32261 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Bonnie Varghese >Assignee: Hanyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Description: > The current implementation of the {{map_union}} method in the Flink library > does not provide a way to combine two dictionaries into a single dictionary. > This enhancement aims to add this functionality, allowing users to merge > dictionaries efficiently. > Syntax: > > {code:java} > map_union[map1: map, map2: map] -> map{code} > Arguments: > * map1:The first map to be merged. > * map2:The second map to be merged. > Returns: A new map that contains the combined key-value pairs from {{map1}} > and map{{{}2{}}}. If there are any overlapping keys, the value from {{map2}} > will overwrite the value from {{{}map1{}}}. > Examples: > Merging maps with unique keys: > > {code:java} > map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] result = map_union(map1, > map2) Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code} > Merging maps with overlapping keys: > > > {code:java} > map1 = ['a': 1, 'b': 2] dict2 = ['b': 3, 'c': 4] result = map_union[dict1, > dict2] Output: ['a': 1, 'b': 3, 'c': 4]{code} > See also: > prestodb: [https://prestodb.io/docs/current/functions/aggregate.html] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] hanyuzheng7 opened a new pull request, #22842: [FLINK-32261]-table-Add-MAP_UNION-function
hanyuzheng7 opened a new pull request, #22842: URL: https://github.com/apache/flink/pull/22842 Description: Flink does not currently provide a map_union function to combine two maps into a single map. This enhancement adds that functionality, allowing users to merge maps efficiently. Syntax: map_union(map1: map, map2: map) -> map Arguments: map1: the first map to be merged. map2: the second map to be merged. Returns: A new map that contains the combined key-value pairs from map1 and map2. If there are any overlapping keys, the value from map2 will overwrite the value from map1. Examples: Merging maps with unique keys: map1 = ['a': 1, 'b': 2] map2 = ['c': 3, 'd': 4] result = map_union(map1, map2) Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4] Merging maps with overlapping keys: map1 = ['a': 1, 'b': 2] map2 = ['b': 3, 'c': 4] result = map_union(map1, map2) Output: ['a': 1, 'b': 3, 'c': 4] See also: prestodb: https://prestodb.io/docs/current/functions/aggregate.html ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
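As a quick illustration of the semantics described in the PR above, here is a minimal Java sketch of the merge behavior. This is plain JDK code mirroring the proposal, not Flink's actual implementation; the class and method names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapUnionSketch {

    // MAP_UNION semantics as proposed: keep all keys from both maps;
    // on overlapping keys, the value from map2 overwrites map1's value.
    public static <K, V> Map<K, V> mapUnion(Map<K, V> map1, Map<K, V> map2) {
        Map<K, V> result = new LinkedHashMap<>(map1);
        result.putAll(map2); // later entries win on key collisions
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> map1 = Map.of("a", 1, "b", 2);
        Map<String, Integer> map2 = Map.of("b", 3, "c", 4);
        // Overlapping key 'b' takes map2's value 3.
        System.out.println(mapUnion(map1, map2));
    }
}
```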
[jira] [Closed] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-32408. -- Resolution: Fixed > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. > So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #22841: [FLINK-32410] Allocate hash-based collections with sufficient capacity for expected size
flinkbot commented on PR #22841: URL: https://github.com/apache/flink/pull/22841#issuecomment-1600948809 ## CI report: * d8f3e1e99759cb5b91f7d560f5fbda643e4255a2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22806: [FLINK-32362] [SourceAlignment] increase the robustness of announceCombinedWatermark to cover the case task failover
1996fanrui commented on code in PR #22806: URL: https://github.com/apache/flink/pull/22806#discussion_r1236917373 ## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ## @@ -195,9 +195,19 @@ void announceCombinedWatermark() { "Distributing maxAllowedWatermark={} to subTaskIds={}", maxAllowedWatermark, subTaskIds); -for (Integer subtaskId : subTaskIds) { -context.sendEventToSourceOperator( -subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); +// Because Java's ThreadPoolExecutor will not reschedule a periodic task +// if it throws an exception, we should handle potential exceptions like +// "subtask xx is not ready yet to receive events" to increase robustness. +try { +for (Integer subtaskId : subTaskIds) { +context.sendEventToSourceOperator( +subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); +} Review Comment: Hi @LoveHeat , thanks for your feedback. As I said at FLINK-32362 before, I'm not sure what we should do when some subtasks are not ready. - Option 1: Send the event to all ready subtasks, and just ignore unready subtasks. - Option 2: Don't send any event before all subtasks are ready. If we choose Option 1, we should ensure the event is sent to all ready subtasks. > In my opinion, if one task is failing, other tasks may also be failing with high probability When `jobmanager.execution.failover-strategy` is region, and the subtasks have no shuffle links between them: if subtask0 cannot start or fails, the other subtasks should still work well, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
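The JDK behavior the code comment above refers to is easy to demonstrate: `ScheduledExecutorService.scheduleAtFixedRate` suppresses all further runs of a periodic task once one run throws. A self-contained sketch of the guard pattern from the diff (class name and timings are illustrative, not Flink code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskSketch {

    // Schedules a periodic task whose body always throws. With the guard,
    // the task keeps firing; without it, the executor would silently
    // cancel the task after the first run.
    public static int runGuardedPeriodicTask(long durationMillis) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        executor.scheduleAtFixedRate(
                () -> {
                    try {
                        runs.incrementAndGet();
                        throw new IllegalStateException("subtask is not ready yet to receive events");
                    } catch (IllegalStateException e) {
                        // Swallow the failure so the next period still fires,
                        // which is the robustness the PR is after.
                    }
                },
                0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(durationMillis);
        executor.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Fires repeatedly despite every run throwing internally.
        System.out.println("runs: " + runGuardedPeriodicTask(200));
    }
}
```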
[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0
snuyanzin commented on code in PR #22558: URL: https://github.com/apache/flink/pull/22558#discussion_r1237090987 ## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml: ## @@ -107,14 +107,14 @@ Calc(select=[*org.apache.flink.table.planner.expressions.utils.Func24$$2da7dcb3c
[jira] [Updated] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size
[ https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32410: --- Labels: pull-request-available (was: ) > Allocate hash-based collections with sufficient capacity for expected size > -- > > Key: FLINK-32410 > URL: https://issues.apache.org/jira/browse/FLINK-32410 > Project: Flink > Issue Type: Improvement >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > The JDK API to create hash-based collections for a certain capacity is > arguably misleading because it doesn't size the collections to "hold a > specific number of items" like you'd expect it would. Instead it sizes it to > hold load-factor% of the specified number. > For the common pattern to allocate a hash-based collection with the size of > expected elements to avoid rehashes, this means that a rehash is essentially > guaranteed. > We should introduce helper methods (similar to Guava's > `Maps.newHashMapWithExpectedSize(int)`) for allocations for expected size and > replace the direct constructor calls with those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] StefanRRichter opened a new pull request, #22841: [FLINK-32410] Allocate hash-based collections with sufficient capacity for expected size
StefanRRichter opened a new pull request, #22841: URL: https://github.com/apache/flink/pull/22841 ## What is the purpose of the change The JDK API to create hash-based collections for a certain capacity is arguably misleading because it doesn't size the collections to "hold a specific number of items" like you'd expect it would. Instead it sizes it to hold "load-factor%" of the specified number. For the common pattern to allocate a hash-based collection with the size of expected elements to avoid rehashes, this means that a rehash is essentially guaranteed. This PR replaces constructor calls for allocations for expected size with helper methods (similar to Guava's `Maps.newHashMapWithExpectedSize(int)`) . ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
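A minimal sketch of such a helper, assuming HashMap's default load factor of 0.75. The method name mirrors Guava's; this is illustrative, not the exact code in the PR.

```java
import java.util.HashMap;

public class CollectionUtilSketch {

    static final double DEFAULT_LOAD_FACTOR = 0.75;

    // new HashMap<>(n) resizes once size > capacity * loadFactor, so a map
    // allocated "for n elements" rehashes almost immediately. Dividing by
    // the load factor up front yields a capacity that truly holds n entries.
    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
        int initialCapacity = (int) Math.ceil(expectedSize / DEFAULT_LOAD_FACTOR);
        return new HashMap<>(initialCapacity);
    }

    public static void main(String[] args) {
        // new HashMap<>(100) rounds to table size 128 with resize threshold 96,
        // guaranteeing a rehash; ceil(100 / 0.75) = 134 rounds to 256 with
        // threshold 192, so 100 entries fit without resizing.
        HashMap<String, Integer> map = newHashMapWithExpectedSize(100);
        for (int i = 0; i < 100; i++) {
            map.put("key-" + i, i);
        }
        System.out.println(map.size());
    }
}
```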
[jira] [Updated] (FLINK-32409) Moves componentId/contenderID handling from DefaultMultipleComponentLeaderElectionService into DefaultLeaderElectionService
[ https://issues.apache.org/jira/browse/FLINK-32409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-32409: -- Summary: Moves componentId/contenderID handling from DefaultMultipleComponentLeaderElectionService into DefaultLeaderElectionService (was: Remove MultipleComponentLeaderElectionDriverAdapter) > Moves componentId/contenderID handling from > DefaultMultipleComponentLeaderElectionService into > DefaultLeaderElectionService > --- > > Key: FLINK-32409 > URL: https://issues.apache.org/jira/browse/FLINK-32409 > Project: Flink > Issue Type: Sub-task >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size
Stefan Richter created FLINK-32410: -- Summary: Allocate hash-based collections with sufficient capacity for expected size Key: FLINK-32410 URL: https://issues.apache.org/jira/browse/FLINK-32410 Project: Flink Issue Type: Improvement Reporter: Stefan Richter Assignee: Stefan Richter Fix For: 1.19.0 The JDK API to create hash-based collections for a certain capacity is arguably misleading because it doesn't size the collections to "hold a specific number of items" like you'd expect it would. Instead it sizes it to hold load-factor% of the specified number. For the common pattern to allocate a hash-based collection with the size of expected elements to avoid rehashes, this means that a rehash is essentially guaranteed. We should introduce helper methods (similar to Guava's `Maps.newHashMapWithExpectedSize(int)`) for allocations for expected size and replace the direct constructor calls with those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] afedulov closed pull request #17314: [WIP][FLINK-22790] HybridSource E2E tests
afedulov closed pull request #17314: [WIP][FLINK-22790] HybridSource E2E tests URL: https://github.com/apache/flink/pull/17314 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] afedulov closed pull request #17215: [FLINK-22790] HybridSource E2E tests
afedulov closed pull request #17215: [FLINK-22790] HybridSource E2E tests URL: https://github.com/apache/flink/pull/17215 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil commented on a diff in pull request #22827: [FLINK-20887][table-planner] Disable project merge during sql2rel phase by default to avoid incorrectly project merge
lincoln-lil commented on code in PR #22827: URL: https://github.com/apache/flink/pull/22827#discussion_r1236728489 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java: ## @@ -352,8 +358,25 @@ public void testProjectionIncludingOnlyMetadata() { .containsExactly("metadata"); } +private void replaceProgramWithProjectMergeRule() { +FlinkChainedProgram programs = new FlinkChainedProgram(); +programs.addLast( +"rules", + FlinkHepRuleSetProgramBuilder.newBuilder() + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE()) +.setHepMatchOrder(HepMatchOrder.BOTTOM_UP) +.add( +RuleSets.ofList( +CoreRules.PROJECT_MERGE, + PushProjectIntoTableSourceScanRule.INSTANCE)) +.build()); +util().replaceBatchProgram(programs); +} Review Comment: the ast changes after we disable project merge during sql2rel phase by default in the 1st commit, and this rule test will fail, so it should stay in 1st commit ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCalcMergeRule.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.utils.InputRefVisitor; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectCalcMergeRule; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Extends calcite's ProjectCalcMergeRule, modification: does not merge the filter references field + * which generated by non-deterministic function. + */ +public class FlinkProjectCalcMergeRule extends ProjectCalcMergeRule { + +public static final RelOptRule INSTANCE = new FlinkProjectCalcMergeRule(Config.DEFAULT); + +protected FlinkProjectCalcMergeRule(Config config) { +super(config); +} + +@Override +public void onMatch(RelOptRuleCall call) { +LogicalProject project = call.rel(0); +LogicalCalc calc = call.rel(1); + +List expandProjects = +calc.getProgram().getProjectList().stream() +.map(p -> calc.getProgram().expandLocalRef(p)) +.collect(Collectors.toList()); +InputRefVisitor inputRefVisitor = new InputRefVisitor(); +project.getProjects().forEach(p -> p.accept(inputRefVisitor)); +boolean existNonDeterministicRef = +Arrays.stream(inputRefVisitor.getFields()) +.anyMatch(i -> !RexUtil.isDeterministic(expandProjects.get(i))); + +if (!existNonDeterministicRef) { +super.onMatch(call); +} +} +} Review Comment: make sense, I'll update it. ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala: ## @@ -650,6 +654,75 @@ object FlinkRexUtil { rexBuilder, converter); } + + /** + * Return two neighbouring [[Project]] can merge into one [[Project]] or not. 
If the two + * [[Project]] can merge into one, each non-deterministic [[RexNode]] of bottom [[Project]] should + * appear at most once in the project list of top [[Project]]. + */ + def isMergeable(topProject: Project, bottomProject: Project): Boolean = { +val topInputRefCounter: Array[Int] = + Array.fill(topProject.getInput.getRowType.getFieldCount)(0) + +mergeable(topInputRefCounter, topProject.getProjects, bottomProject.getProjects) + } + + /** + * An InputRefCounter that count every inputRef's referenc
[jira] [Created] (FLINK-32409) Remove MultipleComponentLeaderElectionDriverAdapter
Matthias Pohl created FLINK-32409: - Summary: Remove MultipleComponentLeaderElectionDriverAdapter Key: FLINK-32409 URL: https://issues.apache.org/jira/browse/FLINK-32409 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32409) Remove MultipleComponentLeaderElectionDriverAdapter
[ https://issues.apache.org/jira/browse/FLINK-32409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-32409: - Assignee: Matthias Pohl > Remove MultipleComponentLeaderElectionDriverAdapter > --- > > Key: FLINK-32409 > URL: https://issues.apache.org/jira/browse/FLINK-32409 > Project: Flink > Issue Type: Sub-task >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735722#comment-17735722 ] Gyula Fora commented on FLINK-32408: If you are using Operator 1.5.0 with Flink 1.17.1 you need to use the old config key. But it will work > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. > So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717 ] dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:22 PM: -- I think this is fixed in new version because in main branch's pom.xml, flink version is updated from *1.16.1* to *1.17.1.* But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports flink 1.17 version, since they had flink version configured to 1.16.1 was (Author: JIRAUSER300481): I think this is fixed in new version because in main branch's pom.xml, flink version is updated from *1.16.1* to *1.17.1.* But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports flink 1.17 version, since they had flink version to 1.16.1 > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. > So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717 ] dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:21 PM: -- I think this is fixed in new version because in main branch's pom.xml, flink version is updated from *1.16.1* to *1.17.1.* But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports flink 1.17 version, since they had flink version to 1.16.1 was (Author: JIRAUSER300481): I think this is fixed in new version because in main branch's pom.xml, flink version is updated from *1.16.1* to *1.17.1.* But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports flink 1.17 version. > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. > So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717 ] dongwoo.kim commented on FLINK-32408: - I think this is fixed in new version because in main branch's pom.xml, flink version is updated from *1.16.1* to *1.17.1.* But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports flink 1.17 version. > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. > So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32408: Description: In flink 1.17 documentation it says, to configure job manger ha we have to configure *high-availability.type* key not *high-availability* key{*}.{*} (It seems to be changed from 1.17) And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. So I expected that configuring job manager ha with *high-availability.type* should work but it didn't, only *high-availability* works *ref* [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] was: In flink 1.17 documentation it says, to configure job manger ha we have to configure *high-availability.type* key not *high-availability* key{*}.{*} (It seems to be changed from 1.17) And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. So I expected that configuring job manager ha with *high-availability.type* should work but it didn't{*}.{*} *ref* [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't, only *high-availability* works > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp commented on pull request #22661: [WIP][FLINK-31783][runtime] Migrates DefaultLeaderElectionService from LeaderElectionDriver to the MultipleComponentLeaderElectionDriver interf
XComp commented on PR #22661: URL: https://github.com/apache/flink/pull/22661#issuecomment-1600813763 https://github.com/apache/flink/pull/22661/commits/4338f36dee313ca5d5901a753961323e9f299372 is the "initial" commit that refactors the interfaces. The outcome was failing tests because I didn't do a good job fixing the tests in the first pass. I fixed the individual tests in the subsequent commits. I suggest going through the PR one commit at a time. That's especially useful for the commits where I went through the test methods to make sure that they still function as expected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32408: Description: In flink 1.17 documentation it says, to configure job manger ha we have to configure *high-availability.type* key not *high-availability* key{*}.{*} (It seems to be changed from 1.17) And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. So I expected that configuring job manager ha with *high-availability.type* should work but it didn't{*}.{*} *ref* [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] was: In flink 1.17 documentation it says, to configure job manger ha we have to configure *high-availability.type* key not *high-availability* key{*}.{*} (It seems to be changed from 1.17) And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. So I expected that configuring job manager ha with *high-availability.type* should work ** but it didn't{*}.{*} ref: [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > JobManager HA configuration update needed in Flink k8s Operator > > > Key: FLINK-32408 > URL: https://issues.apache.org/jira/browse/FLINK-32408 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: dongwoo.kim >Priority: Minor > Fix For: kubernetes-operator-1.6.0 > > > In flink 1.17 documentation it says, to configure job manger ha we have to > configure *high-availability.type* key not *high-availability* key{*}.{*} (It > seems to be changed from 1.17) > And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* > should work but it didn't{*}.{*} > *ref* > [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] > > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
dongwoo.kim created FLINK-32408: --- Summary: JobManager HA configuration update needed in Flink k8s Operator Key: FLINK-32408 URL: https://issues.apache.org/jira/browse/FLINK-32408 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: dongwoo.kim Fix For: kubernetes-operator-1.6.0 The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key (this appears to have changed in 1.17). And kubernetes-operator-1.5.0 says it supports Flink 1.17. So I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't. ref: [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability] [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core] -- This message was sent by Atlassian Jira (v8.20.10#820010)
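For reference, the two spellings discussed in this issue look like this in flink-conf.yaml. The `kubernetes` value is illustrative here (Flink also accepts the full HA services factory class name):

```yaml
# Key documented for Flink 1.17+; kubernetes-operator-1.5.0 does not yet honor it:
high-availability.type: kubernetes

# Legacy key; the one kubernetes-operator-1.5.0 actually accepts:
high-availability: kubernetes
```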
[GitHub] [flink] flinkbot commented on pull request #22840: [WIP][FLINK-32376] Extend Sink#InitContext
flinkbot commented on PR #22840: URL: https://github.com/apache/flink/pull/22840#issuecomment-1600790449 ## CI report: * 7495b60bb3d5c27b68bc9b79d733ca1c9b9758b7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32376) [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID
[ https://issues.apache.org/jira/browse/FLINK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32376: --- Labels: pull-request-available (was: ) > [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and > JobID > -- > > Key: FLINK-32376 > URL: https://issues.apache.org/jira/browse/FLINK-32376 > Project: Flink > Issue Type: Improvement >Reporter: João Boto >Priority: Major > Labels: pull-request-available > > Implementation of FLIP-287 (linked) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] eskabetxe opened a new pull request, #22840: [WIP][FLINK-32376] Extend Sink#InitContext
eskabetxe opened a new pull request, #22840: URL: https://github.com/apache/flink/pull/22840

## What is the purpose of the change

Address the [FLIP-287 Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853)

## Brief change log

Add three new methods to Sink#InitContext:
- isObjectReuseEnabled
- createInputSerializer
- getJobId

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): ( no )
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( yes )
- The serializers: ( no )
- The runtime per-record code paths (performance sensitive): ( no )
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
- The S3 file system connector: ( no )

## Documentation

- Does this pull request introduce a new feature? ( yes )
- If yes, how is the feature documented? ( not applicable )

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
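The three additions above can be outlined as follows. This is a hedged sketch based only on the method names in the PR description; the real FLIP-287 signatures (TypeSerializer, JobID) are replaced with stand-in types:

```java
// Hypothetical outline of the three Sink#InitContext additions named in this PR.
// Only the method names come from the PR description; return types here are
// illustrative stand-ins (Object for TypeSerializer, String for Flink's JobID).
interface InitContextSketch {
    /** Whether object reuse is enabled for the job. */
    boolean isObjectReuseEnabled();

    /** A serializer for the sink's input type; Object stands in for TypeSerializer. */
    Object createInputSerializer();

    /** The id of the running job; String stands in for Flink's JobID type. */
    String getJobId();

    /** Tiny demo implementation with fixed values, for illustration only. */
    static InitContextSketch demo() {
        return new InitContextSketch() {
            @Override public boolean isObjectReuseEnabled() { return false; }
            @Override public Object createInputSerializer() { return "serializer-stub"; }
            @Override public String getJobId() { return "job-0001"; }
        };
    }
}
```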
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22806: [FLINK-32362] [SourceAlignment] increase the robustness of announceCombinedWatermark to cover the case task failover
1996fanrui commented on code in PR #22806: URL: https://github.com/apache/flink/pull/22806#discussion_r1236917373

## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:

## @@ -195,9 +195,19 @@ void announceCombinedWatermark() {
         "Distributing maxAllowedWatermark={} to subTaskIds={}",
         maxAllowedWatermark,
         subTaskIds);
-for (Integer subtaskId : subTaskIds) {
-    context.sendEventToSourceOperator(
-            subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+// Because the Java ThreadPoolExecutor will not reschedule a periodic task
+// if it throws an exception, we should handle potential exceptions like
+// "subtask xx is not ready yet to receive events" to increase robustness.
+try {
+    for (Integer subtaskId : subTaskIds) {
+        context.sendEventToSourceOperator(
+                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+    }

Review Comment: Hi @LoveHeat, thanks for your feedback. As I said in FLINK-32362 before, I'm not sure what we should do when some subtasks are not ready.
- Option 1: Send the event to all ready subtasks, and just ignore unready subtasks.
- Option 2: Don't send any event before all subtasks are ready.

If we expect option 1, we should ensure the event is sent to all ready subtasks.

> In my opinion, if one task is during failing, other tasks maybe also during failing with high probability

When `jobmanager.execution.failover-strategy` is region, and the subtasks don't have a shuffle link, if subtask0 cannot start or fails, the other subtasks should still work well, right?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
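The executor behavior that motivates the try/catch in this diff can be demonstrated outside Flink. The following is a self-contained sketch, not Flink code; it shows that catching the exception inside the task keeps a `scheduleAtFixedRate` schedule alive, whereas an exception escaping the task would cancel the schedule:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone demonstration (not Flink code) of the behavior the added comment relies on:
// a periodic task whose exception escapes is silently cancelled by the executor, while
// catching inside the task keeps the schedule alive.
public class PeriodicTaskDemo {

    /** Returns true if the periodic task kept firing despite throwing internally. */
    static boolean runDemo() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        executor.scheduleAtFixedRate(
                () -> {
                    try {
                        // Simulates "subtask xx is not ready yet to receive events".
                        throw new IllegalStateException("subtask not ready");
                    } catch (IllegalStateException e) {
                        // Swallowed: if this exception escaped, the first run would
                        // cancel the schedule and no further runs would happen.
                    }
                    threeRuns.countDown();
                },
                0, 10, TimeUnit.MILLISECONDS);
        boolean keptRunning = threeRuns.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return keptRunning;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task kept running: " + runDemo());
    }
}
```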
[GitHub] [flink] flinkbot commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement
flinkbot commented on PR #22839: URL: https://github.com/apache/flink/pull/22839#issuecomment-1600710495 ## CI report: * 9604ce4793f3f9dcbf6d65fbbb9cf64bd690e6ce UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement
[ https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32349: --- Labels: pull-request-available (was: ) > Support atomic for CREATE TABLE AS SELECT(CTAS) statement > - > > Key: FLINK-32349 > URL: https://issues.apache.org/jira/browse/FLINK-32349 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: tartarus >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > For detailed information, see FLIP-305 > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] Tartarus0zm opened a new pull request, #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement
Tartarus0zm opened a new pull request, #22839: URL: https://github.com/apache/flink/pull/22839

## What is the purpose of the change

Support atomic for CREATE TABLE AS SELECT(CTAS) statement

## Brief change log

Introduce a StagedTable interface that supports atomic operations.

## Verifying this change

StagedTableITCase#testStagedTableWithAtomicCtas

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
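The staged-table lifecycle this PR introduces can be sketched roughly as follows. This is an illustrative toy, not the actual Flink interface; the begin/commit/abort shape follows the FLIP-305 outline, and everything else here is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch (not the actual Flink interface) of the atomic CTAS lifecycle:
// the planner begins a staged table, the job writes into it, and the table only
// becomes visible on commit; failure triggers abort, leaving no partial table.
interface StagedTableSketch {
    void begin();   // called before the CTAS job starts
    void commit();  // called on job success: atomically publish the staged data
    void abort();   // called on job failure: drop any staged data
}

// Toy in-memory implementation, for illustration only.
class InMemoryStagedTable implements StagedTableSketch {
    private final List<String> staged = new ArrayList<>();
    private final List<String> published = new ArrayList<>();

    void write(String row) { staged.add(row); }

    @Override public void begin() { staged.clear(); }
    @Override public void commit() { published.addAll(staged); staged.clear(); }
    @Override public void abort() { staged.clear(); }

    List<String> visibleRows() { return published; }
}
```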
[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading
luoyuxia commented on code in PR #22249: URL: https://github.com/apache/flink/pull/22249#discussion_r1236760683 ## docs/content/docs/connectors/table/filesystem.md: ## @@ -50,6 +50,10 @@ CREATE TABLE MyUserTable ( -- section for more details 'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition -- column value is null/empty string + 'source.path.regex-pattern' = '...', -- optional: regex pattern to filter files under the directory Review Comment: ```suggestion 'source.path.regex-pattern' = '...', -- optional: regex pattern to filter files to read under the directory ``` ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.src.enumerate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.compression.StandardDeCompressors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.function.Predicate; + +/** + * This {@code FileEnumerator} enumerates all files under the given paths recursively except the + * hidden directories, and creates a separate split for each file block. + * + * Please note that file blocks are only exposed by some file systems, such as HDFS. File systems + * that do not expose block information will not create multiple file splits per file, but keep the + * files as one source split. + * + * Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2', + * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and + * suffixes. + * + * Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all + * files even through its parent directory is filtered out by the file filter. + */ +@Internal +public class BlockSplittingRecursiveAllDirEnumerator extends BlockSplittingRecursiveEnumerator { + +/** The filter used to skip hidden directories. */ +private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter(); + +/** + * Creates a new enumerator that enumerates all files whose file path matches the regex except + * hidden files. Hidden files are considered files where the filename starts with '.' or with + * '_'. + * + * The enumerator does not split files that have a suffix corresponding to a known + * compression format (for example '.gzip', '.bz2', '.xy', '.zip', ...). See {@link + * StandardDeCompressors} for details. 
+ */
+public BlockSplittingRecursiveAllDirEnumerator(String pathPattern) {
+    this(
+            new RegexFileFilter(pathPattern),
+            StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
+}
+
+/**
+ * Creates a new enumerator that uses the given predicate as a filter for file paths, and avoids
+ * splitting files with the given extension (typically to avoid splitting compressed files).
+ */
+public BlockSplittingRecursiveAllDirEnumerator(
+        final Predicate fileFilter, final String[] nonSplittableFileSuffixes) {
+    super(fileFilter, nonSplittableFileSuffixes);
+}
+
+@Override
+protected void addSplitsForPath(
+        FileStatus fileStatus, FileSystem fs, ArrayList target)
+        throws IOException {
+    if (fileStatus.isDir()) {
+        if (!hiddenDirFilter.test(fileStatus.getPath())) {
+            return;
+        }
+        final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+        for (FileStatus containedStatus : containedFiles) {
+            // If this dir matches the regex, add all files under it as a split (not include the

Review Comment: Please remove
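The regex-plus-hidden-file filtering described in the Javadoc above can be sketched in isolation. `RegexPathFilter` below is a hypothetical stand-in, not Flink's `RegexFileFilter`; it assumes the filter matches against the full path string:

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Minimal sketch of a regex-based file filter like the one used by the enumerator
// above (assumption: the real filter matches against the full path string).
final class RegexPathFilter implements Predicate<String> {
    private final Pattern pattern;

    RegexPathFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    @Override
    public boolean test(String path) {
        // Hidden files (leading '.' or '_') are excluded first, mirroring the
        // default hidden-file convention mentioned in the Javadoc above.
        String name = path.substring(path.lastIndexOf('/') + 1);
        if (name.startsWith(".") || name.startsWith("_")) {
            return false;
        }
        return pattern.matcher(path).matches();
    }
}
```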
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support
gaborgsomogyi commented on code in PR #22694: URL: https://github.com/apache/flink/pull/22694#discussion_r1236809906 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/DelegationTokenIdentifier.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.security.token; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; + +/** Delegation token identifier for HiveServer2. */ +@Internal +public class DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { Review Comment: All classes having `HiveServer2` prefix except this. Why? ## flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java: ## @@ -38,6 +47,7 @@ * implementing a reusable test utility around it, consequently had to resort to relying on mockito. */ class HadoopUserUtilsITCase { Review Comment: `HadoopUserUtils` != `HadoopUtils` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] afedulov commented on pull request #21774: [FLINK-28227][connectors] Migrate o.a.f.streaming.examples to the new Source API
afedulov commented on PR #21774: URL: https://github.com/apache/flink/pull/21774#issuecomment-1600653013 @reswqa I just pushed the latest changes, please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32375) Flink AWS Source AssumeRole in VPC
[ https://issues.apache.org/jira/browse/FLINK-32375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735668#comment-17735668 ] Tomas Witzany commented on FLINK-32375: --- Thanks for the reply, I'll look into backporting the fix.
> Flink AWS Source AssumeRole in VPC
> --
>
> Key: FLINK-32375
> URL: https://issues.apache.org/jira/browse/FLINK-32375
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / AWS
> Affects Versions: 1.15.4
> Environment: Flink 1.15.4
> running on Amazon KDA (managed flink)
> runtime is running inside a VPC
> input stream cross-account
> Reporter: Tomas Witzany
> Priority: Major
>
> The current way to configure auth against AWS supports assuming a role, but when you assume a role in a VPC without a NAT gateway, the global STS endpoint is not accessible, and there is no way to configure the provider to use a different endpoint.
> This means that there currently is no supported way to configure AWS auth in such a situation. Note that you can add an STS endpoint to a VPC, but it's always a regional endpoint, not the global endpoint.
> Options on how you can configure this:
> * configuring the aws DefaultsMode, by default legacy, to in-region:
> ** environment variables - not possible in KDA
> ** system variables - not possible in KDA
> ** aws config file - not possible in KDA
> * adding endpoint configuration options to the assume role provider
> The piece of code that creates the provider, and how it could be extended to support endpoint configuration (just an example):
> {code:java}
> private static AwsCredentialsProvider getAssumeRoleCredentialProvider(
>         final Properties configProps, final String configPrefix) {
>     return StsAssumeRoleCredentialsProvider.builder()
>             .refreshRequest(
>                     AssumeRoleRequest.builder()
>                             .roleArn(configProps.getProperty(
>                                     AWSConfigConstants.roleArn(configPrefix)))
>                             .roleSessionName(configProps.getProperty(
>                                     AWSConfigConstants.roleSessionName(configPrefix)))
>                             .externalId(configProps.getProperty(
>                                     AWSConfigConstants.externalId(configPrefix)))
>                             .build())
>             .stsClient(
>                     StsClient.builder()
>                             .credentialsProvider(
>                                     getCredentialsProvider(
>                                             configProps,
>                                             AWSConfigConstants.roleCredentialsProvider(configPrefix)))
>                             .endpointOverride(new URI( // added code
>                                     configProps.getProperty(AWSConfigConstants.endpointOverride(configPrefix)) // added code
>                             )) // added code
>                             .region(getRegion(configProps))
>                             .build())
>             .build();
> } {code}
>
> I am not entirely certain that there is no other way to configure this in my situation; my current plan is to build my own version of the connectors with this option supported. If a feature like this would be nice to have, I would be happy to share my results in a PR afterwards.
> However, if there is a better way to configure this, I would be happy to hear about it, particularly if you know of some trick to do this in KDA, where you have limited options to configure things.
> Some other options to attack this problem:
> * trying to set system properties on the task manager before the kinesis source is initialized - this is hard as you don't have control over execution order; probably doable, though, with some hacks
> * ask AWS support to set a system property with flink config file options - this is hard as it will involve aws support
> * add a NAT gateway to the VPC - this will not always be an option because of security reasons
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xintongsong commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage
xintongsong commented on code in PR #22804: URL: https://github.com/apache/flink/pull/22804#discussion_r1236252253 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileWriter.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * The {@link PartitionFileWriter} interface defines the write logic for different types of shuffle + * files. + */ +public interface PartitionFileWriter { + +/** + * Write the {@link SpilledBufferContext}s to the partition file. The written buffers may belong + * to multiple subpartitions. + * + * @return the completable future indicating whether the writing file process has finished. If + * the {@link CompletableFuture} is completed, the written process is completed. + */ +CompletableFuture write(List spilledBuffers); Review Comment: Should explicitly mention that `spilledBuffers` should be consecutive in the file. 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SpilledBufferContext.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The buffer context to be flushed, including the {@link Buffer}, the buffer index, the + * subpartition id, the segment id, etc. + */ +public class SpilledBufferContext { + +/** The data buffer. Note that the buffer should not be null. */ +private final Buffer buffer; + +/** The index of buffer. */ +private final int bufferIndex; + +/** The id of subpartition. */ +private final int subpartitionId; + +/** The id of segment. */ +private final int segmentId; Review Comment: Why do we need these when we already have `SubpartitionSpilledBufferContext` and `SegmentSpilledBufferContext`? 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +
[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-31901: - Fix Version/s: ml-2.4.0 (was: ml-2.3.0)
> AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
> Issue Type: Improvement
> Components: Library / Machine Learning
> Reporter: Zhipeng Zhang
> Priority: Major
> Fix For: ml-2.4.0
>
> Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records.
>
> Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached elements may take a long time since there may be many cached records, which could potentially block the checkpoint barrier.
>
> If we run the code snippet here[1], we are supposed to get logs as follows.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
> processed cached records, cnt: 1 at time: 1682319149569
> processed cached records, cnt: 2 at time: 1682319149614
> processed cached records, cnt: 3 at time: 1682319149655
> processed cached records, cnt: 4 at time: 1682319149702
> processed cached records, cnt: 5 at time: 1682319149746
> processed cached records, cnt: 6 at time: 1682319149781
> processed cached records, cnt: 7 at time: 1682319149891
> processed cached records, cnt: 8 at time: 1682319150011
> processed cached records, cnt: 9 at time: 1682319150116
> processed cached records, cnt: 10 at time: 1682319150199
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
>
> We can find that from line#2 to line#11 there are no checkpoints and the barriers are blocked until all cached elements are processed, which takes ~600ms, much longer than the checkpoint interval (i.e., 100ms).
>
> [1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case
-- This message was sent by Atlassian Jira (v8.20.10#820010)
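One direction for the fix described above can be sketched as draining the cache in bounded chunks, so that control returns to the task between chunks and a pending checkpoint barrier can be handled. The names below are illustrative, not Flink ML's actual API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hedged sketch (illustrative names, not Flink ML's actual API) of the fix direction:
// drain cached records in bounded chunks instead of one long loop, so the operator can
// yield between chunks and a pending checkpoint barrier is not blocked for the whole drain.
class ChunkedCacheDrainer<T> {
    private final Deque<T> cache = new ArrayDeque<>();

    void add(T record) { cache.add(record); }

    /** Processes at most {@code maxRecords} cached records; returns how many were processed. */
    int drainChunk(int maxRecords, Consumer<T> processor) {
        int processed = 0;
        while (processed < maxRecords && !cache.isEmpty()) {
            processor.accept(cache.poll());
            processed++;
        }
        return processed;
    }

    boolean isDrained() { return cache.isEmpty(); }
}
```

The caller would invoke `drainChunk` repeatedly (e.g. from a mailbox action), checking `isDrained()` before resuming normal input processing.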
[jira] [Updated] (FLINK-32335) Fix the Flink ML unittest failure
[ https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-32335: - Fix Version/s: ml-2.4.0 (was: ml-2.3.0)
> Fix the Flink ML unittest failure
> -
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
> Issue Type: New Feature
> Components: Library / Machine Learning
> Reporter: Jiang Xin
> Priority: Major
> Fix For: ml-2.4.0
>
> The [github CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620) of Flink ML failed because of the following exception.
>
> {code:java}
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
>     at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
>     at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
>     at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750){code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31948) Supports triggering CI manually
[ https://issues.apache.org/jira/browse/FLINK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-31948: - Fix Version/s: ml-2.4.0 (was: ml-2.3.0) > Supports triggering CI manually > --- > > Key: FLINK-31948 > URL: https://issues.apache.org/jira/browse/FLINK-31948 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.4.0 > > > Supports triggering CI manually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #614: [FLINK-32057] Support 1.18 rescale api for applying parallelism overrides
gyfora commented on PR #614: URL: https://github.com/apache/flink-kubernetes-operator/pull/614#issuecomment-1600576306 Addressed your comments @mxm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jiaoqingbo commented on pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support
jiaoqingbo commented on PR #22694: URL: https://github.com/apache/flink/pull/22694#issuecomment-1600558583 @pvary could you help take a look when you have time? Thanks. I added some UT -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest
[ https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-30342: -- Parent: FLINK-26522 Issue Type: Sub-task (was: Technical Debt) > Migrate ZooKeeperLeaderElectionTest > --- > > Key: FLINK-30342 > URL: https://issues.apache.org/jira/browse/FLINK-30342 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > ||ZooKeeperLeaderElectionTest||ZooKeeperMultipleComponentLeaderElectionDriverTest|| > |testZooKeeperLeaderElectionRetrieval|testPublishLeaderInformation| > |testZooKeeperReelection|testLeaderElectionWithMultipleDrivers| > |testZooKeeperReelectionWithReplacement|| > |testLeaderShouldBeCorrectedWhenOverwritten|testNonLeaderCannotPublishLeaderInformation > (slightly different)| > |testExceptionForwarding|| > |testEphemeralZooKeeperNodes|| > |testNotLeaderShouldNotCleanUpTheLeaderInformation| (but similar to > testNonLeaderCannotPublishLeaderInformation)| > |testUnExpectedErrorForwarding|| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest
[ https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-30342. --- Fix Version/s: 1.18.0 Resolution: Fixed master: c4c633e843920bbf18b2ac2242ee48ada408f6ac
[jira] [Updated] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest
[ https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-30342: -- Parent: (was: FLINK-30338) Issue Type: Technical Debt (was: Sub-task)
[GitHub] [flink] XComp merged pull request #22829: [FLINK-30342][test] Migrates ZooKeeperLeaderElectionTest to use the MultipleComponentLeaderElectionDriver interface
XComp merged PR #22829: URL: https://github.com/apache/flink/pull/22829
[GitHub] [flink] CasperTDK commented on pull request #22393: FLINK-28171 [flink-kubernates] enable add appProtocol via the configuration and verify it is not overridden by Default port defintion
CasperTDK commented on PR #22393: URL: https://github.com/apache/flink/pull/22393#issuecomment-1600537399 Allowing myself to give this a bump. Any way to prioritize the review?
[GitHub] [flink] afedulov commented on pull request #21774: [FLINK-28227][connectors] Migrate o.a.f.streaming.examples to the new Source API
afedulov commented on PR #21774: URL: https://github.com/apache/flink/pull/21774#issuecomment-1600536032 Hi @reswqa, I actually just finished reimplementing `SocketSource(Function)` as the new Source, which unblocks https://github.com/apache/flink/pull/20049. I just need to go through some internal OSS contribution formalities and will most likely complement this PR with the last missing bit early next week.
[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable
[ https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735631#comment-17735631 ] Fang Yong commented on FLINK-32370: --- [~mapohl] I have created a new PR https://github.com/apache/flink/pull/22838 for this. You can cherry-pick it as you need, and please feel free to ping me when there's any issue, thanks > JDBC SQl gateway e2e test is unstable > - > > Key: FLINK-32370 > URL: https://issues.apache.org/jira/browse/FLINK-32370 > Project: Flink > Issue Type: Technical Debt > Components: Tests >Affects Versions: 1.18.0 >Reporter: Chesnay Schepler >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, > flink-vsts-standalonesession-0-fv-az75-650.log, > flink-vsts-taskexecutor-0-fv-az75-650.log > > > The client is failing while trying to collect data when the job already > finished on the cluster.
[jira] [Created] (FLINK-32407) Notify catalog listener for table events
Fang Yong created FLINK-32407: - Summary: Notify catalog listener for table events Key: FLINK-32407 URL: https://issues.apache.org/jira/browse/FLINK-32407 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Fang Yong
[jira] [Created] (FLINK-32406) Notify catalog listener for database events
Fang Yong created FLINK-32406: - Summary: Notify catalog listener for database events Key: FLINK-32406 URL: https://issues.apache.org/jira/browse/FLINK-32406 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Fang Yong
[jira] [Created] (FLINK-32405) Initialize catalog listener for CatalogManager
Fang Yong created FLINK-32405: - Summary: Initialize catalog listener for CatalogManager Key: FLINK-32405 URL: https://issues.apache.org/jira/browse/FLINK-32405 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Fang Yong
[jira] [Created] (FLINK-32404) Introduce catalog modification listener and factory interfaces
Fang Yong created FLINK-32404: - Summary: Introduce catalog modification listener and factory interfaces Key: FLINK-32404 URL: https://issues.apache.org/jira/browse/FLINK-32404 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Fang Yong
[jira] [Created] (FLINK-32403) Add database related operations in catalog manager
Fang Yong created FLINK-32403: - Summary: Add database related operations in catalog manager Key: FLINK-32403 URL: https://issues.apache.org/jira/browse/FLINK-32403 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Fang Yong Add database operations in catalog manager for different sql operations
[GitHub] [flink] reswqa commented on pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.
reswqa commented on PR #22835: URL: https://github.com/apache/flink/pull/22835#issuecomment-1600495258 Thanks @TanYuxin-tyx for the quick review. I have updated and force-pushed, as it is only a minor fix.
[GitHub] [flink] swuferhong commented on pull request #22805: [FLINK-32365][orc]get orc table statistics in parallel
swuferhong commented on PR #22805: URL: https://github.com/apache/flink/pull/22805#issuecomment-1600493415 > @luoyuxia I find the code is called in multiple places. If we make it configurable, we need to change more modules and we get more parameters. If we set the parameter in the Hadoop config, both ORC and Parquet can use it. Could you give me some idea? Hi, did you encounter the problem of slow ORC statistics reporting while using the Hive connector? If so, I think you can add this parameter to `HiveOptions` as a Flink conf, and then set it into the job conf in `HiveSourceBuilder.setFlinkConfigurationToJobConf()` (the jobConf is added into the hadoopConf in the Hive source). By doing this, you can read the parameter from `hadoopConf`; if it is not present there, you can default it to `Runtime.getRuntime().availableProcessors()`. WDYT, @luoyuxia?
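The fallback the comment above describes can be sketched with plain Java properties. The option key below is hypothetical (the actual `HiveOptions` key would differ); only the fallback logic — read the configured value, else default to `Runtime.getRuntime().availableProcessors()` — follows the comment.

```java
import java.util.Properties;

public class StatisticsParallelismSketch {
    // Hypothetical key name, for illustration only; not an actual Flink option.
    static final String KEY = "table.exec.hive.statistics-thread-num";

    // Read the statistics-gathering thread count from a (job/hadoop-style)
    // configuration, falling back to the number of available processors.
    static int statisticsParallelism(Properties conf) {
        String value = conf.getProperty(KEY);
        return value != null
                ? Integer.parseInt(value)
                : Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        // No key set: defaults to the processor count of the machine.
        System.out.println(statisticsParallelism(conf) > 0);
        conf.setProperty(KEY, "8");
        System.out.println(statisticsParallelism(conf)); // 8
    }
}
```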
[GitHub] [flink] reswqa commented on a diff in pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.
reswqa commented on code in PR #22835: URL: https://github.com/apache/flink/pull/22835#discussion_r1236686281 ## flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Rejected executions are ignored or logged in debug if the executor is {@link + * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, {@link + * RejectedExecutionException} is thrown. 
+ */ +public class IgnoreShutdownRejectedExecutionHandler implements RejectedExecutionHandler { +private final Logger logger; + +public IgnoreShutdownRejectedExecutionHandler() { +this(null); +} + +public IgnoreShutdownRejectedExecutionHandler(Logger logger) { +this.logger = logger; +} + +@Override +public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { +if (executor.isShutdown()) { +if (logger != null) { +logger.debug("Execution rejected because shutdown is in progress"); Review Comment: Done. ## flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Rejected executions are ignored or logged in debug if the executor is {@link + * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, {@link + * RejectedExecutionException} is thrown. 
+ */ +public class IgnoreShutdownRejectedExecutionHandler implements RejectedExecutionHandler { +private final Logger logger; Review Comment: Marked.
[jira] [Created] (FLINK-32402) FLIP-294: Support Customized Catalog Modification Listener
Fang Yong created FLINK-32402: - Summary: FLIP-294: Support Customized Catalog Modification Listener Key: FLINK-32402 URL: https://issues.apache.org/jira/browse/FLINK-32402 Project: Flink Issue Type: Improvement Components: Table SQL / Ecosystem Affects Versions: 1.18.0 Reporter: Fang Yong Issue for https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
[GitHub] [flink] flinkbot commented on pull request #22838: [FLINK-32370][jdbc-driver] Debug error log for jdbc gateway e2e test
flinkbot commented on PR #22838: URL: https://github.com/apache/flink/pull/22838#issuecomment-1600491747 ## CI report: * bf99f60859b1a36224aaf0e5a539b14e1c9421fb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem
LadyForest commented on code in PR #22818: URL: https://github.com/apache/flink/pull/22818#discussion_r1236679027 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -203,54 +193,75 @@ public void close() throws IOException { } } -private void checkJarResources(List resourceUris) throws IOException { -// only support register jar resource -if (resourceUris.stream() -.anyMatch(resourceUri -> !ResourceType.JAR.equals(resourceUri.getResourceType( { -throw new ValidationException( -String.format( -"Only support to register jar resource, resource info:\n %s.", -resourceUris.stream() -.map(ResourceUri::getUri) -.collect(Collectors.joining(",\n"; -} +/** Check whether the {@link Path} exists. */ +public boolean exists(Path filePath) throws IOException { +return filePath.getFileSystem().exists(filePath); +} -for (ResourceUri resourceUri : resourceUris) { -checkJarPath(new Path(resourceUri.getUri())); +/** + * Synchronize a file resource identified by the given {@link ResourceUri} with a local copy Review Comment: > The path passed to resourceGenerator will be a local path received from the given {@link ResourceUri}. I think this description is not so accurate, because if the given resource URI is `hdfs://foo/bar/plan.json`, then the local path passed to the resource generator should be `file://root-dir-forresource-manager/plan-uuid.json`
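The naming the reviewer describes — a remote `hdfs://foo/bar/plan.json` mirrored locally as `<root>/plan-<uuid>.json` — can be sketched as below. The helper name and the exact layout are illustrative assumptions, not Flink's actual `ResourceManager` internals.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public class LocalCopySketch {
    // Derive a collision-free local copy path for a remote resource URI by
    // inserting a UUID between the file stem and its extension.
    static Path localCopyFor(String remoteUri, Path localRoot) {
        String fileName = remoteUri.substring(remoteUri.lastIndexOf('/') + 1);
        int dot = fileName.lastIndexOf('.');
        String stem = dot >= 0 ? fileName.substring(0, dot) : fileName;
        String ext = dot >= 0 ? fileName.substring(dot) : "";
        return localRoot.resolve(stem + "-" + UUID.randomUUID() + ext);
    }

    public static void main(String[] args) {
        // e.g. /tmp/resources/plan-3f1c...-....json
        System.out.println(localCopyFor("hdfs://foo/bar/plan.json", Paths.get("/tmp/resources")));
    }
}
```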
[GitHub] [flink] FangYongs opened a new pull request, #22838: [FLINK-32370][jdbc-driver] Debug error log for jdbc gateway e2e test
FangYongs opened a new pull request, #22838: URL: https://github.com/apache/flink/pull/22838 ## What is the purpose of the change This PR aims to debug error log for jdbc gateway e2e test ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem
LadyForest commented on code in PR #22818: URL: https://github.com/apache/flink/pull/22818#discussion_r1236671845 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -296,4 +319,85 @@ private Path getResourceLocalPath(Path remotePath) { } return new Path(localResourceDir, fileNameWithUUID); } + +private void checkResources(List resourceUris, ResourceType expectedType) +throws IOException { +// check the resource type +if (resourceUris.stream() +.anyMatch(resourceUri -> expectedType != resourceUri.getResourceType())) { +throw new ValidationException( +String.format( +"Only support to register %s resource, resource info:\n %s.", Review Comment: This method is refactored to adapt both the `JAR` and `FILE` types. I agree with you that the message should be adapted as well.
[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem
LadyForest commented on code in PR #22818: URL: https://github.com/apache/flink/pull/22818#discussion_r1236617192 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -89,52 +90,41 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa * register them into the {@link ResourceManager}. */ public void registerJarResources(List resourceUris) throws IOException { -// check jar resource before register -checkJarResources(resourceUris); - -Map stagingResourceLocalURLs = new HashMap<>(); -for (ResourceUri resourceUri : resourceUris) { -// check whether the resource has been registered -if (resourceInfos.containsKey(resourceUri) && resourceInfos.get(resourceUri) != null) { -LOG.info( -"Resource [{}] has been registered, overwriting of registered resource is not supported " -+ "in the current version, skipping.", -resourceUri.getUri()); -continue; -} - -// here can check whether the resource path is valid -Path path = new Path(resourceUri.getUri()); -URL localUrl; -// check resource scheme -String scheme = StringUtils.lowerCase(path.toUri().getScheme()); -// download resource to local path firstly if in remote -if (scheme != null && !FILE_SCHEME.equals(scheme)) { -localUrl = downloadResource(path); -} else { -localUrl = getURLFromPath(path); -// if the local jar resource is a relative path, here convert it to absolute path -// before register -resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath()); -} - -// check the local jar file -JarUtils.checkJarFile(localUrl); - -// add it to staging map -stagingResourceLocalURLs.put(resourceUri, localUrl); -} - -// register resource in batch -stagingResourceLocalURLs.forEach( -(resourceUri, url) -> { -// jar resource need add to classloader -userClassLoader.addURL(url); -LOG.info("Added jar resource [{}] to class path.", url); +registerResources( +prepareStagingResources( +resourceUris, +ResourceType.JAR, +true, +url -> { +try { 
+JarUtils.checkJarFile(url); +} catch (IOException e) { +throw new ValidationException(e.getMessage(), e); Review Comment: We're not registering a jar, actually..
[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem
LadyForest commented on code in PR #22818: URL: https://github.com/apache/flink/pull/22818#discussion_r1236665067 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -731,38 +731,46 @@ public TableResultInternal executePlan(InternalPlan plan) { } private CompiledPlan compilePlanAndWrite( -String filePath, boolean ifNotExists, Operation operation) { -File file = Paths.get(filePath).toFile(); -if (file.exists()) { -if (ifNotExists) { -return loadPlan(PlanReference.fromFile(filePath)); +String pathString, boolean ignoreIfExists, Operation operation) { +try { +ResourceUri planResource = new ResourceUri(ResourceType.FILE, pathString); +Path planPath = new Path(pathString); +if (resourceManager.exists(planPath)) { +if (ignoreIfExists) { +return loadPlan( +PlanReference.fromFile( + resourceManager.registerFileResource(planResource))); +} + +if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) { +throw new TableException( +String.format( +"Cannot overwrite the plan file '%s'. " ++ "Either manually remove the file or, " ++ "if you're debugging your job, " ++ "set the option '%s' to true.", +pathString, TableConfigOptions.PLAN_FORCE_RECOMPILE.key())); +} } -if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) { +CompiledPlan compiledPlan; +if (operation instanceof StatementSetOperation) { +compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations()); +} else if (operation instanceof ModifyOperation) { +compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation)); +} else { throw new TableException( -String.format( -"Cannot overwrite the plan file '%s'. " -+ "Either manually remove the file or, " -+ "if you're debugging your job, " -+ "set the option '%s' to true.", -filePath, TableConfigOptions.PLAN_FORCE_RECOMPILE.key())); +"Unsupported operation to compile: " ++ operation.getClass() ++ ". 
This is a bug, please file an issue."); } -} - -CompiledPlan compiledPlan; -if (operation instanceof StatementSetOperation) { -compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations()); -} else if (operation instanceof ModifyOperation) { -compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation)); -} else { +resourceManager.syncFileResource( +planResource, path -> compiledPlan.writeToFile(path, false)); +return compiledPlan; +} catch (IOException e) { throw new TableException( -"Unsupported operation to compile: " -+ operation.getClass() -+ ". This is a bug, please file an issue."); +String.format("Cannot execute operation %s ", operation.asSummaryString()), e); Review Comment: Here's my understanding. The entire code block is surrounded by one `try... catch`. And the `IOException` might be thrown at any point - L#738(check file status) - L#742(download the remote file to local) - L#767(sync the local file to remote). As a result, I think `Fail to compile plan and write to xxx for xx` might be too general to fit all conditions. At the same time, this method is called by both `CompilePlanOperation` and `CompileAndExecutePlanOperation`. So I think the top-level exception message should differentiate it's a `COMPILE PLAN` statement or a `COMPILE AND EXECUTE PLAN` statement. WDYT?
[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable
[ https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735615#comment-17735615 ] Fang Yong commented on FLINK-32370: --- Thanks [~mapohl], I think I missed the error log in the JobManager. I will fix it.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.
TanYuxin-tyx commented on code in PR #22835: URL: https://github.com/apache/flink/pull/22835#discussion_r1236654517 ## flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Rejected executions are ignored or logged in debug if the executor is {@link + * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, {@link + * RejectedExecutionException} is thrown. 
+ */ +public class IgnoreShutdownRejectedExecutionHandler implements RejectedExecutionHandler { +private final Logger logger; + +public IgnoreShutdownRejectedExecutionHandler() { +this(null); +} + +public IgnoreShutdownRejectedExecutionHandler(Logger logger) { +this.logger = logger; +} + +@Override +public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { +if (executor.isShutdown()) { +if (logger != null) { +logger.debug("Execution rejected because shutdown is in progress"); Review Comment: "Execution is rejected" may be more appropriate. ## flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Rejected executions are ignored or logged in debug if the executor is {@link + * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, {@link + * RejectedExecutionException} is thrown. 
+ */ +public class IgnoreShutdownRejectedExecutionHandler implements RejectedExecutionHandler { +private final Logger logger; Review Comment: We'd better mark this logger as Nullable
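The behavior under review — drop rejected tasks once the executor is shut down, otherwise throw `RejectedExecutionException` — can be exercised with a dependency-free sketch. The slf4j `Logger` from the PR is omitted here to keep the example self-contained, and the class name is shortened; this is not the PR's exact code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal stand-in for the reviewed handler: rejections during shutdown are
// silently ignored; any other rejection is surfaced as an exception.
class IgnoreShutdownHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
        }
        // Shutdown is in progress: drop the task instead of failing the caller.
    }
}

public class HandlerDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new IgnoreShutdownHandler());
        pool.shutdown();
        // Without the handler this would throw RejectedExecutionException;
        // with it, the submission is quietly discarded.
        pool.execute(() -> {});
        System.out.println("no exception after shutdown");
    }
}
```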
[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem
LadyForest commented on code in PR #22818: URL: https://github.com/apache/flink/pull/22818#discussion_r1236617192

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:

## @@ -89,52 +90,41 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoader)
      * register them into the {@link ResourceManager}.
      */
     public void registerJarResources(List<ResourceUri> resourceUris) throws IOException {
-        // check jar resource before register
-        checkJarResources(resourceUris);
-
-        Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
-        for (ResourceUri resourceUri : resourceUris) {
-            // check whether the resource has been registered
-            if (resourceInfos.containsKey(resourceUri) && resourceInfos.get(resourceUri) != null) {
-                LOG.info(
-                        "Resource [{}] has been registered, overwriting of registered resource is not supported "
-                                + "in the current version, skipping.",
-                        resourceUri.getUri());
-                continue;
-            }
-
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            URL localUrl;
-            // check resource scheme
-            String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-            // download resource to local path firstly if in remote
-            if (scheme != null && !FILE_SCHEME.equals(scheme)) {
-                localUrl = downloadResource(path);
-            } else {
-                localUrl = getURLFromPath(path);
-                // if the local jar resource is a relative path, here convert it to absolute path
-                // before register
-                resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath());
-            }
-
-            // check the local jar file
-            JarUtils.checkJarFile(localUrl);
-
-            // add it to staging map
-            stagingResourceLocalURLs.put(resourceUri, localUrl);
-        }
-
-        // register resource in batch
-        stagingResourceLocalURLs.forEach(
-                (resourceUri, url) -> {
-                    // jar resource need add to classloader
-                    userClassLoader.addURL(url);
-                    LOG.info("Added jar resource [{}] to class path.", url);
+        registerResources(
+                prepareStagingResources(
+                        resourceUris,
+                        ResourceType.JAR,
+                        true,
+                        url -> {
+                            try {
+                                JarUtils.checkJarFile(url);
+                            } catch (IOException e) {
+                                throw new ValidationException(e.getMessage(), e);

Review Comment: We're not registering a jar, actually..

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
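The removed branch in the diff above distinguishes local `file://` resources from remote ones that must be downloaded before registration. A self-contained sketch of that scheme check, using only `java.net.URI` (the helper name `needsDownload` is mine for illustration, not Flink's API):

```java
import java.net.URI;

public class SchemeCheck {
    private static final String FILE_SCHEME = "file";

    // Mirrors the check in the diff: a resource with a non-"file" scheme
    // (e.g. hdfs://, s3://) is remote and must be downloaded before it can
    // be added to the user classloader; a null scheme is a plain local path.
    static boolean needsDownload(String resourceUri) {
        String scheme = URI.create(resourceUri).getScheme();
        return scheme != null && !FILE_SCHEME.equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        System.out.println(needsDownload("hdfs://namenode:8020/jars/udf.jar")); // true
        System.out.println(needsDownload("file:///tmp/udf.jar"));               // false
        System.out.println(needsDownload("/tmp/udf.jar"));                      // false
    }
}
```

The same distinction is what the refactoring keeps: only the jar-validity check (`JarUtils.checkJarFile`) moves into a callback passed to the shared staging logic.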
[GitHub] [flink] flinkbot commented on pull request #22837: [FLINK-31957][docs][table] Add user story for configuring operator-level state TTL via compiled plan
flinkbot commented on PR #22837: URL: https://github.com/apache/flink/pull/22837#issuecomment-1600417210

## CI report:

* a861c95ba2fe65798002daaf9072e48df7b7dfff UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-31957) Add documentation for the user story
[ https://issues.apache.org/jira/browse/FLINK-31957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31957: --- Labels: pull-request-available (was: ) > Add documentation for the user story > > > Key: FLINK-31957 > URL: https://issues.apache.org/jira/browse/FLINK-31957 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Add documentation on how to use compiled plan to configure operator-level > state TTL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] LadyForest opened a new pull request, #22837: [FLINK-31957][docs][table] Add user story for configuring operator-level state TTL via compiled plan
LadyForest opened a new pull request, #22837: URL: https://github.com/apache/flink/pull/22837

## What is the purpose of the change

Add user story for FLIP-292

## Brief change log

Add description on how to configure operator-level state TTL to `overview.md`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): No
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No
- The serializers: No
- The runtime per-record code paths (performance sensitive): No
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
- The S3 file system connector: No

## Documentation

- Does this pull request introduce a new feature? No
- If yes, how is the feature documented? Docs
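For context on the user story being documented: a compiled plan is a JSON file produced by `COMPILE PLAN` and re-submitted with `EXECUTE PLAN`, and FLIP-292 adds a per-operator state section whose TTL entries can be edited between those two steps. The fragment below is an illustrative sketch only; the node id, type, and state names are invented, and the field layout reflects my reading of FLIP-292, not a quote from the shipped docs:

```json
{
  "id": 5,
  "type": "stream-exec-join_1",
  "state": [
    { "index": 0, "ttl": "3600000 ms", "name": "leftState" },
    { "index": 1, "ttl": "600000 ms", "name": "rightState" }
  ]
}
```

Editing the `ttl` values per operator, instead of relying on the single pipeline-wide `table.exec.state.ttl`, is the capability the new documentation walks through.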
[GitHub] [flink-web] klion26 opened a new pull request, #658: [hotfix] Fix the wrong conflict resolve
klion26 opened a new pull request, #658: URL: https://github.com/apache/flink-web/pull/658 (no comment)
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #244: Fix some bugs when preparing the release of 2.3.0
lindong28 commented on code in PR #244: URL: https://github.com/apache/flink-ml/pull/244#discussion_r1236512750

## tools/releasing/deploy_staging_jars.sh:

## @@ -41,7 +41,11 @@ fi

 cd ${PROJECT_ROOT}

-echo "Deploying to repository.apache.org"
-${MVN} clean deploy -Papache-release -DskipTests -DretryFailedDeploymentCount=10 $CUSTOM_OPTIONS
+FLINK_VERSIONS=("1.15" "1.16" "1.17")

Review Comment: We probably don't want to modify this script every time we prepare a new Flink ML release. Would it be simpler to use environment variables to pass the versions and document the usage of these environment variables in the release wiki?
[GitHub] [flink-ml] jiangxin369 opened a new pull request, #244: Fix some bugs when preparing the release of 2.3.0
jiangxin369 opened a new pull request, #244: URL: https://github.com/apache/flink-ml/pull/244

## What is the purpose of the change

Fix some bugs when preparing the release of 2.3.0.

## Brief change log

- Replace the version with perl to avoid different behaviors of sed among different OSes
- Deploy artifacts of each Flink version

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[jira] [Closed] (FLINK-32401) KafkaSourceBuilder reset the 'auto.offset.reset'
[ https://issues.apache.org/jira/browse/FLINK-32401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] KianChen closed FLINK-32401. Resolution: Fixed > KafkaSourceBuilder reset the 'auto.offset.reset' > > > Key: FLINK-32401 > URL: https://issues.apache.org/jira/browse/FLINK-32401 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0 >Reporter: KianChen >Priority: Major > Fix For: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0 > > > KafkaSourceBuilder#parseAndSetRequiredProperties resets the 'auto.offset.reset' > to "earliest"
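The reported behavior can be sketched with a self-contained toy reproduction (the method name mirrors the report, but this is illustrative code, not the actual `KafkaSourceBuilder` internals): an unconditional property overwrite silently discards whatever the user configured.

```java
import java.util.Properties;

public class OffsetResetDemo {
    // Toy version of the behavior described in the issue: the builder copies
    // the user's properties and then unconditionally overwrites
    // 'auto.offset.reset', so a user-supplied value is lost.
    static Properties parseAndSetRequiredProperties(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.setProperty("auto.offset.reset", "earliest"); // user's value discarded
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("auto.offset.reset", "latest");
        // The user asked for "latest" but the effective value is "earliest".
        System.out.println(parseAndSetRequiredProperties(user).getProperty("auto.offset.reset"));
    }
}
```

In the actual connector, the supported way to control this is through the builder's public API, e.g. `KafkaSourceBuilder#setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))`, rather than the raw Kafka consumer property.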
[GitHub] [flink] luoyuxia commented on pull request #22620: [FLINK-31413][hive] Change scope of flink-table-planner dependency from provided to test in Hive connector
luoyuxia commented on PR #22620: URL: https://github.com/apache/flink/pull/22620#issuecomment-1600289821 @LadyForest Thanks for reviewing. I have addressed your comments in the last three commits.