[jira] [Closed] (FLINK-27411) Move lookup table source cache logic to flink-table-runtime module

2023-06-21 Thread Alexander Smirnov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Smirnov closed FLINK-27411.
-
Resolution: Workaround

> Move lookup table source cache logic to flink-table-runtime module
> --
>
> Key: FLINK-27411
> URL: https://issues.apache.org/jira/browse/FLINK-27411
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase, Connectors / JDBC, Table SQL / API, 
> Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Alexander Smirnov
>Priority: Major
> Attachments: LookupJoin(2).png
>
>
> The idea was inspired by 
> [FLIP-221|https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric].
> [~renqs] and Yuan Zhu have done great work on this. But I suggest implementing 
> it in a slightly different way that allows applying optimizations to caching 
> and requires fewer dependencies in connectors.
> The point is to move the caching logic to a lower level - to the level of 
> flink-table-runtime operators. The architecture of lookup join looks like this 
> (red text - schematic representation of the proposed changes):
> !LookupJoin(2).png|width=589,height=264!
> LookupConfig is named like this (not CacheConfig) because it can also 
> contain non-cache options for lookup join (for example, 
> 'lookup.max-retries', 'lookup.async'...).
> Changes in connectors - remove their own logic for configs, caching, and 
> retrying queries.
> Changes in the public "Table SQL / API" - a new class LookupConfig, new 
> ConfigOptions for lookup connectors, and a new method 'getLookupConfig' in 
> LookupTableSource.
> {code:java}
> @PublicEvolving
> public interface LookupTableSource extends DynamicTableSource {
>     ...
>     /** @return configurations for planning lookup join and executing it at runtime. */
>     default LookupConfig getLookupConfig() {
>         return null;
>     }
>     ...
> }{code}
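> For illustration, a minimal sketch of what such a LookupConfig holder might 
> look like (the field names here are hypothetical, not taken from the FLIP):
> {code:java}
> // Hypothetical sketch only - fields are illustrative, not the actual design.
> @PublicEvolving
> public class LookupConfig {
>     private final boolean async;     // e.g. 'lookup.async'
>     private final int maxRetries;    // e.g. 'lookup.max-retries'
>     private final long cacheMaxRows; // cache capacity, if caching is enabled
> 
>     public LookupConfig(boolean async, int maxRetries, long cacheMaxRows) {
>         this.async = async;
>         this.maxRetries = maxRetries;
>         this.cacheMaxRows = cacheMaxRows;
>     }
> 
>     public boolean isAsync() { return async; }
>     public int getMaxRetries() { return maxRetries; }
>     public long getCacheMaxRows() { return cacheMaxRows; }
> }{code}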
> Changes in "Table SQL / Planner" - class CommonPhysicalLookupJoin and his 
> inheritors.
> Changes in "Table SQL / Runtime" - classes LookupJoinCachingRunner, 
> LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner, 
> AsyncLookupJoinCachingRunnerWithCalc. Probably we can use 'decorator' pattern 
> here to avoid code duplication and a large number of classes, but in our 
> private version design is like this (maybe not so elegant).
> With such an architecture we can apply further optimizations to caching: 
> 1) Caching after calculations. LookupJoinRunnerWithCalc and 
> AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc 
> and AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc 
> function contains calculations on fields of the lookup table, and most of the 
> time these calculations are filters and projections. 
> For example, if we join table A with lookup table B using the condition 
> ‘JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’, the ‘calc’ 
> function will contain the filters 'A.age = B.age + 10 and B.salary > 1000'.  
> If we apply this function before storing records in the cache, the size of the 
> cache will be significantly reduced: filters avoid storing useless records in 
> the cache, and projections reduce the records’ size. So the initial max number 
> of records in the cache can be increased by the user.
> 2) Constant keys optimization. If the join condition contains constants, for 
> example, ‘JOIN … ON A.name = B.name AND B.age = 10', we don't need to store 
> '10' in the cache. Currently the TableFunction's 'eval' method is called with 
> the values 'A.name' and 10, so we store '10' in the cache for every row, which 
> is pretty useless.
> Note that this change does not cover the Hive lookup connector, because it 
> stores all data in memory. That logic can be replaced in the future by the 
> 'ALL' cache strategy mentioned in the original FLIP.
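> A rough sketch of the caching-after-calc idea (all names below are 
> hypothetical simplifications, not actual Flink classes):
> {code:java}
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.function.Function;
> 
> // Hypothetical decorator: applies the calc function (filters + projections)
> // BEFORE storing rows in the cache, so the cache only holds useful, trimmed rows.
> final class CachingAfterCalcLookup<K, R> {
>     private final Function<K, List<R>> lookup;     // raw lookup into the dimension table
>     private final Function<List<R>, List<R>> calc; // filters + projections from the join
>     private final Map<K, List<R>> cache = new HashMap<>();
> 
>     CachingAfterCalcLookup(Function<K, List<R>> lookup, Function<List<R>, List<R>> calc) {
>         this.lookup = lookup;
>         this.calc = calc;
>     }
> 
>     List<R> eval(K key) {
>         // calc runs before caching: filtered-out rows never occupy cache space,
>         // and projected rows are smaller, so more keys fit in the same budget.
>         return cache.computeIfAbsent(key, k -> calc.apply(lookup.apply(k)));
>     }
> }{code}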



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #22808: [FLINK-32316] [SourceAlignment] Scheduling announceCombinedWatermark period task during SourceCoordinator::start not in constructor

2023-06-21 Thread via GitHub


1996fanrui commented on code in PR #22808:
URL: https://github.com/apache/flink/pull/22808#discussion_r1237971048


##
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##
@@ -131,6 +136,73 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
 }
 }
 
+    /**
+     * When the JobManager fails over and automatically recovers the job, the SourceCoordinator
+     * will be reset twice: 1. Create JobMaster --> Create Scheduler --> Create
+     * DefaultExecutionGraph --> Init SourceCoordinator (but do not start it). 2. JobMaster calls
+     * restoreLatestCheckpointedStateInternal, which calls {@link
+     * org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator#resetToCheckpoint(long,byte[])}
+     * and resets the SourceCoordinator. Because the first SourceCoordinator was never started,
+     * its periodic task cannot be stopped.
+     */
+    @Test
+    void testSourceCoordinatorReset() throws Exception {

Review Comment:
   In general, this test checks that `announceCombinedWatermark` shouldn't be 
called without `SourceCoordinator#start`; the reset is just one case of this 
test (currently the only case), right?
   
   If so, I think the method name isn't general enough. How about 
`testAnnounceCombinedWatermarkWithoutStart`?



##
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##
@@ -131,6 +136,73 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
 }
 }
 
+    /**
+     * When the JobManager fails over and automatically recovers the job, the SourceCoordinator
+     * will be reset twice: 1. Create JobMaster --> Create Scheduler --> Create
+     * DefaultExecutionGraph --> Init SourceCoordinator (but do not start it). 2. JobMaster calls
+     * restoreLatestCheckpointedStateInternal, which calls {@link
+     * org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator#resetToCheckpoint(long,byte[])}
+     * and resets the SourceCoordinator. Because the first SourceCoordinator was never started,
+     * its periodic task cannot be stopped.
+     */
+    @Test
+    void testSourceCoordinatorReset() throws Exception {
+        long maxDrift = 1000L;
+        WatermarkAlignmentParams params =
+                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
+
+        final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+                createMockSource();
+
+        // First init a SourceCoordinator to simulate JobMaster init SourceCoordinator
+        AtomicInteger counter1 = new AtomicInteger(0);
+        sourceCoordinator =
+                new SourceCoordinator<Integer, MockSourceSplit>(
+                        OPERATOR_NAME,
+                        mockSource,
+                        getNewSourceCoordinatorContext(),
+                        new CoordinatorStoreImpl(),
+                        params,
+                        null) {
+                    @Override
+                    void announceCombinedWatermark() {
+                        counter1.incrementAndGet();
+                    }
+                };
+
+        // Second we call SourceCoordinator::close and re-init the SourceCoordinator to
+        // simulate RecreateOnResetOperatorCoordinator::resetToCheckpoint
+        sourceCoordinator.close();
+        AtomicInteger counter2 = new AtomicInteger(0);
+        sourceCoordinator =
+                new SourceCoordinator<Integer, MockSourceSplit>(
+                        OPERATOR_NAME,
+                        mockSource,
+                        getNewSourceCoordinatorContext(),
+                        new CoordinatorStoreImpl(),
+                        params,
+                        null) {
+                    @Override
+                    void announceCombinedWatermark() {
+                        counter2.incrementAndGet();
+                    }
+                };
+
+        final int subtask = 0;
+        int attemptNumber = 0;
+        sourceCoordinator.start();
+        setReaderTaskReady(sourceCoordinator, subtask, attemptNumber);
+
+        sourceCoordinator.handleEventFromOperator(
+                subtask, attemptNumber, new ReportedWatermarkEvent(1000));

Review Comment:
   As I understand it, these lines are not necessary, right?
   
   ```suggestion
   ```




[GitHub] [flink] flinkbot commented on pull request #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks

2023-06-21 Thread via GitHub


flinkbot commented on PR #22845:
URL: https://github.com/apache/flink/pull/22845#issuecomment-1601970488

   
   ## CI report:
   
   * 3e53f0587d57073a6db12b1381aaa9ed4fba644d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks

2023-06-21 Thread via GitHub


1996fanrui commented on PR #22845:
URL: https://github.com/apache/flink/pull/22845#issuecomment-1601969939

   Hi @becketqin @pnowojski @LoveHeat, would you mind taking a look at this 
PR in your free time? Thanks~
   
   I have described this bug in detail in FLINK-32411.





[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32411:
---
Labels: pull-request-available  (was: )

> SourceCoordinator thread leaks when job recovers from checkpoint
> 
>
> Key: FLINK-32411
> URL: https://issues.apache.org/jira/browse/FLINK-32411
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-06-22-11-12-35-747.png
>
>
> SourceCoordinator threads leak when a job recovers from a checkpoint. From the 
> following figure, we can see:
>  * 2 SourceCoordinator threads for the slow SlowNumberSequenceSource
>  * 2 SourceCoordinator threads for the FastNumberSequenceSource 
> !image-2023-06-22-11-12-35-747.png|width=889,height=225!
> h1. Root cause:
>  # When initializing the ExecutionJobVertex of the source, 
> RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code 
> link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
>  # When the job recovers from a checkpoint, 
> [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
>  will close the old coordinator and create a new coordinator. 
>  # 
> [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
>  only closes the SourceCoordinatorContext if the coordinator has been started, 
> so the SourceCoordinatorContext of the old coordinator won't be closed.
>  # The SourceCoordinatorContext creates some threads in its 
> [constructor|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L118],
>  so it should be closed even if the SourceCoordinator isn't started.
>  
> The call stack for creating the SourceCoordinator:
> {code:java}
> // Create the first SourceCoordinator
> "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
>       - locked <0x1f02> (a 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultE

[GitHub] [flink] 1996fanrui opened a new pull request, #22845: [FLINK-32411][connector/common] Fix the bug about SourceCoordinator thread leaks

2023-06-21 Thread via GitHub


1996fanrui opened a new pull request, #22845:
URL: https://github.com/apache/flink/pull/22845

   ## What is the purpose of the change
   
   Fix the SourceCoordinator thread leak. You can find more detailed 
background in FLINK-32411.
   
   ## Brief change log
   
   Close the SourceCoordinatorContext inside SourceCoordinator#close even if 
the SourceCoordinator isn't started.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added the SourceCoordinatorTest#testClosedWithoutStart*
 - *Improved the SourceCoordinatorTest#testClosed*
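   
   A rough sketch of what such a test could look like (illustrative only; the 
actual assertions in the PR may differ):
   
   ```java
   @Test
   void testClosedWithoutStart() throws Exception {
       // Note: sourceCoordinator.start() is deliberately never called here.
       sourceCoordinator.close();
       // Expected outcome: close() also closes the SourceCoordinatorContext,
       // so the threads it created in its constructor are shut down.
       // (The exact assertion depends on the test utilities available.)
   }
   ```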
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  no
 - The S3 file system connector:no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no





[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32411:

Description: 
SourceCoordinator threads leak when a job recovers from a checkpoint. From the 
following figure, we can see:
 * 2 SourceCoordinator threads for the slow SlowNumberSequenceSource
 * 2 SourceCoordinator threads for the FastNumberSequenceSource 

!image-2023-06-22-11-12-35-747.png|width=889,height=225!
h1. Root cause:
 # When initializing the ExecutionJobVertex of the source, 
RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code 
link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
 # When the job recovers from a checkpoint, 
[RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
 will close the old coordinator and create a new coordinator. 
 # 
[SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
 only closes the SourceCoordinatorContext if the coordinator has been started, 
so the SourceCoordinatorContext of the old coordinator won't be closed.
 # The SourceCoordinatorContext creates some threads in its 
[constructor|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L118],
 so it should be closed even if the SourceCoordinator isn't started.
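
A minimal sketch of the fix's shape (a simplification, not the actual 
SourceCoordinator code; 'closeStartedInternals' is a hypothetical name):
{code:java}
// Hypothetical simplification: close() must release the context even when
// start() was never called, because SourceCoordinatorContext spawns threads
// in its constructor.
public void close() throws Exception {
    try {
        if (started) {
            closeStartedInternals(); // stop periodic tasks, enumerator, etc.
        }
    } finally {
        context.close(); // always close, otherwise the context's threads leak
    }
}{code}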

 

The call stack for creating the SourceCoordinator:
{code:java}
// Create the first SourceCoordinator
"jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
      - locked <0x1f02> (a 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
      at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
      at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
      at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
      at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
      at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoo

[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32411:

Description: 
SourceCoordinator threads leak when a job recovers from a checkpoint. From the 
following figure, we can see:
 * 2 SourceCoordinator threads for the slow SlowNumberSequenceSource
 * 2 SourceCoordinator threads for the FastNumberSequenceSource 

!image-2023-06-22-11-12-35-747.png|width=889,height=225!
h1. Root cause:
 # When initializing the ExecutionJobVertex of the source, 
RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code 
link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
 # When the job recovers from a checkpoint, 
[RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
 will close the old coordinator and create a new coordinator. 
 # 
[SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
 only closes the SourceCoordinatorContext if the coordinator has been started, 
so the SourceCoordinatorContext of the old coordinator won't be closed.
 # The SourceCoordinatorContext creates some threads in its constructor, so it 
should be closed even if the SourceCoordinator isn't started.

 

The call stack for creating the SourceCoordinator:
{code:java}
// Create the first SourceCoordinator
"jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
      - locked <0x1f02> (a 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
      at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
      at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
      at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
      at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
      at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
      at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378)
      at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMast

[jira] [Updated] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32411:

Attachment: (was: image-2023-06-22-11-00-25-446.png)

> SourceCoordinator thread leaks when job recovers from checkpoint
> 
>
> Key: FLINK-32411
> URL: https://issues.apache.org/jira/browse/FLINK-32411
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-06-22-11-12-35-747.png
>
>
> SourceCoordinator threads leak when a job recovers from a checkpoint. From the 
> following figure, we can see:
>  * 2 SourceCoordinator threads for the slow SlowNumberSequenceSource
>  * 2 SourceCoordinator threads for the FastNumberSequenceSource 
> !image-2023-06-22-11-12-35-747.png|width=889,height=225!
> h1. Root cause:
>  # When initializing the ExecutionJobVertex of the source, 
> RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code 
> link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
>  # When the job recovers from a checkpoint, 
> [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
>  will close the old coordinator and create a new coordinator. 
>  # 
> [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
>  only closes the SourceCoordinatorContext if the coordinator has been started, 
> so the SourceCoordinatorContext of the old coordinator won't be closed.
>  # The SourceCoordinatorContext creates some threads in its constructor, so it 
> should be closed even if the SourceCoordinator isn't started.
>  
> The call stack for creating the SourceCoordinator:
> {code:java}
> // Create the first SourceCoordinator
> "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
>       - locked <0x1f02> (a 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
>       at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
>       at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
>       at 
> 

[jira] [Created] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread Rui Fan (Jira)
Rui Fan created FLINK-32411:
---

 Summary: SourceCoordinator thread leaks when job recovers from 
checkpoint
 Key: FLINK-32411
 URL: https://issues.apache.org/jira/browse/FLINK-32411
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.17.1, 1.16.2
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-06-22-11-00-25-446.png, 
image-2023-06-22-11-12-35-747.png

SourceCoordinator threads leak when a job recovers from a checkpoint. From the 
following figure, we can see:
 * 2 SourceCoordinator threads for the slow SlowNumberSequenceSource
 * 2 SourceCoordinator threads for the FastNumberSequenceSource 

!image-2023-06-22-11-12-35-747.png|width=889,height=225!
h1. Root cause:
 # When initializing the ExecutionJobVertex of the source, 
RecreateOnResetOperatorCoordinator will create the SourceCoordinator. [code 
link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
 # When the job recovers from a checkpoint, 
[RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
 will close the old coordinator and create a new coordinator. 
 # 
[SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
 only closes the SourceCoordinatorContext if the coordinator has been started, 
so the SourceCoordinatorContext of the old coordinator won't be closed.
 # The SourceCoordinatorContext creates some threads in its constructor, so it 
should be closed even if the SourceCoordinator isn't started.

 

The call stack for creating the SourceCoordinator:
{code:java}
// Create the first SourceCoordinator
"jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
      - locked <0x1f02> (a 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
      at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
      at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
      at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
      at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(De

[GitHub] [flink] afedulov commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-06-21 Thread via GitHub


afedulov commented on PR #21028:
URL: https://github.com/apache/flink/pull/21028#issuecomment-1601564665

   @zentol One concern regarding the idea to migrate the existing methods in 
`StreamExecutionEnvironment` to `DataGeneratorSource` is that adding a 
`flink-streaming-java` dependency on `flink-connector-datagen` introduces a 
somewhat unexpected cycle between `flink-architecture-tests-test` and 
`flink-clients`. How can we address this?





[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields

2023-06-21 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735812#comment-17735812
 ] 

padavan commented on FLINK-31958:
-

Please vote for this issue if you often work with models and hit this problem (y)

> Table to DataStream allow partial fields
> 
>
> Key: FLINK-31958
> URL: https://issues.apache.org/jira/browse/FLINK-31958
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Table SQL / API
>Reporter: padavan
>Priority: Major
>
> Hello, I have a model with many fields, for example:
> {code:java}
> public class UserModel {
>     public int userId;
>     public int count;
>     public int zip;
>     public LocalDateTime dt;
>     public LocalDateTime wStart;
>     public LocalDateTime wEnd;
> }{code}
> I work with the Table API, select fields, and convert the Table to a DataStream 
> by model. The problem is that *I have to select all fields even if I don't need 
> them*, or I get the exception
> {quote}Column types of query result and sink for do not match. Cause: 
> Different number of columns.
> {quote}
> And I just have to substitute fake data as placeholders...
>  
> I want to simply use only the fields which I have selected, like:
> {code:java}
> .select($("userId"), $("count").sum().as("count"));
> DataStream<UserModel> dataStream = te.toDataStream(win, UserModel.class);{code}
>  
> *Expected:* 
> Remove the validation rule "Different number of columns.". If a column is not 
> selected, it is initialized by default(T) / null.





[GitHub] [flink] flinkbot commented on pull request #22844: [WIP][FLINK-31784][runtime] Adds multi-component support to DefaultLeaderElectionService

2023-06-21 Thread via GitHub


flinkbot commented on PR #22844:
URL: https://github.com/apache/flink/pull/22844#issuecomment-1601072224

   
   ## CI report:
   
   * 506a625457f00f4c99d8ccc6a8bef17aeb4a5749 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Commented] (FLINK-30141) MinioTestContainerTest failed due to IllegalStateException in container startup

2023-06-21 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735782#comment-17735782
 ] 

Ryan Skraba commented on FLINK-30141:
-

I took a look at this and the related issue FLINK-26402 -- it looks like the 
*503 Service Unavailable* statuses are not rare: they occur in about 1 in a 
couple hundred API calls to Minio on container startup.  On the other hand, the 
retry mechanism built into the Amazon API clients _usually_ tries again 
correctly until it succeeds.  Sometimes the Minio container doesn't move to the 
correct state to service API calls quickly enough; the default retry strategy 
eventually fails and we see the error here.

I can reproduce this pretty reliably by running a unit test somewhere between 
1K-10K times.  At first I assumed it occurred when the system was loaded while 
running the test, but that doesn't appear to be the case.

Attempting to start up the container more than once might be the right thing to 
do here.  If the call to Minio fails while creating the default bucket, the 
container should be discarded and tried again.  This should have no overhead on 
the daily CI runs.
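
A sketch of that retry idea (a simplification; not necessarily what the linked 
PR does):
{code:java}
import java.util.function.Supplier;
import org.testcontainers.containers.GenericContainer;

// Simplified sketch: start a fresh container, discarding failed attempts.
static GenericContainer<?> startWithRetry(
        Supplier<GenericContainer<?>> containerFactory, int maxAttempts) {
    RuntimeException lastFailure = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        GenericContainer<?> container = containerFactory.get();
        try {
            container.start(); // may fail, e.g. on a 503 from MinIO during startup
            return container;
        } catch (RuntimeException e) {
            lastFailure = e;
            container.stop(); // discard this container and try a fresh one
        }
    }
    throw lastFailure;
}{code}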

> MinioTestContainerTest failed due to IllegalStateException in container 
> startup
> ---
>
> Key: FLINK-30141
> URL: https://issues.apache.org/jira/browse/FLINK-30141
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43182&view=logs&j=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc&t=3a8f44aa-4415-5b14-37d5-5fecc568b139&l=15531]
>  failed due to an {{IllegalStateException}} during container startup:
> {code:java}
> Nov 15 02:34:04 [ERROR] 
> org.apache.flink.fs.s3.common.MinioTestContainerTest.testBucketCreation  Time 
> elapsed: 120.874 s  <<< ERROR!
> Nov 15 02:34:04 org.testcontainers.containers.ContainerLaunchException: 
> Container startup failed
> Nov 15 02:34:04   at 
> org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:345)
> Nov 15 02:34:04   at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:326)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.TestContainerExtension.instantiateTestContainer(TestContainerExtension.java:59)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.TestContainerExtension.before(TestContainerExtension.java:70)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.EachCallbackWrapper.beforeEach(EachCallbackWrapper.java:45)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$2(TestMethodTestDescriptor.java:166)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:202)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:202)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:165)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:132)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.N

[jira] [Updated] (FLINK-31784) Add multiple-component support to DefaultLeaderElectionService

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31784:
---
Labels: pull-request-available  (was: )

> Add multiple-component support to DefaultLeaderElectionService
> --
>
> Key: FLINK-31784
> URL: https://issues.apache.org/jira/browse/FLINK-31784
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] XComp opened a new pull request, #22844: [WIP][FLINK-31784][runtime] Adds multi-component support to DefaultLeaderElectionService

2023-06-21 Thread via GitHub


XComp opened a new pull request, #22844:
URL: https://github.com/apache/flink/pull/22844

   * https://github.com/apache/flink/pull/21742
   * https://github.com/apache/flink/pull/22379
   * https://github.com/apache/flink/pull/22422
   * https://github.com/apache/flink/pull/22380
   * https://github.com/apache/flink/pull/22623
   * https://github.com/apache/flink/pull/22384
   * https://github.com/apache/flink/pull/22390
   * https://github.com/apache/flink/pull/22404
   * https://github.com/apache/flink/pull/22601
   * https://github.com/apache/flink/pull/22642
   * https://github.com/apache/flink/pull/22640
   * https://github.com/apache/flink/pull/22656
   * https://github.com/apache/flink/pull/22828
   * https://github.com/apache/flink/pull/22830
   * https://github.com/apache/flink/pull/22829
   * https://github.com/apache/flink/pull/22661
   * === THIS PR === FLINK-31784
   
   ## What is the purpose of the change
   
   Replacing the `leaderContender` and `contenderID` fields in 
`DefaultLeaderElectionService` with a `leaderContenderRegistry` that allows 
adding multiple contenders.
   
   ## Brief change log
   
   * Removes `DefaultLeaderElectionService` fields `leaderContender` and 
`contenderID`
   * Adds `DefaultLeaderElectionService#leaderContenderRegistry`
   * Makes the `MultipleComponentLeaderElectionDriver.Listener` interface methods 
first-class citizens and lets the `LeaderElectionEventHandler` interface call 
those
   * Introduces `contenderID` in `EmbeddedLeaderElection` to allow logging
   * Removes `LeaderContender#getDescription()`
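   
   For illustration, the registry could have roughly this shape (a hypothetical 
sketch, not the actual PR code):
   
   ```java
   // Hypothetical sketch of a multi-contender registry keyed by contenderID.
   private final Map<String, LeaderContender> leaderContenderRegistry =
           new ConcurrentHashMap<>();
   
   void registerContender(String contenderID, LeaderContender contender) {
       Preconditions.checkState(
               leaderContenderRegistry.putIfAbsent(contenderID, contender) == null,
               "A contender with ID %s is already registered.",
               contenderID);
   }
   ```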
   
   ## Verifying this change
   
   * Added multi-component support test scenarios to 
`DefaultLeaderElectionServiceTest`
   * Copied relevant tests over from 
`DefaultMultipleComponentLeaderElectionServiceTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable





[GitHub] [flink] flinkbot commented on pull request #22843: [FLINK-30141][s3][filesystem] Increase retries on MinIO container startup

2023-06-21 Thread via GitHub


flinkbot commented on PR #22843:
URL: https://github.com/apache/flink/pull/22843#issuecomment-1601050461

   
   ## CI report:
   
   * ab40ed309eb39c427278658d47dbc2d6dc3139b4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API

2023-06-21 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated FLINK-32261:

Description: 
Description:

This is an implementation of MAP_UNION.

Returns a map created by merging two maps, 'map1' and 'map2'. These two maps 
should have the same data structure. If there are overlapping keys, the value 
from 'map2' will overwrite the value from 'map1'. If either map is null, return 
null.

Syntax:
{code:java}
MAP_UNION(map1, map2){code}
Arguments:
 * map1: The first map to be merged.
 * map2: The second map to be merged.

Returns: A new map that contains the combined key-value pairs from {{map1}} and 
{{map2}}. If there are any overlapping keys, the value from {{map2}} will 
overwrite the value from {{map1}}.

Examples:

Merging maps with unique keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['c': 3, 'd': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
Merging maps with overlapping keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['b': 3, 'c': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 3, 'c': 4]{code}
See also:

prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]
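
For illustration, the described semantics correspond to roughly this sketch 
(plain Java, hypothetical; not the Flink implementation):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch of the MAP_UNION semantics described above.
static <K, V> Map<K, V> mapUnion(Map<K, V> map1, Map<K, V> map2) {
    if (map1 == null || map2 == null) {
        return null; // null if either input map is null
    }
    Map<K, V> result = new HashMap<>(map1);
    result.putAll(map2); // for overlapping keys, map2's value wins
    return result;
}{code}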

 

 

  was:
Description:

This is an implementation of MAP_UNION.

Returns a map created by merging two maps, 'map1' and 'map2'. These two maps 
should have the same data structure. If there are overlapping keys, the value 
from 'map2' will overwrite the value from 'map1'. If either map is null, return 
null.

Syntax:

`MAP_UNION(map1, map2)`

Arguments:
 * map1: The first map to be merged.
 * map2: The second map to be merged.

Returns: A new map that contains the combined key-value pairs from {{map1}} and 
{{map2}}. If there are any overlapping keys, the value from {{map2}} will 
overwrite the value from {{map1}}.

Examples:

Merging maps with unique keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['c': 3, 'd': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
Merging maps with overlapping keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['b': 3, 'c': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 3, 'c': 4]{code}
See also:

prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

 

 


> Add MAP_UNION support in SQL & Table API
> 
>
> Key: FLINK-32261
> URL: https://issues.apache.org/jira/browse/FLINK-32261
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Description:
> This is an implementation of MAP_UNION.
> Returns a map created by merging two maps, 'map1' and 'map2'. These two maps 
> should have the same data structure. If there are overlapping keys, the value 
> from 'map2' will overwrite the value from 'map1'. If either map is null, 
> return null.
> Syntax:
> {code:java}
> MAP_UNION(map1, map2){code}
> Arguments:
>  * map1: The first map to be merged.
>  * map2: The second map to be merged.
> Returns: A new map that contains the combined key-value pairs from {{map1}} 
> and {{map2}}. If there are any overlapping keys, the value from {{map2}} 
> will overwrite the value from {{map1}}.
> Examples:
>  Merging maps with unique keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['c': 3, 'd': 4]
> map_union(map1, map2)
> Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
>  Merging maps with overlapping keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['b': 3, 'c': 4]
> map_union(map1, map2)
> Output: ['a': 1, 'b': 3, 'c': 4]{code}
> See also:
> prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]
>  
>  





[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API

2023-06-21 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated FLINK-32261:

Description: 
Description:

This is an implementation of MAP_UNION.

Returns a map created by merging two maps, 'map1' and 'map2'. These two maps 
should have the same data structure. If there are overlapping keys, the value 
from 'map2' will overwrite the value from 'map1'. If either map is null, return 
null.

Syntax:

`MAP_UNION(map1, map2)`

Arguments:
 * map1: The first map to be merged.
 * map2: The second map to be merged.

Returns: A new map that contains the combined key-value pairs from {{map1}} and 
{{map2}}. If there are any overlapping keys, the value from {{map2}} will 
overwrite the value from {{map1}}.

Examples:

Merging maps with unique keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['c': 3, 'd': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
Merging maps with overlapping keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['b': 3, 'c': 4]
map_union(map1, map2)
Output: ['a': 1, 'b': 3, 'c': 4]{code}
See also:

prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

 

 

  was:
Description:

The current implementation of the {{map_union}} method in the Flink library 
does not provide a way to combine two maps into a single map. This enhancement 
aims to add this functionality, allowing users to merge maps efficiently.

Syntax:

{code:java}
map_union(map1: map, map2: map) -> map{code}
Arguments:
 * map1: The first map to be merged.
 * map2: The second map to be merged.

Returns: A new map that contains the combined key-value pairs from {{map1}} and 
{{map2}}. If there are any overlapping keys, the value from {{map2}} will 
overwrite the value from {{map1}}.

Examples:

Merging maps with unique keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['c': 3, 'd': 4]
result = map_union(map1, map2)
Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
Merging maps with overlapping keys:
{code:java}
map1 = ['a': 1, 'b': 2]
map2 = ['b': 3, 'c': 4]
result = map_union(map1, map2)
Output: ['a': 1, 'b': 3, 'c': 4]{code}
See also:

prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]

 

 


> Add MAP_UNION support in SQL & Table API
> 
>
> Key: FLINK-32261
> URL: https://issues.apache.org/jira/browse/FLINK-32261
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Description:
> This is an implementation of MAP_UNION.
> Returns a map created by merging two maps, 'map1' and 'map2'. These two maps 
> should have the same data structure. If there are overlapping keys, the value 
> from 'map2' will overwrite the value from 'map1'. If either map is null, 
> return null.
> Syntax:
> `MAP_UNION(map1, map2)`
> Arguments:
>  * map1: The first map to be merged.
>  * map2: The second map to be merged.
> Returns: A new map that contains the combined key-value pairs from {{map1}} 
> and {{map2}}. If there are any overlapping keys, the value from {{map2}} 
> will overwrite the value from {{map1}}.
> Examples:
>  Merging maps with unique keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['c': 3, 'd': 4]
> map_union(map1, map2)
> Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
>  Merging maps with overlapping keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['b': 3, 'c': 4]
> map_union(map1, map2)
> Output: ['a': 1, 'b': 3, 'c': 4]{code}
> See also:
> prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30141) MinioTestContainerTest failed due to IllegalStateException in container startup

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30141:
---
Labels: pull-request-available test-stability  (was: test-stability)

> MinioTestContainerTest failed due to IllegalStateException in container 
> startup
> ---
>
> Key: FLINK-30141
> URL: https://issues.apache.org/jira/browse/FLINK-30141
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43182&view=logs&j=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc&t=3a8f44aa-4415-5b14-37d5-5fecc568b139&l=15531]
>  failed due to an {{IllegalStateException}} during container startup:
> {code:java}
> Nov 15 02:34:04 [ERROR] 
> org.apache.flink.fs.s3.common.MinioTestContainerTest.testBucketCreation  Time 
> elapsed: 120.874 s  <<< ERROR!
> Nov 15 02:34:04 org.testcontainers.containers.ContainerLaunchException: 
> Container startup failed
> Nov 15 02:34:04   at 
> org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:345)
> Nov 15 02:34:04   at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:326)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.TestContainerExtension.instantiateTestContainer(TestContainerExtension.java:59)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.TestContainerExtension.before(TestContainerExtension.java:70)
> Nov 15 02:34:04   at 
> org.apache.flink.core.testutils.EachCallbackWrapper.beforeEach(EachCallbackWrapper.java:45)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$2(TestMethodTestDescriptor.java:166)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:202)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:202)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:165)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:132)
> Nov 15 02:34:04   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.executeNonConcurrentTasks(ForkJoinPoolHierarchicalTestExecutorService.java:155)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:135)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> Nov 15 02:34:04   at 
> org.junit.platform.engine.support.

[GitHub] [flink] RyanSkraba opened a new pull request, #22843: [FLINK-30141][s3][filesystem] Increase retries on MinIO container startup

2023-06-21 Thread via GitHub


RyanSkraba opened a new pull request, #22843:
URL: https://github.com/apache/flink/pull/22843

   
   
   ## What is the purpose of the change
   
   We observe flaky tests using MinIO about once per month, and it appears to 
be due to a container responding to API requests with enough **503 Service 
Unavailable** statuses that the default retry mechanism on the `AmazonS3` 
client fails.
   
   ## Brief change log
   
   Increase the number of TestContainer startup attempts from 1 to 3.
   
   I chose to do this at the TestContainer level instead of making the S3 
client retry more often, since it is (1) a sufficiently rare event and (2) 
specifically related to container start-up.
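   
   A minimal sketch of the mechanism, assuming a plain Testcontainers 
`GenericContainer` (the real change lives in Flink's MinIO test container; the 
image name, command, and port below are illustrative):
   
   ```java
   import org.testcontainers.containers.GenericContainer;
   import org.testcontainers.utility.DockerImageName;
   
   public class MinioStartupRetryExample {
       public static void main(String[] args) {
           try (GenericContainer<?> minio =
                   new GenericContainer<>(DockerImageName.parse("minio/minio:latest"))
                           .withCommand("server", "/data")
                           .withExposedPorts(9000)
                           // Retry the whole container startup (up to 3 attempts)
                           // instead of tuning the AmazonS3 client's request retries.
                           .withStartupAttempts(3)) {
               minio.start();
               System.out.println("MinIO reachable on port " + minio.getMappedPort(9000));
           }
       }
   }
   ```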
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
MinioTestContainerTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  **no**
 - The serializers:  **no**
 - The runtime per-record code paths (performance sensitive):  **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  **no**
 - The S3 file system connector:  **yes** (indirectly)
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  **no**
 - If yes, how is the feature documented?  **not applicable**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-21 Thread via GitHub


flinkbot commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1601024848

   
   ## CI report:
   
   * 4f26f9375e99db7765ce6a4bf6d161bc8ef4899e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32261) Add MAP_UNION support in SQL & Table API

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32261:
---
Labels: pull-request-available  (was: )

> Add MAP_UNION support in SQL & Table API
> 
>
> Key: FLINK-32261
> URL: https://issues.apache.org/jira/browse/FLINK-32261
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Description:
> Flink does not currently provide a {{map_union}} function to combine two 
> maps into a single map. This enhancement aims to add this functionality, 
> allowing users to merge maps efficiently.
> Syntax:
> {code:java}
> map_union(map1: map, map2: map) -> map{code}
> Arguments:
>  * map1: The first map to be merged.
>  * map2: The second map to be merged.
> Returns: A new map that contains the combined key-value pairs from {{map1}} 
> and {{map2}}. If there are any overlapping keys, the value from {{map2}} 
> will overwrite the value from {{map1}}.
> Examples:
> Merging maps with unique keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['c': 3, 'd': 4]
> result = map_union(map1, map2)
> Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]{code}
> Merging maps with overlapping keys:
> {code:java}
> map1 = ['a': 1, 'b': 2]
> map2 = ['b': 3, 'c': 4]
> result = map_union(map1, map2)
> Output: ['a': 1, 'b': 3, 'c': 4]{code}
> See also:
> prestodb: [https://prestodb.io/docs/current/functions/aggregate.html]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 opened a new pull request, #22842: [FLINK-32261]-table-Add-MAP_UNION-function

2023-06-21 Thread via GitHub


hanyuzheng7 opened a new pull request, #22842:
URL: https://github.com/apache/flink/pull/22842

   Description:
   
   Flink does not currently provide a map_union function to combine two maps 
into a single map. This enhancement aims to add this functionality, allowing 
users to merge maps efficiently.
   
   Syntax:
   
   map_union(map1: map, map2: map) -> map
   
   Arguments:
   
   map1: The first map to be merged.
   map2: The second map to be merged.
   
   Returns: A new map that contains the combined key-value pairs from map1 and 
map2. If there are any overlapping keys, the value from map2 will overwrite the 
value from map1.
   
   Examples:
   
   Merging maps with unique keys:
   
   map1 = ['a': 1, 'b': 2]
   map2 = ['c': 3, 'd': 4]
   result = map_union(map1, map2)
   Output: ['a': 1, 'b': 2, 'c': 3, 'd': 4]
   
   Merging maps with overlapping keys:
   
   map1 = ['a': 1, 'b': 2]
   map2 = ['b': 3, 'c': 4]
   result = map_union(map1, map2)
   Output: ['a': 1, 'b': 3, 'c': 4]
   
   See also:
   
   prestodb: https://prestodb.io/docs/current/functions/aggregate.html
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32408.
--
Resolution: Fixed

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22841: [FLINK-32410] Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread via GitHub


flinkbot commented on PR #22841:
URL: https://github.com/apache/flink/pull/22841#issuecomment-1600948809

   
   ## CI report:
   
   * d8f3e1e99759cb5b91f7d560f5fbda643e4255a2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22806: [FLINK-32362] [SourceAlignment] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-06-21 Thread via GitHub


1996fanrui commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1236917373


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -195,9 +195,19 @@ void announceCombinedWatermark() {
 "Distributing maxAllowedWatermark={} to subTaskIds={}",
 maxAllowedWatermark,
 subTaskIds);
-for (Integer subtaskId : subTaskIds) {
-    context.sendEventToSourceOperator(
-            subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+// Because Java's ThreadPoolExecutor will not reschedule a periodic task
+// once it throws an exception, we should handle potential exceptions like
+// "subtask xx is not ready yet to receive events" to increase robustness.
+try {
+    for (Integer subtaskId : subTaskIds) {
+        context.sendEventToSourceOperator(
+                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+    }

Review Comment:
   Hi @LoveHeat, thanks for your feedback.
   
   As I said at FLINK-32362 before, I'm not sure what we should do when some 
subtasks are not ready.
   
   - Option1: Send the event to all ready subtasks, and just ignore unready 
subtasks.
   - Option2: Don't send any event before all subtasks are ready.
   
   If we expect option1, we should ensure the event is sent to all ready 
subtasks.
   
   > In my opinion, if one task is failing, other tasks may also be failing 
with high probability
   
   When `jobmanager.execution.failover-strategy` is region and the subtasks 
don't have a shuffle link, if subtask0 cannot start or fails, the other 
subtasks should work well, right?
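   
   For context, a standalone sketch (illustrative only, not Flink code) of the 
JDK behavior described in the new code comment: a periodic task is silently 
cancelled after its first uncaught exception.
   
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   public class PeriodicTaskSuppression {
       public static void main(String[] args) throws InterruptedException {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           executor.scheduleAtFixedRate(
                   () -> {
                       System.out.println("tick");
                       // The first uncaught exception cancels all future executions.
                       throw new RuntimeException("subtask not ready");
                   },
                   0, 100, TimeUnit.MILLISECONDS);
           Thread.sleep(500); // prints "tick" exactly once
           executor.shutdownNow();
       }
   }
   ```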
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-06-21 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1237090987


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml:
##
@@ -107,14 +107,14 @@ 
Calc(select=[*org.apache.flink.table.planner.expressions.utils.Func24$$2da7dcb3c
   
 
   

[jira] [Updated] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32410:
---
Labels: pull-request-available  (was: )

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern of allocating a hash-based collection with the size of 
> the expected elements to avoid rehashes, this means that a rehash is 
> essentially guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size 
> and replace the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] StefanRRichter opened a new pull request, #22841: [FLINK-32410] Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread via GitHub


StefanRRichter opened a new pull request, #22841:
URL: https://github.com/apache/flink/pull/22841

   
   
   ## What is the purpose of the change
   
   The JDK API to create hash-based collections for a certain capacity is 
arguably misleading because it doesn't size the collections to "hold a specific 
number of items" like you'd expect it would. Instead it sizes it to hold 
"load-factor%" of the specified number.
   
   For the common pattern to allocate a hash-based collection with the size of 
expected elements to avoid rehashes, this means that a rehash is essentially 
guaranteed.
   
   This PR replaces constructor calls for allocations for an expected size with 
helper methods (similar to Guava's `Maps.newHashMapWithExpectedSize(int)`).
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32409) Moves componentId/contenderID handling from DefaultMultipleComponentLeaderElectionService into DefaultLeaderElectionService

2023-06-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-32409:
--
Summary: Moves componentId/contenderID handling from 
DefaultMultipleComponentLeaderElectionService into DefaultLeaderElectionService 
 (was: Remove MultipleComponentLeaderElectionDriverAdapter)

> Moves componentId/contenderID handling from 
> DefaultMultipleComponentLeaderElectionService into 
> DefaultLeaderElectionService
> ---
>
> Key: FLINK-32409
> URL: https://issues.apache.org/jira/browse/FLINK-32409
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32410:
--

 Summary: Allocate hash-based collections with sufficient capacity 
for expected size
 Key: FLINK-32410
 URL: https://issues.apache.org/jira/browse/FLINK-32410
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


The JDK API to create hash-based collections for a certain capacity is arguably 
misleading because it doesn't size the collections to "hold a specific number 
of items" like you'd expect it would. Instead it sizes them to hold 
load-factor% of the specified number.

For the common pattern of allocating a hash-based collection with the size of 
the expected elements to avoid rehashes, this means that a rehash is 
essentially guaranteed.

We should introduce helper methods (similar to Guava's 
`Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size 
and replace the direct constructor calls with those.
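
A minimal sketch of such a helper, assuming the default 0.75 load factor (the 
name mirrors Guava's; the exact signature added by the PR may differ):
{code:java}
import java.util.HashMap;

public final class MapAllocationExample {

    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * Creates a HashMap that can hold expectedSize entries without rehashing.
     * In contrast, new HashMap<>(expectedSize) rehashes once more than
     * expectedSize * 0.75 entries have been inserted.
     */
    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
        int initialCapacity = (int) Math.ceil(expectedSize / DEFAULT_LOAD_FACTOR);
        return new HashMap<>(initialCapacity, DEFAULT_LOAD_FACTOR);
    }

    public static void main(String[] args) {
        HashMap<String, Integer> map = newHashMapWithExpectedSize(100);
        map.put("k", 1);
        System.out.println(map);
    }
}
{code}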



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] afedulov closed pull request #17314: [WIP][FLINK-22790] HybridSource E2E tests

2023-06-21 Thread via GitHub


afedulov closed pull request #17314: [WIP][FLINK-22790] HybridSource E2E tests 
URL: https://github.com/apache/flink/pull/17314


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov closed pull request #17215: [FLINK-22790] HybridSource E2E tests

2023-06-21 Thread via GitHub


afedulov closed pull request #17215: [FLINK-22790] HybridSource E2E tests
URL: https://github.com/apache/flink/pull/17215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lincoln-lil commented on a diff in pull request #22827: [FLINK-20887][table-planner] Disable project merge during sql2rel phase by default to avoid incorrectly project merge

2023-06-21 Thread via GitHub


lincoln-lil commented on code in PR #22827:
URL: https://github.com/apache/flink/pull/22827#discussion_r1236728489


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java:
##
@@ -352,8 +358,25 @@ public void testProjectionIncludingOnlyMetadata() {
 .containsExactly("metadata");
 }
 
+private void replaceProgramWithProjectMergeRule() {
+    FlinkChainedProgram programs = new FlinkChainedProgram();
+    programs.addLast(
+            "rules",
+            FlinkHepRuleSetProgramBuilder.newBuilder()
+                    .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                    .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                    .add(
+                            RuleSets.ofList(
+                                    CoreRules.PROJECT_MERGE,
+                                    PushProjectIntoTableSourceScanRule.INSTANCE))
+                    .build());
+    util().replaceBatchProgram(programs);
+}

Review Comment:
   the AST changes after we disable project merge during the sql2rel phase by 
default in the 1st commit, and this rule test would fail, so it should stay in 
the 1st commit



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCalcMergeRule.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Extends Calcite's ProjectCalcMergeRule with one modification: it does not
+ * merge when the filter references a field generated by a non-deterministic
+ * function.
+ */
+public class FlinkProjectCalcMergeRule extends ProjectCalcMergeRule {
+
+    public static final RelOptRule INSTANCE = new FlinkProjectCalcMergeRule(Config.DEFAULT);
+
+    protected FlinkProjectCalcMergeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalProject project = call.rel(0);
+        LogicalCalc calc = call.rel(1);
+
+        List<RexNode> expandProjects =
+                calc.getProgram().getProjectList().stream()
+                        .map(p -> calc.getProgram().expandLocalRef(p))
+                        .collect(Collectors.toList());
+        InputRefVisitor inputRefVisitor = new InputRefVisitor();
+        project.getProjects().forEach(p -> p.accept(inputRefVisitor));
+        boolean existNonDeterministicRef =
+                Arrays.stream(inputRefVisitor.getFields())
+                        .anyMatch(i -> !RexUtil.isDeterministic(expandProjects.get(i)));
+
+        if (!existNonDeterministicRef) {
+            super.onMatch(call);
+        }
+    }
+}

Review Comment:
   make sense, I'll update it.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##
@@ -650,6 +654,75 @@ object FlinkRexUtil {
   rexBuilder,
   converter);
   }
+
+  /**
+   * Returns whether two neighbouring [[Project]]s can be merged into one [[Project]].
+   * For the merge to be allowed, each non-deterministic [[RexNode]] of the bottom
+   * [[Project]] should appear at most once in the project list of the top [[Project]].
+   */
+  def isMergeable(topProject: Project, bottomProject: Project): Boolean = {
+    val topInputRefCounter: Array[Int] =
+      Array.fill(topProject.getInput.getRowType.getFieldCount)(0)
+
+    mergeable(topInputRefCounter, topProject.getProjects, bottomProject.getProjects)
+  }
+
+  /**
+   * An InputRefCounter that count every inputRef's referenc

[jira] [Created] (FLINK-32409) Remove MultipleComponentLeaderElectionDriverAdapter

2023-06-21 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32409:
-

 Summary: Remove MultipleComponentLeaderElectionDriverAdapter
 Key: FLINK-32409
 URL: https://issues.apache.org/jira/browse/FLINK-32409
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32409) Remove MultipleComponentLeaderElectionDriverAdapter

2023-06-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-32409:
-

Assignee: Matthias Pohl

> Remove MultipleComponentLeaderElectionDriverAdapter
> ---
>
> Key: FLINK-32409
> URL: https://issues.apache.org/jira/browse/FLINK-32409
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735722#comment-17735722
 ] 

Gyula Fora commented on FLINK-32408:


If you are using Operator 1.5.0 with Flink 1.17.1, you need to use the old 
config key. But it will work.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717
 ] 

dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:22 PM:
--

I think this is fixed in the new version because, in the main branch's pom.xml, 
the Flink version was updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
Flink 1.17, since it had the Flink version configured to 1.16.1.


was (Author: JIRAUSER300481):
I think this is fixed in the new version because, in the main branch's pom.xml, 
the Flink version was updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
Flink 1.17, since it had the Flink version at 1.16.1.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717
 ] 

dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:21 PM:
--

I think this is fixed in the new version because, in the main branch's pom.xml, 
the Flink version was updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
Flink 1.17, since it had the Flink version at 1.16.1.


was (Author: JIRAUSER300481):
I think this is fixed in the new version because, in the main branch's pom.xml, 
the Flink version was updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
Flink 1.17.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735717#comment-17735717
 ] 

dongwoo.kim commented on FLINK-32408:
-

I think this is fixed in the new version because, in the main branch's pom.xml, 
the Flink version was updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
Flink 1.17.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32408:

Description: 
In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the 
*high-availability* key. (This seems to have changed in 1.17.)

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* would 
work, but it didn't; only *high-availability* works.

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 

  was:
In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the 
*high-availability* key. (This seems to have changed in 1.17.)

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* would 
work, but it didn't.

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 


> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on pull request #22661: [WIP][FLINK-31783][runtime] Migrates DefaultLeaderElectionService from LeaderElectionDriver to the MultipleComponentLeaderElectionDriver interf

2023-06-21 Thread via GitHub


XComp commented on PR #22661:
URL: https://github.com/apache/flink/pull/22661#issuecomment-1600813763


https://github.com/apache/flink/pull/22661/commits/4338f36dee313ca5d5901a753961323e9f299372
 is the "initial" commit that refactors the interfaces. The outcome was 
failing tests because I didn't do a good job of fixing the tests in the first 
run. I fixed the individual tests in the subsequent commits. I suggest going 
through the PR one commit at a time. That's especially useful for the commits 
where I went through the test methods to make sure that they still function as 
expected.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32408:

Description: 
In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the 
*high-availability* key. (This seems to have changed in 1.17.)

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* would 
work, but it didn't.

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 

  was:
In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the 
*high-availability* key. (This seems to have changed in 1.17.)

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* would 
work, but it didn't.


ref: 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 


> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
> have to configure the *high-availability.type* key, not the 
> *high-availability* key. (This seems to have changed in 1.17.)
> And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
> expected that configuring JobManager HA with *high-availability.type* would 
> work, but it didn't.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-32408:
---

 Summary: JobManager HA configuration update needed in Flink k8s 
Operator 
 Key: FLINK-32408
 URL: https://issues.apache.org/jira/browse/FLINK-32408
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: dongwoo.kim
 Fix For: kubernetes-operator-1.6.0


In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the 
*high-availability* key. (This seems to have changed in 1.17.)

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* would 
work, but it didn't.
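
For illustration, a minimal sketch of the two spellings, assuming the 
configuration is built programmatically (the HA backend value is illustrative; 
the key names are from the linked docs):
{code:java}
import org.apache.flink.configuration.Configuration;

public class HaKeyExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key documented for Flink 1.17+:
        conf.setString("high-availability.type", "kubernetes");
        // Legacy key, which is the one kubernetes-operator-1.5.0 actually
        // honors according to this report:
        conf.setString("high-availability", "kubernetes");
        System.out.println(conf);
    }
}
{code}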


ref: 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22840: [WIP][FLINK-32376] Extend Sink#InitContext

2023-06-21 Thread via GitHub


flinkbot commented on PR #22840:
URL: https://github.com/apache/flink/pull/22840#issuecomment-1600790449

   
   ## CI report:
   
   * 7495b60bb3d5c27b68bc9b79d733ca1c9b9758b7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32376) [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32376:
---
Labels: pull-request-available  (was: )

> [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and 
> JobID
> --
>
> Key: FLINK-32376
> URL: https://issues.apache.org/jira/browse/FLINK-32376
> Project: Flink
>  Issue Type: Improvement
>Reporter: João Boto
>Priority: Major
>  Labels: pull-request-available
>
> Implementation of FLIP-287 (linked)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] eskabetxe opened a new pull request, #22840: [WIP][FLINK-32376] Extend Sink#InitContext

2023-06-21 Thread via GitHub


eskabetxe opened a new pull request, #22840:
URL: https://github.com/apache/flink/pull/22840

   ## What is the purpose of the change
   
   Address the [FLIP-287 Extend Sink#InitContext to expose TypeSerializer, 
ObjectReuse and 
JobID](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853)
   
   ## Brief change log
   
   Add three new methods to Sink#InitContext
 - isObjectReuseEnabled
 - createInputSerializer
 - getJobId
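   
   A rough sketch of the resulting interface shape (assumption: signatures 
follow the bullet list above; the authoritative definition is FLIP-287, so the 
final API may differ):
   
   ```java
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   
   public interface InitContext {
       /** Whether object reuse is enabled for the running job. */
       boolean isObjectReuseEnabled();
   
       /** A serializer for the sink's input type. */
       <IN> TypeSerializer<IN> createInputSerializer();
   
       /** The ID of the running job. */
       JobID getJobId();
   }
   ```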
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no )
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( yes )
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( yes )
 - If yes, how is the feature documented? ( not applicable )
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-21 Thread via GitHub


flinkbot commented on PR #22839:
URL: https://github.com/apache/flink/pull/22839#issuecomment-1600710495

   
   ## CI report:
   
   * 9604ce4793f3f9dcbf6d65fbbb9cf64bd690e6ce UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32349:
---
Labels: pull-request-available  (was: )

> Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> -
>
> Key: FLINK-32349
> URL: https://issues.apache.org/jira/browse/FLINK-32349
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For detailed information, see FLIP-305
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Tartarus0zm opened a new pull request, #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-21 Thread via GitHub


Tartarus0zm opened a new pull request, #22839:
URL: https://github.com/apache/flink/pull/22839

   ## What is the purpose of the change
   
   Support atomic for CREATE TABLE AS SELECT(CTAS) statement
   
   
   ## Brief change log
   
   Introduce a StagedTable interface that supports atomic operations (sketched 
below).
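   
   A rough sketch of the interface shape (assumption: method names are 
illustrative; the authoritative definition is FLIP-305, so the final API may 
differ):
   
   ```java
   import java.io.Serializable;
   
   /** Hook that lets a catalog make a CTAS table visible atomically. */
   public interface StagedTable extends Serializable {
       /** Called before the CTAS job starts writing. */
       void begin();
   
       /** Called when the CTAS job succeeds; atomically publishes the table. */
       void commit();
   
       /** Called when the CTAS job fails or is cancelled; discards staged data. */
       void abort();
   }
   ```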
   
   ## Verifying this change
   
   StagedTableITCase#testStagedTableWithAtomicCtas
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

2023-06-21 Thread via GitHub


luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1236760683


##
docs/content/docs/connectors/table/filesystem.md:
##
@@ -50,6 +50,10 @@ CREATE TABLE MyUserTable (
 -- section for more details
   'partition.default-name' = '...', -- optional: default partition name in 
case the dynamic partition
 -- column value is null/empty string
+  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
files under the directory

Review Comment:
   ```suggestion
 'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
files to read under the directory
   ```



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths 
recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * Please note that file blocks are only exposed by some file systems, such as HDFS. File
+ * systems that do not expose block information will not create multiple file splits per
+ * file, but will keep each file as a single source split.
+ *
+ * Files with suffixes corresponding to known compression formats (for 
example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of 
known formats and
+ * suffixes.
+ *
+ * Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even if their parent directory is filtered out by the file filter.
+ */
+@Internal
+public class BlockSplittingRecursiveAllDirEnumerator extends 
BlockSplittingRecursiveEnumerator {
+
+/** The filter used to skip hidden directories. */
+private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+/**
+ * Creates a new enumerator that enumerates all files whose file path 
matches the regex except
+ * hidden files. Hidden files are considered files where the filename 
starts with '.' or with
+ * '_'.
+ *
+ * The enumerator does not split files that have a suffix corresponding to a known
+ * compression format (for example '.gzip', '.bz2', '.xz', '.zip', ...). See {@link
+ * StandardDeCompressors} for details.
+ */
+public BlockSplittingRecursiveAllDirEnumerator(String pathPattern) {
+this(
+new RegexFileFilter(pathPattern),
+StandardDeCompressors.getCommonSuffixes().toArray(new 
String[0]));
+}
+
+/**
+ * Creates a new enumerator that uses the given predicate as a filter for 
file paths, and avoids
+ * splitting files with the given extension (typically to avoid splitting 
compressed files).
+ */
+public BlockSplittingRecursiveAllDirEnumerator(
+final Predicate<Path> fileFilter, final String[] nonSplittableFileSuffixes) {
+super(fileFilter, nonSplittableFileSuffixes);
+}
+
+@Override
+protected void addSplitsForPath(
+FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
+throws IOException {
+if (fileStatus.isDir()) {
+if (!hiddenDirFilter.test(fileStatus.getPath())) {
+return;
+}
+final FileStatus[] containedFiles = 
fs.listStatus(fileStatus.getPath());
+for (FileStatus containedStatus : containedFiles) {
+// If this dir matches the regex, add all files under it as a 
split (not include the

Review Comment:
   please remove
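
   Stepping back from the diff: a minimal sketch of how an enumerator like this could be 
wired into a `FileSource`. `setFileEnumerator` and `TextLineInputFormat` are assumed from 
the existing flink-connector-files API; the path and regex are purely illustrative:
   ```java
   import org.apache.flink.connector.file.src.FileSource;
   import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
   import org.apache.flink.core.fs.Path;

   class RegexSourceSketch {
       // Reads every file whose path matches the regex, in all directories,
       // splitting by block where the file system exposes blocks (e.g. HDFS).
       static FileSource<String> build() {
           return FileSource.forRecordStreamFormat(
                           new TextLineInputFormat(), new Path("hdfs:///data/logs"))
                   .setFileEnumerator(
                           () -> new BlockSplittingRecursiveAllDirEnumerator(".*\\.log$"))
                   .build();
       }
   }
   ```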

[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-21 Thread via GitHub


gaborgsomogyi commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1236809906


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/DelegationTokenIdentifier.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.io.Text;
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/** Delegation token identifier for HiveServer2. */
+@Internal
+public class DelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {

Review Comment:
   All classes have the `HiveServer2` prefix except this one. Why?



##
flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java:
##
@@ -38,6 +47,7 @@
  * implementing a reusable test utility around it, consequently had to resort 
to relying on mockito.
  */
 class HadoopUserUtilsITCase {

Review Comment:
   `HadoopUserUtils` != `HadoopUtils`






[GitHub] [flink] afedulov commented on pull request #21774: [FLINK-28227][connectors] Migrate o.a.f.streaming.examples to the new Source API

2023-06-21 Thread via GitHub


afedulov commented on PR #21774:
URL: https://github.com/apache/flink/pull/21774#issuecomment-1600653013

   @reswqa I just pushed the latest changes, please take a look.





[jira] [Commented] (FLINK-32375) Flink AWS Source AssumeRole in VPC

2023-06-21 Thread Tomas Witzany (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735668#comment-17735668
 ] 

Tomas Witzany commented on FLINK-32375:
---

Thanks for the reply, I'll look into backporting the fix.

> Flink AWS Source AssumeRole in VPC
> --
>
> Key: FLINK-32375
> URL: https://issues.apache.org/jira/browse/FLINK-32375
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Affects Versions: 1.15.4
> Environment: Flink 1.15.4
> running on Amazon KDA (managed flink)
> runtime is running inside a VPC
> input stream cross-account
>Reporter: Tomas Witzany
>Priority: Major
>
> The current way to configure auth against AWS supports assuming a role, but when 
> you assume a role in a VPC without a NAT gateway, the global STS endpoint is 
> not accessible, and there is no way to configure the provider to use a 
> different endpoint.
> This means that there is currently no supported way to configure AWS auth in 
> such a situation. Note that you can add an STS endpoint to a VPC, but it is 
> always a regional endpoint, not the global endpoint.
> Options on how you can configure this:
>  * configuring the aws DefaultsMode, by default legacy, to in-region:
>  ** environment variables - not possible in KDA
>  ** system variables - not possible in KDA
>  ** aws config file - not possible in KDA
>  * adding endpoint configuration options to the assume role provider
> The piece of code that creates the provider, and how it could be extended to 
> support endpoint configuration (just an example):
> {code:java}
> private static AwsCredentialsProvider getAssumeRoleCredentialProvider(
>         final Properties configProps, final String configPrefix) {
>     return StsAssumeRoleCredentialsProvider.builder()
>             .refreshRequest(
>                     AssumeRoleRequest.builder()
>                             .roleArn(
>                                     configProps.getProperty(
>                                             AWSConfigConstants.roleArn(configPrefix)))
>                             .roleSessionName(
>                                     configProps.getProperty(
>                                             AWSConfigConstants.roleSessionName(configPrefix)))
>                             .externalId(
>                                     configProps.getProperty(
>                                             AWSConfigConstants.externalId(configPrefix)))
>                             .build())
>             .stsClient(
>                     StsClient.builder()
>                             .credentialsProvider(
>                                     getCredentialsProvider(
>                                             configProps,
>                                             AWSConfigConstants.roleCredentialsProvider(
>                                                     configPrefix)))
>                             .endpointOverride( // added code
>                                     new URI(
>                                             configProps.getProperty(
>                                                     AWSConfigConstants.endpointOverride(
>                                                             configPrefix)))) // added code
>                             .region(getRegion(configProps))
>                             .build())
>             .build();
> } {code}
>  
> I am not entirely certain that there is no other way to configure this in my 
> situation; my current plan is to build my own version of the connectors with 
> this option supported. If a feature like this would be nice to have, I would 
> be happy to share my results in a PR afterwards.
> However, if there is a better way to configure this, I would be happy to 
> hear about it, especially if you know of some trick to do this in KDA, where 
> you have limited options to configure things.
> Some other options to attack this problem:
>  * trying to set system properties on the task manager before the kinesis 
> source is initialized - this is hard as you don't have control over execution 
> order, though probably doable with some hacks
>  * asking AWS support to set a system property with flink config file options - 
> this is hard as it will involve AWS support
>  * adding a NAT gateway to the VPC - this will not always be an option, for 
> security reasons
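
To illustrate how the proposed option might be used, a hedged sketch assuming the 
hypothetical {{AWSConfigConstants.endpointOverride(...)}} key existed. The other keys are 
existing ones, though the exact import path of {{AWSConfigConstants}} differs between 
connector versions:
{code:java}
import java.util.Properties;

import org.apache.flink.connector.aws.config.AWSConfigConstants;

final class AssumeRoleConfigSketch {

    static Properties configProps() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
        props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
        props.setProperty(
                AWSConfigConstants.roleArn(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
                "arn:aws:iam::111122223333:role/example-cross-account-role");
        props.setProperty(
                AWSConfigConstants.roleSessionName(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
                "flink-session");
        // Hypothetical key from the proposal above; it does not exist in the
        // connector today. It would point STS at the regional VPC endpoint.
        props.setProperty(
                AWSConfigConstants.endpointOverride(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
                "https://sts.eu-west-1.amazonaws.com");
        return props;
    }
}
{code}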





[GitHub] [flink] xintongsong commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-21 Thread via GitHub


xintongsong commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1236252253


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileWriter.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link PartitionFileWriter} interface defines the write logic for 
different types of shuffle
+ * files.
+ */
+public interface PartitionFileWriter {
+
+/**
+ * Write the {@link SpilledBufferContext}s to the partition file. The written buffers may
+ * belong to multiple subpartitions.
+ *
+ * @return the completable future indicating whether the file writing process has finished.
+ *     If the {@link CompletableFuture} is completed, the write process is completed.
+ */
+CompletableFuture<Void> write(List<SpilledBufferContext> spilledBuffers);

Review Comment:
   Should explicitly mention that `spilledBuffers` should be consecutive in the 
file.
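
   A possible wording, sketched here rather than taken from the PR, that would make the 
ordering contract explicit:
   ```java
   /**
    * Writes the {@link SpilledBufferContext}s to the partition file. The written buffers
    * may belong to multiple subpartitions, but they must be consecutive in the file,
    * i.e. they are appended in exactly the order of the given list.
    *
    * @param spilledBuffers the buffers to spill, in file order
    * @return a future that completes once all given buffers have been written
    */
   CompletableFuture<Void> write(List<SpilledBufferContext> spilledBuffers);
   ```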



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SpilledBufferContext.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The buffer context to be flushed, including the {@link Buffer}, the buffer 
index, the
+ * subpartition id, the segment id, etc.
+ */
+public class SpilledBufferContext {
+
+/** The data buffer. Note that the buffer should not be null. */
+private final Buffer buffer;
+
+/** The index of buffer. */
+private final int bufferIndex;
+
+/** The id of subpartition. */
+private final int subpartitionId;
+
+/** The id of segment. */
+private final int segmentId;

Review Comment:
   Why do we need these when we already have `SubpartitionSpilledBufferContext` 
and `SegmentSpilledBufferContext`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+

[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-06-21 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31901:
-
Fix Version/s: ml-2.4.0
   (was: ml-2.3.0)

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.4.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
> the broadcast inputs are all processed. After the broadcast variables are 
> ready, we first process the cached records and then continue to process the 
> newly arrived records.
>  
> Processing cached elements is invoked via `Input#processElement` and 
> `Input#processWatermark`. However, processing cached elements may take a long 
> time since there may be many cached records, which could potentially block 
> the checkpoint barrier.
>  
> If we run the code snippet here [1], we are supposed to get logs as follows.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at 
> time: 1682319149462
> processed cached records, cnt: 1 at time: 1682319149569
> processed cached records, cnt: 2 at time: 1682319149614
> processed cached records, cnt: 3 at time: 1682319149655
> processed cached records, cnt: 4 at time: 1682319149702
> processed cached records, cnt: 5 at time: 1682319149746
> processed cached records, cnt: 6 at time: 1682319149781
> processed cached records, cnt: 7 at time: 1682319149891
> processed cached records, cnt: 8 at time: 1682319150011
> processed cached records, cnt: 9 at time: 1682319150116
> processed cached records, cnt: 10 at time: 1682319150199
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at 
> time: 1682319150378
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at 
> time: 1682319150606
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at 
> time: 1682319150704
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at 
> time: 1682319150785
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at 
> time: 1682319150859
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at 
> time: 1682319150935
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at 
> time: 1682319151007{code}
>  
> We can see that from line #2 to line #11 there are no checkpoints and the 
> barriers are blocked until all cached elements are processed, which takes 
> ~600 ms, much longer than the checkpoint interval (i.e., 100 ms).
>  
>   [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case
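
One common mitigation for this kind of blocking is to drain the cache in bounded chunks 
and re-enqueue the continuation, so that checkpoint actions can interleave between chunks. 
A self-contained toy sketch (a plain {{Executor}} stands in for Flink's mailbox executor; 
the names and chunk size are illustrative, and this is not the Flink ML code):
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Toy model: drain cached records in bounded chunks so other actions can interleave. */
final class ChunkedDrainer<T> {

    private static final int CHUNK_SIZE = 100;

    private final Queue<T> cache = new ArrayDeque<>();
    private final Executor mailbox; // stands in for Flink's MailboxExecutor
    private final Consumer<T> downstream;

    ChunkedDrainer(Executor mailbox, Consumer<T> downstream) {
        this.mailbox = mailbox;
        this.downstream = downstream;
    }

    void addToCache(T record) {
        cache.add(record);
    }

    /** Processes at most CHUNK_SIZE cached records, then re-enqueues itself. */
    void drainChunked() {
        int processed = 0;
        T record;
        while (processed < CHUNK_SIZE && (record = cache.poll()) != null) {
            downstream.accept(record);
            processed++;
        }
        if (!cache.isEmpty()) {
            // A pending checkpoint action can run between two chunks.
            mailbox.execute(this::drainChunked);
        }
    }
}
{code}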





[jira] [Updated] (FLINK-32335) Fix the Flink ML unittest failure

2023-06-21 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-32335:
-
Fix Version/s: ml-2.4.0
   (was: ml-2.3.0)

> Fix the Flink ML unittest failure
> -
>
> Key: FLINK-32335
> URL: https://issues.apache.org/jira/browse/FLINK-32335
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
> Fix For: ml-2.4.0
>
>
> The [github 
> CI](https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620)
>  of Flink ML failed because of the following exception.
>  
> {code:java}
> E   Caused by: java.util.ConcurrentModificationException
> 223E  at 
> java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
> 224E  at 
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
> 225E  at 
> org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
> 226E  at 
> org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
> 227E  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> 228E  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> 229E  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> 230E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> 231E  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> 232E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> 233E  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> 234E  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> 235E  at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> 236E  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> 237E  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> 238E  at java.lang.Thread.run(Thread.java:750){code}
>  
>  
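
For reference, the failure mode in the trace is the standard fail-fast behavior of 
{{ArrayDeque}}'s iterator; a minimal, self-contained demonstration (not Flink code):
{code:java}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Deque;

public class CmeDemo {
    public static void main(String[] args) {
        Deque<Integer> events = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer e : events) {
                if (e == 1) {
                    events.add(4); // structural modification while iterating
                }
            }
        } catch (ConcurrentModificationException expected) {
            // same failure mode as HeadOperator#parseInputChannelEvents above
            System.out.println("fail-fast iterator detected the modification");
        }
        // Iterating over a snapshot avoids the problem:
        for (Integer e : new ArrayDeque<>(events)) {
            events.add(e + 10);
        }
    }
}
{code}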





[jira] [Updated] (FLINK-31948) Supports triggering CI manually

2023-06-21 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31948:
-
Fix Version/s: ml-2.4.0
   (was: ml-2.3.0)

> Supports triggering CI manually
> ---
>
> Key: FLINK-31948
> URL: https://issues.apache.org/jira/browse/FLINK-31948
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> Supports triggering CI manually.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #614: [FLINK-32057] Support 1.18 rescale api for applying parallelism overrides

2023-06-21 Thread via GitHub


gyfora commented on PR #614:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/614#issuecomment-1600576306

   Addressed your comments @mxm 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jiaoqingbo commented on pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-21 Thread via GitHub


jiaoqingbo commented on PR #22694:
URL: https://github.com/apache/flink/pull/22694#issuecomment-1600558583

   @pvary could you help take a look when you have time? Thanks. I added some UTs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest

2023-06-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-30342:
--
Parent: FLINK-26522
Issue Type: Sub-task  (was: Technical Debt)

> Migrate ZooKeeperLeaderElectionTest
> ---
>
> Key: FLINK-30342
> URL: https://issues.apache.org/jira/browse/FLINK-30342
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> ||ZooKeeperLeaderElectionTest||ZooKeeperMultipleComponentLeaderElectionDriverTest||
> |testZooKeeperLeaderElectionRetrieval|testPublishLeaderInformation|
> |testZooKeeperReelection|testLeaderElectionWithMultipleDrivers|
> |testZooKeeperReelectionWithReplacement||
> |testLeaderShouldBeCorrectedWhenOverwritten|testNonLeaderCannotPublishLeaderInformation
>  (slightly different)|
> |testExceptionForwarding||
> |testEphemeralZooKeeperNodes||
> |testNotLeaderShouldNotCleanUpTheLeaderInformation| (but similar to 
> testNonLeaderCannotPublishLeaderInformation)|
> |testUnExpectedErrorForwarding||





[jira] [Resolved] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest

2023-06-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-30342.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

master: c4c633e843920bbf18b2ac2242ee48ada408f6ac

> Migrate ZooKeeperLeaderElectionTest
> ---
>
> Key: FLINK-30342
> URL: https://issues.apache.org/jira/browse/FLINK-30342
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> ||ZooKeeperLeaderElectionTest||ZooKeeperMultipleComponentLeaderElectionDriverTest||
> |testZooKeeperLeaderElectionRetrieval|testPublishLeaderInformation|
> |testZooKeeperReelection|testLeaderElectionWithMultipleDrivers|
> |testZooKeeperReelectionWithReplacement||
> |testLeaderShouldBeCorrectedWhenOverwritten|testNonLeaderCannotPublishLeaderInformation
>  (slightly different)|
> |testExceptionForwarding||
> |testEphemeralZooKeeperNodes||
> |testNotLeaderShouldNotCleanUpTheLeaderInformation| (but similar to 
> testNonLeaderCannotPublishLeaderInformation)|
> |testUnExpectedErrorForwarding||





[jira] [Updated] (FLINK-30342) Migrate ZooKeeperLeaderElectionTest

2023-06-21 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-30342:
--
Parent: (was: FLINK-30338)
Issue Type: Technical Debt  (was: Sub-task)

> Migrate ZooKeeperLeaderElectionTest
> ---
>
> Key: FLINK-30342
> URL: https://issues.apache.org/jira/browse/FLINK-30342
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> ||ZooKeeperLeaderElectionTest||ZooKeeperMultipleComponentLeaderElectionDriverTest||
> |testZooKeeperLeaderElectionRetrieval|testPublishLeaderInformation|
> |testZooKeeperReelection|testLeaderElectionWithMultipleDrivers|
> |testZooKeeperReelectionWithReplacement||
> |testLeaderShouldBeCorrectedWhenOverwritten|testNonLeaderCannotPublishLeaderInformation
>  (slightly different)|
> |testExceptionForwarding||
> |testEphemeralZooKeeperNodes||
> |testNotLeaderShouldNotCleanUpTheLeaderInformation| (but similar to 
> testNonLeaderCannotPublishLeaderInformation)|
> |testUnExpectedErrorForwarding||





[GitHub] [flink] XComp merged pull request #22829: [FLINK-30342][test] Migrates ZooKeeperLeaderElectionTest to use the MultipleComponentLeaderElectionDriver interface

2023-06-21 Thread via GitHub


XComp merged PR #22829:
URL: https://github.com/apache/flink/pull/22829





[GitHub] [flink] CasperTDK commented on pull request #22393: FLINK-28171 [flink-kubernates] enable add appProtocol via the configuration and verify it is not overridden by Default port defintion

2023-06-21 Thread via GitHub


CasperTDK commented on PR #22393:
URL: https://github.com/apache/flink/pull/22393#issuecomment-1600537399

   Allowing myself to give this a bump. Any way to prioritize the review?





[GitHub] [flink] afedulov commented on pull request #21774: [FLINK-28227][connectors] Migrate o.a.f.streaming.examples to the new Source API

2023-06-21 Thread via GitHub


afedulov commented on PR #21774:
URL: https://github.com/apache/flink/pull/21774#issuecomment-1600536032

   Hi @reswqa I actually just finished reimplementing the `SocketSource(Function)` 
as the new Source, and this unblocks 
https://github.com/apache/flink/pull/20049 . I just need to go through some 
internal OSS contribution formalities and will most likely complement this PR 
with the last missing bit early next week. 





[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-21 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735631#comment-17735631
 ] 

Fang Yong commented on FLINK-32370:
---

[~mapohl] I have created a new PR https://github.com/apache/flink/pull/22838 
for this. You can cherry-pick it as you need, and please feel free to ping me 
if there are any issues, thanks.

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[jira] [Created] (FLINK-32407) Notify catalog listener for table events

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32407:
-

 Summary: Notify catalog listener for table events
 Key: FLINK-32407
 URL: https://issues.apache.org/jira/browse/FLINK-32407
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32406) Notify catalog listener for database events

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32406:
-

 Summary: Notify catalog listener for database events
 Key: FLINK-32406
 URL: https://issues.apache.org/jira/browse/FLINK-32406
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32405) Initialize catalog listener for CatalogManager

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32405:
-

 Summary: Initialize catalog listener for CatalogManager
 Key: FLINK-32405
 URL: https://issues.apache.org/jira/browse/FLINK-32405
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32404) Introduce catalog modification listener and factory interfaces

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32404:
-

 Summary: Introduce catalog modification listener and factory 
interfaces
 Key: FLINK-32404
 URL: https://issues.apache.org/jira/browse/FLINK-32404
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32403) Add database related operations in catalog manager

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32403:
-

 Summary: Add database related operations in catalog manager
 Key: FLINK-32403
 URL: https://issues.apache.org/jira/browse/FLINK-32403
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong


Add database-related operations in the catalog manager for different SQL operations





[GitHub] [flink] reswqa commented on pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.

2023-06-21 Thread via GitHub


reswqa commented on PR #22835:
URL: https://github.com/apache/flink/pull/22835#issuecomment-1600495258

   Thanks @TanYuxin-tyx for the quick review. I have updated and force-pushed 
this, as it is only a minor fix.





[GitHub] [flink] swuferhong commented on pull request #22805: [FLINK-32365][orc]get orc table statistics in parallel

2023-06-21 Thread via GitHub


swuferhong commented on PR #22805:
URL: https://github.com/apache/flink/pull/22805#issuecomment-1600493415

   > @luoyuxia I find the code is called in multiple places. If we make it 
configurable, we need to change more modules and we get more parameters. If we 
set the parameter in the Hadoop config, both ORC and Parquet can use this 
parameter. Could you give me some idea?
   
   Hi, did you encounter the problem of slow ORC statistics reporting while 
using the Hive connector? If so, I think you can add this parameter into 
`HiveOptions` as a Flink conf, and you need to set this Flink conf into the job 
conf in the method `HiveSourceBuilder.setFlinkConfigurationToJobConf()` (the 
jobConf will be added into the hadoopConf in the Hive source). By doing this, 
you can get this parameter from `hadoopConf`; if this parameter is not in 
`hadoopConf`, you can default it to `Runtime.getRuntime().availableProcessors()`. 
WDYT, @luoyuxia.
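   
   A minimal sketch of that suggestion; the config key is made up for illustration and is 
not an existing Flink or Hive option:
   ```java
   import org.apache.hadoop.conf.Configuration;

   final class OrcStatsParallelism {
       static int resolve(Configuration hadoopConf) {
           return hadoopConf.getInt(
                   "flink.hive.orc-statistics.thread-num",      // assumed key
                   Runtime.getRuntime().availableProcessors()); // default, as suggested
       }
   }
   ```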
   
   





[GitHub] [flink] reswqa commented on a diff in pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.

2023-06-21 Thread via GitHub


reswqa commented on code in PR #22835:
URL: https://github.com/apache/flink/pull/22835#discussion_r1236686281


##
flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Rejected executions are ignored or logged in debug if the executor is {@link
+ * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, 
{@link
+ * RejectedExecutionException} is thrown.
+ */
+public class IgnoreShutdownRejectedExecutionHandler implements 
RejectedExecutionHandler {
+private final Logger logger;
+
+public IgnoreShutdownRejectedExecutionHandler() {
+this(null);
+}
+
+public IgnoreShutdownRejectedExecutionHandler(Logger logger) {
+this.logger = logger;
+}
+
+@Override
+public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+if (executor.isShutdown()) {
+if (logger != null) {
+logger.debug("Execution rejected because shutdown is in 
progress");

Review Comment:
   Done.



##
flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Rejected executions are ignored or logged in debug if the executor is {@link
+ * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, 
{@link
+ * RejectedExecutionException} is thrown.
+ */
+public class IgnoreShutdownRejectedExecutionHandler implements 
RejectedExecutionHandler {
+private final Logger logger;

Review Comment:
   Marked.






[jira] [Created] (FLINK-32402) FLIP-294: Support Customized Catalog Modification Listener

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32402:
-

 Summary: FLIP-294: Support Customized Catalog Modification Listener
 Key: FLINK-32402
 URL: https://issues.apache.org/jira/browse/FLINK-32402
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Affects Versions: 1.18.0
Reporter: Fang Yong


Issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener





[GitHub] [flink] flinkbot commented on pull request #22838: [FLINK-32370][jdbc-driver] Debug error log for jdbc gateway e2e test

2023-06-21 Thread via GitHub


flinkbot commented on PR #22838:
URL: https://github.com/apache/flink/pull/22838#issuecomment-1600491747

   
   ## CI report:
   
   * bf99f60859b1a36224aaf0e5a539b14e1c9421fb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-21 Thread via GitHub


LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236679027


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -203,54 +193,75 @@ public void close() throws IOException {
 }
 }
 
-private void checkJarResources(List resourceUris) throws 
IOException {
-// only support register jar resource
-if (resourceUris.stream()
-.anyMatch(resourceUri -> 
!ResourceType.JAR.equals(resourceUri.getResourceType( {
-throw new ValidationException(
-String.format(
-"Only support to register jar resource, resource 
info:\n %s.",
-resourceUris.stream()
-.map(ResourceUri::getUri)
-.collect(Collectors.joining(",\n";
-}
+/** Check whether the {@link Path} exists. */
+public boolean exists(Path filePath) throws IOException {
+return filePath.getFileSystem().exists(filePath);
+}
 
-for (ResourceUri resourceUri : resourceUris) {
-checkJarPath(new Path(resourceUri.getUri()));
+/**
+ * Synchronize a file resource identified by the given {@link ResourceUri} 
with a local copy

Review Comment:
   > The path passed to resourceGenerator will be a local path received from 
the given {@link ResourceUri}.
   
   I think this description is not quite accurate, because if the given resource 
URI is `hdfs://foo/bar/plan.json`, then the local path passed to the resource 
generator should be something like `file://root-dir-for-resource-manager/plan-uuid.json`
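   
   For illustration, a tiny sketch of that remote-to-local mapping; the directory and 
naming here are assumptions, not the PR's exact code:
   ```java
   import java.util.UUID;

   import org.apache.flink.core.fs.Path;

   final class LocalPathSketch {
       // e.g. hdfs://foo/bar/plan.json -> file:///tmp/flink-resources/plan-<uuid>.json
       static Path localCopyOf(Path localResourceDir) {
           String fileNameWithUUID = "plan-" + UUID.randomUUID() + ".json";
           return new Path(localResourceDir, fileNameWithUUID);
       }
   }
   ```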






[GitHub] [flink] FangYongs opened a new pull request, #22838: [FLINK-32370][jdbc-driver] Debug error log for jdbc gateway e2e test

2023-06-21 Thread via GitHub


FangYongs opened a new pull request, #22838:
URL: https://github.com/apache/flink/pull/22838

   ## What is the purpose of the change
   
   This PR aims to add error logging to help debug the JDBC gateway e2e test
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-21 Thread via GitHub


LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236671845


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -296,4 +319,85 @@ private Path getResourceLocalPath(Path remotePath) {
 }
 return new Path(localResourceDir, fileNameWithUUID);
 }
+
+private void checkResources(List resourceUris, ResourceType 
expectedType)
+throws IOException {
+// check the resource type
+if (resourceUris.stream()
+.anyMatch(resourceUri -> expectedType != 
resourceUri.getResourceType())) {
+throw new ValidationException(
+String.format(
+"Only support to register %s resource, resource 
info:\n %s.",

Review Comment:
   This method was refactored to handle both the `JAR` and `FILE` types. I agree 
with you that the message should be adapted as well.






[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-21 Thread via GitHub


LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236617192


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -89,52 +90,41 @@ public ResourceManager(ReadableConfig config, 
MutableURLClassLoader userClassLoa
  * register them into the {@link ResourceManager}.
  */
 public void registerJarResources(List resourceUris) throws 
IOException {
-// check jar resource before register
-checkJarResources(resourceUris);
-
-Map stagingResourceLocalURLs = new HashMap<>();
-for (ResourceUri resourceUri : resourceUris) {
-// check whether the resource has been registered
-if (resourceInfos.containsKey(resourceUri) && 
resourceInfos.get(resourceUri) != null) {
-LOG.info(
-"Resource [{}] has been registered, overwriting of 
registered resource is not supported "
-+ "in the current version, skipping.",
-resourceUri.getUri());
-continue;
-}
-
-// here can check whether the resource path is valid
-Path path = new Path(resourceUri.getUri());
-URL localUrl;
-// check resource scheme
-String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-// download resource to local path firstly if in remote
-if (scheme != null && !FILE_SCHEME.equals(scheme)) {
-localUrl = downloadResource(path);
-} else {
-localUrl = getURLFromPath(path);
-// if the local jar resource is a relative path, here convert 
it to absolute path
-// before register
-resourceUri = new ResourceUri(ResourceType.JAR, 
localUrl.getPath());
-}
-
-// check the local jar file
-JarUtils.checkJarFile(localUrl);
-
-// add it to staging map
-stagingResourceLocalURLs.put(resourceUri, localUrl);
-}
-
-// register resource in batch
-stagingResourceLocalURLs.forEach(
-(resourceUri, url) -> {
-// jar resource need add to classloader
-userClassLoader.addURL(url);
-LOG.info("Added jar resource [{}] to class path.", url);
+registerResources(
+prepareStagingResources(
+resourceUris,
+ResourceType.JAR,
+true,
+url -> {
+try {
+JarUtils.checkJarFile(url);
+} catch (IOException e) {
+throw new ValidationException(e.getMessage(), 
e);

Review Comment:
   We're not registering a jar here, actually...






[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-21 Thread via GitHub


LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236665067


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -731,38 +731,46 @@ public TableResultInternal executePlan(InternalPlan plan) 
{
 }
 
 private CompiledPlan compilePlanAndWrite(
-String filePath, boolean ifNotExists, Operation operation) {
-File file = Paths.get(filePath).toFile();
-if (file.exists()) {
-if (ifNotExists) {
-return loadPlan(PlanReference.fromFile(filePath));
+String pathString, boolean ignoreIfExists, Operation operation) {
+try {
+ResourceUri planResource = new ResourceUri(ResourceType.FILE, 
pathString);
+Path planPath = new Path(pathString);
+if (resourceManager.exists(planPath)) {
+if (ignoreIfExists) {
+return loadPlan(
+PlanReference.fromFile(
+
resourceManager.registerFileResource(planResource)));
+}
+
+if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) 
{
+throw new TableException(
+String.format(
+"Cannot overwrite the plan file '%s'. "
++ "Either manually remove the file 
or, "
++ "if you're debugging your job, "
++ "set the option '%s' to true.",
+pathString, 
TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+}
 }
 
-if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
+CompiledPlan compiledPlan;
+if (operation instanceof StatementSetOperation) {
+compiledPlan = compilePlan(((StatementSetOperation) 
operation).getOperations());
+} else if (operation instanceof ModifyOperation) {
+compiledPlan = 
compilePlan(Collections.singletonList((ModifyOperation) operation));
+} else {
 throw new TableException(
-String.format(
-"Cannot overwrite the plan file '%s'. "
-+ "Either manually remove the file or, 
"
-+ "if you're debugging your job, "
-+ "set the option '%s' to true.",
-filePath, 
TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+"Unsupported operation to compile: "
++ operation.getClass()
++ ". This is a bug, please file an issue.");
 }
-}
-
-CompiledPlan compiledPlan;
-if (operation instanceof StatementSetOperation) {
-compiledPlan = compilePlan(((StatementSetOperation) 
operation).getOperations());
-} else if (operation instanceof ModifyOperation) {
-compiledPlan = 
compilePlan(Collections.singletonList((ModifyOperation) operation));
-} else {
+resourceManager.syncFileResource(
+planResource, path -> compiledPlan.writeToFile(path, 
false));
+return compiledPlan;
+} catch (IOException e) {
 throw new TableException(
-"Unsupported operation to compile: "
-+ operation.getClass()
-+ ". This is a bug, please file an issue.");
+String.format("Cannot execute operation %s ", 
operation.asSummaryString()), e);

Review Comment:
   Here's my understanding.
   
   The entire code block is surrounded by one `try... catch`, and the 
`IOException` might be thrown at any of several points:
   - L#738 (check file status)
   - L#742 (download the remote file to local)
   - L#767 (sync the local file to remote)
   
   As a result, I think `Fail to compile plan and write to xxx for xx` might be 
too general to fit all conditions. At the same time, this method is called by 
both `CompilePlanOperation` and `CompileAndExecutePlanOperation`, so I think 
the top-level exception message should indicate whether it's a `COMPILE PLAN` 
statement or a `COMPILE AND EXECUTE PLAN` statement.
   
   WDYT?
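   
   A rough sketch of that differentiation; the helper and the statement-kind strings are 
assumptions for illustration, not the PR's actual code:
   ```java
   import org.apache.flink.table.api.TableException;

   final class PlanErrors {
       static TableException compileFailure(String statementKind, String summary, Throwable cause) {
           // statementKind is e.g. "COMPILE PLAN" or "COMPILE AND EXECUTE PLAN"
           return new TableException(
                   String.format("Failed to execute %s statement: %s.", statementKind, summary),
                   cause);
       }
   }
   ```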






[jira] [Commented] (FLINK-32370) JDBC SQl gateway e2e test is unstable

2023-06-21 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735615#comment-17735615
 ] 

Fang Yong commented on FLINK-32370:
---

Thanks [~mapohl], I think I missed the error log in the JobManager. I will fix it.

> JDBC SQl gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22835: [hotfix] Remove local timeout for Hybrid and Tiered result partition test.

2023-06-21 Thread via GitHub


TanYuxin-tyx commented on code in PR #22835:
URL: https://github.com/apache/flink/pull/22835#discussion_r1236654517


##
flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Rejected executions are ignored or logged in debug if the executor is {@link
+ * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, 
{@link
+ * RejectedExecutionException} is thrown.
+ */
+public class IgnoreShutdownRejectedExecutionHandler implements 
RejectedExecutionHandler {
+private final Logger logger;
+
+public IgnoreShutdownRejectedExecutionHandler() {
+this(null);
+}
+
+public IgnoreShutdownRejectedExecutionHandler(Logger logger) {
+this.logger = logger;
+}
+
+@Override
+public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+if (executor.isShutdown()) {
+if (logger != null) {
+logger.debug("Execution rejected because shutdown is in 
progress");

Review Comment:
   "Execution is rejected" may be more appropriate.



##
flink-core/src/main/java/org/apache/flink/util/concurrent/IgnoreShutdownRejectedExecutionHandler.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Rejected executions are ignored or logged in debug if the executor is {@link
+ * java.util.concurrent.ThreadPoolExecutor#isShutdown shutdown}. Otherwise, 
{@link
+ * RejectedExecutionException} is thrown.
+ */
+public class IgnoreShutdownRejectedExecutionHandler implements 
RejectedExecutionHandler {
+private final Logger logger;

Review Comment:
   We'd better mark this logger as `@Nullable`.
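   
   For reference, a minimal usage sketch for the handler quoted above (the pool sizing is 
arbitrary):
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   final class ShutdownTolerantPool {
       private static final Logger LOG = LoggerFactory.getLogger(ShutdownTolerantPool.class);

       static ThreadPoolExecutor create() {
           // Tasks submitted after shutdown are silently dropped (logged at debug)
           // instead of throwing RejectedExecutionException.
           return new ThreadPoolExecutor(
                   1, 4, 60L, TimeUnit.SECONDS,
                   new LinkedBlockingQueue<>(),
                   new IgnoreShutdownRejectedExecutionHandler(LOG));
       }
   }
   ```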






[GitHub] [flink] LadyForest commented on a diff in pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-21 Thread via GitHub


LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236617192


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -89,52 +90,41 @@ public ResourceManager(ReadableConfig config, 
MutableURLClassLoader userClassLoa
  * register them into the {@link ResourceManager}.
  */
 public void registerJarResources(List resourceUris) throws 
IOException {
-// check jar resource before register
-checkJarResources(resourceUris);
-
-Map stagingResourceLocalURLs = new HashMap<>();
-for (ResourceUri resourceUri : resourceUris) {
-// check whether the resource has been registered
-if (resourceInfos.containsKey(resourceUri) && 
resourceInfos.get(resourceUri) != null) {
-LOG.info(
-"Resource [{}] has been registered, overwriting of 
registered resource is not supported "
-+ "in the current version, skipping.",
-resourceUri.getUri());
-continue;
-}
-
-// here can check whether the resource path is valid
-Path path = new Path(resourceUri.getUri());
-URL localUrl;
-// check resource scheme
-String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-// download resource to local path firstly if in remote
-if (scheme != null && !FILE_SCHEME.equals(scheme)) {
-localUrl = downloadResource(path);
-} else {
-localUrl = getURLFromPath(path);
-// if the local jar resource is a relative path, here convert it to absolute path
-// before register
-resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath());
-}
-
-// check the local jar file
-JarUtils.checkJarFile(localUrl);
-
-// add it to staging map
-stagingResourceLocalURLs.put(resourceUri, localUrl);
-}
-
-// register resource in batch
-stagingResourceLocalURLs.forEach(
-(resourceUri, url) -> {
-// jar resource need add to classloader
-userClassLoader.addURL(url);
-LOG.info("Added jar resource [{}] to class path.", url);
+registerResources(
+prepareStagingResources(
+resourceUris,
+ResourceType.JAR,
+true,
+url -> {
+try {
+JarUtils.checkJarFile(url);
+} catch (IOException e) {
+throw new ValidationException(e.getMessage(), e);

Review Comment:
   We're not registering a jar here, actually.
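   
   For readers following along: the refactor replaces the inlined loop with a stage-validate-register pipeline. A minimal sketch of that shape, with hypothetical names and signatures (only prepareStagingResources, registerResources, and the validator callback appear in the diff):
   
   ```java
   import java.net.URI;
   import java.net.URL;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Consumer;
   
   class ResourceRegistrationSketch {
   
       /** Stage each resource locally and run a type-specific validator on it. */
       Map<String, URL> prepareStagingResources(List<String> resourceUris, Consumer<URL> validator)
               throws Exception {
           Map<String, URL> staged = new LinkedHashMap<>();
           for (String uri : resourceUris) {
               // Real code would download remote resources and absolutize local paths first.
               URL localUrl = new URI(uri).toURL();
               // Type-specific check, e.g. JarUtils.checkJarFile for JAR resources.
               validator.accept(localUrl);
               staged.put(uri, localUrl);
           }
           return staged;
       }
   
       /** Register the staged resources in one batch, e.g. adding JAR URLs to the classloader. */
       void registerResources(Map<String, URL> staged) {
           staged.forEach((uri, url) -> System.out.println("Registered resource " + url));
       }
   }
   ```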



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22837: [FLINK-31957][docs][table] Add user story for configuring operator-level state TTL via compiled plan

2023-06-21 Thread via GitHub


flinkbot commented on PR #22837:
URL: https://github.com/apache/flink/pull/22837#issuecomment-1600417210

   
   ## CI report:
   
   * a861c95ba2fe65798002daaf9072e48df7b7dfff UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31957) Add documentation for the user story

2023-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31957:
---
Labels: pull-request-available  (was: )

> Add documentation for the user story
> 
>
> Key: FLINK-31957
> URL: https://issues.apache.org/jira/browse/FLINK-31957
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Add documentation on how to use compiled plan to configure operator-level 
> state TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest opened a new pull request, #22837: [FLINK-31957][docs][table] Add user story for configuring operator-level state TTL via compiled plan

2023-06-21 Thread via GitHub


LadyForest opened a new pull request, #22837:
URL: https://github.com/apache/flink/pull/22837

   ## What is the purpose of the change
   
   Add user story for FLIP-292
   
   
   ## Brief change log
   
   Add a description of how to configure operator-level state TTL to `overview.md`
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? Docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] klion26 opened a new pull request, #658: [hotfix] Fix the wrong conflict resolve

2023-06-21 Thread via GitHub


klion26 opened a new pull request, #658:
URL: https://github.com/apache/flink-web/pull/658

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #244: Fix some bugs when preparing the release of 2.3.0

2023-06-21 Thread via GitHub


lindong28 commented on code in PR #244:
URL: https://github.com/apache/flink-ml/pull/244#discussion_r1236512750


##
tools/releasing/deploy_staging_jars.sh:
##
@@ -41,7 +41,11 @@ fi
 
 cd ${PROJECT_ROOT}
 
-echo "Deploying to repository.apache.org"
-${MVN} clean deploy -Papache-release -DskipTests 
-DretryFailedDeploymentCount=10 $CUSTOM_OPTIONS
+FLINK_VERSIONS=("1.15" "1.16" "1.17")

Review Comment:
   We probably don't want to modify this script every time we prepare a new Flink ML release.
   
   Would it be simpler to use environment variables to pass the versions, and to document the usage of these environment variables in the release wiki?
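   
   For illustration, a sketch of that approach (the FLINK_VERSIONS variable name, its default, and the -Dflink.version property are assumptions, not the actual build setup):
   
   ```bash
   # Read the target Flink versions from the environment instead of hard-coding them.
   # Documented usage:  FLINK_VERSIONS="1.15 1.16 1.17" ./deploy_staging_jars.sh
   FLINK_VERSIONS=${FLINK_VERSIONS:-"1.15 1.16 1.17"}
   
   for flink_version in ${FLINK_VERSIONS}; do
     echo "Deploying to repository.apache.org for Flink ${flink_version}"
     ${MVN} clean deploy -Papache-release -DskipTests \
         -DretryFailedDeploymentCount=10 -Dflink.version=${flink_version} ${CUSTOM_OPTIONS}
   done
   ```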



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 opened a new pull request, #244: Fix some bugs when preparing the release of 2.3.0

2023-06-21 Thread via GitHub


jiangxin369 opened a new pull request, #244:
URL: https://github.com/apache/flink-ml/pull/244

   
   
   ## What is the purpose of the change
   
   Fix some bugs when preparing the release of 2.3.0.
   
   ## Brief change log
   
 - Replace the version with perl instead of sed, to avoid sed behaving differently across OSes (see the sketch after this list)
 - Deploy artifacts of each Flink version
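   
   A sketch of the portability issue behind the sed-to-perl change (version numbers are illustrative): GNU sed and BSD sed disagree on the in-place flag, while perl -pi behaves the same on both.
   
   ```bash
   # GNU sed (Linux): -i takes an optional suffix, so this works.
   sed -i 's/2\.2\.0/2.3.0/g' pom.xml
   # BSD sed (macOS): -i requires an explicit (possibly empty) backup suffix.
   sed -i '' 's/2\.2\.0/2.3.0/g' pom.xml
   # perl -pi -e edits in place identically on both platforms.
   perl -pi -e 's/2\.2\.0/2.3.0/g' pom.xml
   ```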
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32401) KafkaSourceBuilder reset the 'auto.offset.reset'

2023-06-21 Thread KianChen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KianChen closed FLINK-32401.

Resolution: Fixed

> KafkaSourceBuilder reset the 'auto.offset.reset'
> 
>
> Key: FLINK-32401
> URL: https://issues.apache.org/jira/browse/FLINK-32401
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>Reporter: KianChen
>Priority: Major
> Fix For: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0
>
>
> KafkaSourceBuilder#parseAndSetRequiredProperties resets 'auto.offset.reset'
> to "earliest".
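A hedged workaround sketch against the documented KafkaSource builder API (bootstrap servers, topic, and group id are illustrative): per this report the builder may overwrite the raw consumer property, so choose the reset behavior through the offsets initializer instead.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Start from committed offsets, falling back to LATEST (i.e. auto.offset.reset=latest)
// when no committed offset exists, instead of the builder's "earliest" default.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
{code}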



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on pull request #22620: [FLINK-31413][hive] Change scope of flink-table-planner dependency from provided to test in Hive connector

2023-06-21 Thread via GitHub


luoyuxia commented on PR #22620:
URL: https://github.com/apache/flink/pull/22620#issuecomment-1600289821

   @LadyForest Thanks for reviewing. I have addressed your comments in the last three commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org