Re: [PR] [FLINK-20772][State] Make TtlValueState#update follow the description of interface about null values [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23928:
URL: https://github.com/apache/flink/pull/23928#issuecomment-1855354424

   
   ## CI report:
   
   * 8b7dc770d95714c15529007ee011cd1ca2768fa3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-20772][State] Make TtlValueState#update follow the description of interface about null values [flink]

2023-12-13 Thread via GitHub


Zakelly opened a new pull request, #23928:
URL: https://github.com/apache/flink/pull/23928

   ## What is the purpose of the change
   
   Currently, TtlValueState#update wraps the user value with a timestamp and 
proxies the request to the original value state. It does not check for null 
values, which violates the contract described in the base interface. This PR 
fixes this by enforcing a null check before wrapping, as sketched below.
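
   A minimal sketch of the intended behavior, reusing the `original`, `accessCallback`, and `wrapWithTs` members quoted in FLINK-20772 (the exact shape and ordering in the final patch may differ):

   ```java
   @Override
   public void update(T value) throws IOException {
       accessCallback.run();
       if (value == null) {
           // Follow the ValueState#update contract: updating with null
           // clears the state instead of wrapping and serializing null.
           original.clear();
           return;
       }
       original.update(wrapWithTs(value));
   }
   ```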
   
   
   ## Brief change log
   
 - Check for null values before wrapping in TtlValueState#update

   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added the test case `testValueSetNull` in `TtlStateTestBase.java`, and 
changed the value type of `TtlValueStateTestContext` to `Long` so that 
serializing a null value throws an exception.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-20772) RocksDBValueState with TTL occurs NullPointerException when calling update(null) method

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-20772:
---
Labels: auto-deprioritized-major auto-deprioritized-minor beginner 
pull-request-available  (was: auto-deprioritized-major auto-deprioritized-minor 
beginner)

> RocksDBValueState with TTL occurs NullPointerException when calling 
> update(null) method 
> 
>
> Key: FLINK-20772
> URL: https://issues.apache.org/jira/browse/FLINK-20772
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.2
> Environment: Flink version: 1.11.2
> Flink Cluster: Standalone cluster with 3 Job managers and Task managers on 
> CentOS 7
>Reporter: Seongbae Chang
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> beginner, pull-request-available
>
> h2. Problem
>  * I use ValueState for my custom trigger and set TTL for these ValueState in 
> RocksDB backend environment.
>  * I found an error when I used this code. I know that 
> ValueState.update(null) works equally to ValueState.clear() in general. 
> Unfortunately, this error occurs after using TTL
> {code:java}
> // My Code
> ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
> {code}
>  * I tested this in Flink 1.11.2, but I think it would be a problem in upper 
> versions.
>  * Plus, I'm a beginner. So, if there is any problem in this discussion 
> issue, please give me advice about that. And I'll fix it! 
> {code:java}
> // Error Stacktrace
> Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error 
> while adding data to RocksDB}
>   ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding 
> data to RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>   at 
> org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:102)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:29)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
>   ... 11 more
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
>   at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
>   at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>   ... 18 more
> {code}
>  
> h2. Reason
>  * It relates to RocksDBValueState with TTLValueState
>  * In RocksDBValueState (as well as other types of ValueState), 
> *.update(null)* has to be caught by the null check in an if-clause. However, it 
> skips the null check and then tries to serialize the null value.
> {code:java}
> // 
> https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
> @Override
> public void update(V value) { 
> if (value == null) { 
> clear(); 
> return; 
> }
>  
> try { 
> backend.db.put(columnFamily, writeOptions, 
> serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value)); 
> } catch (Exception e) { 
> throw new FlinkRuntimeException("Error while adding data to RocksDB", 
> e);  
> }
> }{code}
>  *  This is because TtlValueState wraps the (null) value with the 
> LastAccessTime and makes a new TtlValue object with the null value.
> {code:java}
> // 
> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
> @Override
> public void update(T value) throws IOException { 
> accessCallback.run(); 
> original.update(wrapWithTs(value));
> }
> {code}

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-13 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855334073

   @reswqa, thanks, `TestingSinkWriterMetricGroup` works on all tested versions. 
So far I have only faced the arch rules issue locally. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1855302232

   
   ## CI report:
   
   * 85ba4659f937d58dd510aa63009c3939bc085300 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33743) Support consuming multiple subpartitions on a single channel

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33743:
---
Labels: pull-request-available  (was: )

> Support consuming multiple subpartitions on a single channel
> 
>
> Key: FLINK-33743
> URL: https://issues.apache.org/jira/browse/FLINK-33743
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yuxin Tan
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> In Flink jobs that use the AdaptiveBatchScheduler and enable adaptive 
> parallelism, a downstream operator might consume multiple subpartitions from 
> an upstream operator. In Flink's current implementation, downstream operators 
> create an InputChannel for each upstream subpartition, so the many 
> InputChannels created in this situation may consume more memory resources 
> than needed, affecting the usability of Hybrid Shuffle and the 
> AdaptiveBatchScheduler. In order to solve this problem, we plan to allow one 
> InputChannel to consume multiple subpartitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-13 Thread via GitHub


yunfengzhou-hub opened a new pull request, #23927:
URL: https://github.com/apache/flink/pull/23927

   ## What is the purpose of the change
   
   This pull request adds support for consuming multiple subpartitions in a 
single input channel.
   
   
   ## Brief change log
   
   - Separate the notion of subpartition and channel in terms of naming, 
signature and comments.
   - Union the output from multiple subpartitions with one 
ResultSubpartitionView and TierConsumerAgent
   - Control the partial record split logic when writing buffers into 
subpartition to avoid potential deadlocks
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Extended HybridShuffleITCase to cover cases when the feature proposed in 
this PR comes into effect.
   - Added unit tests for newly introduced options and classes
   - Manually verified situations that are not covered by e2e tests, like when 
tiered hybrid shuffle uses sort buffer.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? no need to document
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30535) Introduce TTL state based benchmarks

2023-12-13 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796585#comment-17796585
 ] 

Zakelly Lan commented on FLINK-30535:
-

[~yunta] Sure, I would like to do this.

> Introduce TTL state based benchmarks
> 
>
> Key: FLINK-30535
> URL: https://issues.apache.org/jira/browse/FLINK-30535
> Project: Flink
>  Issue Type: New Feature
>  Components: Benchmarks
>Reporter: Yun Tang
>Priority: Major
>
> This ticket is inspired by https://issues.apache.org/jira/browse/FLINK-30088 
> which wants to optimize the TTL state performance. I think it would be useful 
> to introduce state benchmarks based on TTL as Flink has some overhead to 
> support TTL.
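
For context, this is the kind of TTL configuration such benchmarks would exercise, using the public state TTL API (the descriptor name and TTL value below are arbitrary):

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

/** Builds the kind of TTL-enabled state descriptor a TTL benchmark would exercise. */
public final class TtlBenchmarkStateDescriptors {

    private TtlBenchmarkStateDescriptors() {}

    public static ValueStateDescriptor<Long> ttlValueStateDescriptor() {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.minutes(10))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("ttl-benchmark-value", Long.class);
        // Enabling TTL wraps the state with a TTL decorator (e.g. TtlValueState),
        // which is where the extra per-access overhead comes from.
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}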



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-13 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1413318476


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -110,7 +110,7 @@ public <T> Optional<T> castInto(Class<T> clazz) {
 }
 
 @Override
-public final void start(

Review Comment:
   There is no need to do this. Just override the onStart.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -142,7 +142,7 @@ protected void assertHasBeenStarted() {
 }
 
 @Override
-public final void close() {

Review Comment:
   ditto. Override the onClose



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-13 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426214331


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter 
previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection 
requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection 
requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+}
+receivedSlots.clear();

Review Comment:
   Should we only clear the pending slots after all the requirements are 
fulfilled?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);

Review Comment:
In the current implementation, the resource requirement is declared only if there 
is no further requirement change within `slotRequestMaxInterval`. Another choice 
is to declare requirements at most once every `slotRequestMaxInterval`. TBH I'm 
not sure which one would be better, but I think that deserves a discussion.
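
   An illustrative-only sketch (not this PR's code) of the two strategies discussed above, expressed with a plain `ScheduledExecutorService`; the `declareResourceRequirements` runnable stands in for the real slot pool call:

   ```java
   import java.time.Duration;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   public class RequirementDeclarationStrategies {

       private final ScheduledExecutorService executor =
               Executors.newSingleThreadScheduledExecutor();
       private final Duration maxInterval = Duration.ofMillis(50);

       private ScheduledFuture<?> pendingDeclaration;
       private Long lastDeclarationNanos;

       /** Strategy 1 (current PR): declare once requirements stop changing for maxInterval. */
       public void onRequirementChangeDebounced(Runnable declareResourceRequirements) {
           if (pendingDeclaration != null) {
               pendingDeclaration.cancel(false); // every new change restarts the timer
           }
           pendingDeclaration =
                   executor.schedule(
                           declareResourceRequirements,
                           maxInterval.toMillis(),
                           TimeUnit.MILLISECONDS);
       }

       /** Strategy 2 (alternative): declare at most once per maxInterval. */
       public void onRequirementChangeThrottled(Runnable declareResourceRequirements) {
           long now = System.nanoTime();
           if (lastDeclarationNanos == null || now - lastDeclarationNanos >= maxInterval.toNanos()) {
               lastDeclarationNanos = now;
               declareResourceRequirements.run();
           }
           // Changes arriving within the interval are coalesced; a production version
           // would schedule one trailing declaration so the last change is not lost.
       }
   }
   ```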



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -110,7 +110,7 @@ public <T> Optional<T> castInto(Class<T> clazz) {
 }
 
 @Override
-public final void start(

Review Comment:
   There is no need to do this. Just override the onStart.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || slotRequestMaxInterval == 
null) {

Review Comment:
   I think we should assert that something went wrong instead of ignoring this 
illegal state.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || 

Re: [PR] [FLINK-33810][Runtime] Propagate RecordAttributes that contains isProcessingBacklog status [flink]

2023-12-13 Thread via GitHub


xintongsong commented on code in PR #23919:
URL: https://github.com/apache/flink/pull/23919#discussion_r1426213522


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesCombiner.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/** RecordAttributesCombiner combines RecordAttributes from different input 
channels. */
+public class RecordAttributesCombiner {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RecordAttributesCombiner.class);
+
+private final int numInputChannels;
+private final RecordAttributes[] allChannelRecordAttributes;
+private int nonBacklogChannelsCnt = 0;
+private RecordAttributes lastOutputAttributes = null;
+
+public RecordAttributesCombiner(int numInputChannels) {
+this.numInputChannels = numInputChannels;
+this.allChannelRecordAttributes = new 
RecordAttributes[numInputChannels];
+}
+
+public void inputRecordAttributes(
+RecordAttributes recordAttributes, int channelIdx, DataOutput 
output)
+throws Exception {
+LOG.debug("RecordAttributes: {} from channel idx: {}", 
recordAttributes, channelIdx);
+RecordAttributes lastChannelRecordAttributes = 
allChannelRecordAttributes[channelIdx];
+allChannelRecordAttributes[channelIdx] = recordAttributes;
+
+// skip if the input RecordAttributes of the input channel is the same 
as the last.
+if (recordAttributes.equals(lastChannelRecordAttributes)) {
+return;
+}
+
+final RecordAttributesBuilder builder =
+new RecordAttributesBuilder(Collections.emptyList());
+
+builder.setBacklog(combineIsBacklog(lastChannelRecordAttributes, 
recordAttributes));
+
+final RecordAttributes outputAttribute = builder.build();
+if (!outputAttribute.equals(lastOutputAttributes)) {
+output.emitRecordAttributes(outputAttribute);
+lastOutputAttributes = outputAttribute;
+}
+}
+
+/** If any of the input channels is backlog, the combined RecordAttributes 
is backlog. */
+private boolean combineIsBacklog(
+RecordAttributes lastRecordAttributes, RecordAttributes 
recordAttributes) {
+if (lastRecordAttributes == null
+|| lastRecordAttributes.isBacklog() != 
recordAttributes.isBacklog()) {
+if (lastRecordAttributes != null && recordAttributes.isBacklog()) {
+nonBacklogChannelsCnt -= 1;
+}
+if (!recordAttributes.isBacklog()) {
+nonBacklogChannelsCnt += 1;
+}
+}
+
+return nonBacklogChannelsCnt < numInputChannels;
+}

Review Comment:
   I'm trying to understand the behavior during job initialization. According 
to this method, this will be:
   
   * As soon as the first `RecordAttributes` is received, regardless of whether its 
`isBacklog` is true or false, the combiner will emit a `RecordAttributes` with 
`isBacklog` being false to the downstream, unless there's only one input 
channel. Is that correct?
   
   * Then the question is, what happens before the first `RecordAttributes` is 
received? What is the initial status, and how should the operators behave? 
Would it be possible that the operators are initialized for one mode (e.g., 
non-backlog) and have to switch to another mode (e.g., backlog) before 
receiving any records? Or even worse, different operators might be initialized 
with inconsistent modes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:

[jira] [Closed] (FLINK-33781) Cleanup usage of deprecated org.apache.flink.table.api.TableConfig#ctor()

2023-12-13 Thread Jacky Lau (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacky Lau closed FLINK-33781.
-
Resolution: Fixed

> Cleanup usage of deprecated org.apache.flink.table.api.TableConfig#ctor()
> -
>
> Key: FLINK-33781
> URL: https://issues.apache.org/jira/browse/FLINK-33781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30535) Introduce TTL state based benchmarks

2023-12-13 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796562#comment-17796562
 ] 

Yun Tang commented on FLINK-30535:
--

[~Zakelly] would you like to take this ticket?

> Introduce TTL state based benchmarks
> 
>
> Key: FLINK-30535
> URL: https://issues.apache.org/jira/browse/FLINK-30535
> Project: Flink
>  Issue Type: New Feature
>  Components: Benchmarks
>Reporter: Yun Tang
>Priority: Major
>
> This ticket is inspired by https://issues.apache.org/jira/browse/FLINK-30088 
> which wants to optimize the TTL state performance. I think it would be useful 
> to introduce state benchmarks based on TTL as Flink has some overhead to 
> support TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32756][ha] Implement reusable ClientHAServices and reuse it when creating RestClusterClient [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23926:
URL: https://github.com/apache/flink/pull/23926#issuecomment-1855111482

   
   ## CI report:
   
   * da9d3da97bf03d2c1bb33880c5f56b1b96fce493 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32756) Reuse ClientHighAvailabilityServices when creating RestClusterClient

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32756:
---
Labels: pull-request-available  (was: )

> Reuse ClientHighAvailabilityServices when creating RestClusterClient
> 
>
> Key: FLINK-32756
> URL: https://issues.apache.org/jira/browse/FLINK-32756
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, every newly built RestClusterClient will create a new 
> ClientHighAvailabilityServices which is both unnecessary and resource 
> consuming. For example, each ZooKeeperClientHAServices contains a ZKClient 
> which holds a connection to ZK server and several related threads.
> By reusing ClientHighAvailabilityServices across multiple RestClusterClient 
> instances, we can save system resources (threads, connections), connection 
> establishment time, and leader retrieval time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-32756][ha] Implement reusable ClientHAServices and reuse it when creating RestClusterClient [flink]

2023-12-13 Thread via GitHub


xiangyuf opened a new pull request, #23926:
URL: https://github.com/apache/flink/pull/23926

   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30535) Introduce TTL state based benchmarks

2023-12-13 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796535#comment-17796535
 ] 

Zakelly Lan commented on FLINK-30535:
-

big +1 for this

> Introduce TTL state based benchmarks
> 
>
> Key: FLINK-30535
> URL: https://issues.apache.org/jira/browse/FLINK-30535
> Project: Flink
>  Issue Type: New Feature
>  Components: Benchmarks
>Reporter: Yun Tang
>Priority: Major
>
> This ticket is inspired by https://issues.apache.org/jira/browse/FLINK-30088 
> which wants to optimize the TTL state performance. I think it would be useful 
> to introduce state benchmarks based on TTL as Flink has some overhead to 
> support TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-13 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796533#comment-17796533
 ] 

Jim Hughes commented on FLINK-33756:


Hi [~jeyhunkarimov], nice analysis!  I did see that there were two pairs of 
Local-Global window aggregates when I very briefly looked initially; I totally 
agree that has to be part of the issue.  

Out of curiosity, how did you see the value coming out of the various windows?  
Was it println debugging or something else?

I like your explanation about the order of `processWatermark` and 
`processElement`; that explains the apparent flakiness.  

Looks like the different orderings are coming from the exchange/hashing 
which is happening between the windows. Perhaps thinking about how timestamps 
and the exchange operator interact will help us sort this out.  (Along with your 
note that we are "losing" the original timestamp in some sense.)

> Missing record with CUMULATE/HOP windows using an optimization
> --
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
>  Issue Type: Bug
>Reporter: Jim Hughes
>Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to true 
> and the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or 
> CUMULATE window with an offset, a record can be sent that causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
>  modifies the `WindowAggregateJsonITCase` to demonstrate the case.  
>  
> The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
> `testDistinctSplitEnabled` tests the above configurations and shows that one 
> record is missing from the output.  
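
For reference, a sketch of how those two settings are typically applied on a TableEnvironment; the string config keys are assumed to match OptimizerConfigOptions and may differ across Flink versions:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DistinctSplitTwoPhaseConfig {

    public static TableEnvironment create() {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Enable the distinct-aggregate split optimization
        // (TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED).
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
        // Force the two-phase aggregate strategy (AggregatePhaseStrategy.TWO_PHASE).
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        return tEnv;
    }
}
{code}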



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-13 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855090968

   @schulzp, thank you for the investigation.
   
   1. I think `TestingSinkWriterMetricGroup` might help.
   2. Yes, this is indeed a violation. But I'm not sure whether it will lead to a 
CI failure; could you link a workflow run that failed because of this? Anyway, I 
agree that this goes beyond the scope of this PR, and I can fix it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33449][table]Support array_contains_seq function [flink]

2023-12-13 Thread via GitHub


xuyangzhong commented on PR #23656:
URL: https://github.com/apache/flink/pull/23656#issuecomment-1855089363

   Hi, @leoyy0316 and @MartijnVisser . Let's talk in the jira instead of this 
pr. I shared my view there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33449) Support array_contains_seq function

2023-12-13 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796532#comment-17796532
 ] 

xuyang commented on FLINK-33449:


Hi, [~leoyy] thanks for driving this. IMO, this type of function is very 
useful, and I'm +1 to support this function.

However, the function name differs across databases and big data systems. 
Taking the systems you mentioned as examples:

1. Clickhouse `hasAll(set, subset)`

2. Trino `contains_sequence(x, seq)`

3. Starrocks `array_contains_all(arr1, arr2)`

The function names are not unified. Flink already has a function named 
`ARRAY_CONTAINS(haystack, needle)` that checks whether a given element exists in 
an array. What about using `ARRAY_CONTAINS_ALL` as the name for the function in 
this JIRA?

For a more formal process, maybe a discussion thread should be started on the 
dev mailing list.

Looking forward to your thoughts. cc [~martijnvisser] 

> Support array_contains_seq function
> ---
>
> Key: FLINK-33449
> URL: https://issues.apache.org/jira/browse/FLINK-33449
> Project: Flink
>  Issue Type: New Feature
>Reporter: leo cheng
>Priority: Minor
>  Labels: pull-request-available
>
> support function array_contains_seq like trino contains_sequence
> trino: 
> https://trino.io/docs/current/functions/array.html?highlight=contains_sequence#contains_sequence



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2023-12-13 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-33819:
---
Summary: Support setting CompressType in RocksDBStateBackend  (was: Suppor 
setting CompressType in RocksDBStateBackend)

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!
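
Until such a first-class option exists, compression can already be turned off with a custom options factory. A minimal sketch, assuming Flink's RocksDBOptionsFactory extension point and the RocksDB CompressionType enum (treat exact package locations as assumptions):

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

import java.util.Collection;

/** Disables block compression for all RocksDB column families. */
public class NoCompressionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // DB-level options stay untouched
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Snappy is the default; switching it off trades disk/IO for CPU.
        return currentOptions.setCompressionType(CompressionType.NO_COMPRESSION);
    }

    public static EmbeddedRocksDBStateBackend backendWithoutCompression() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        backend.setRocksDBOptions(new NoCompressionOptionsFactory());
        return backend;
    }
}
{code}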



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-13 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796527#comment-17796527
 ] 

Yue Ma commented on FLINK-27681:


[~pnowojski] Thanks for your suggestion, I think this is a perfect solution. 
But it sounds like there is still a long way to go to implement this plan. Do 
we have any specific plans to do this?

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several times when the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but it has been a long time (a dozen minutes to half an hour) when we found 
> this problem. I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of checkpoints reserved is exceeded, we can only use 
> this file until it is no longer referenced. When the job failed, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33819) Suppor setting CompressType in RocksDBStateBackend

2023-12-13 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-33819:
---
Description: 
Currently, RocksDBStateBackend does not support setting the compression level, 
and Snappy is used for compression by default. But we have some scenarios where 
compression will use a lot of CPU resources. Turning off compression can 
significantly reduce CPU overhead. So we may need to support a parameter for 
users to set the CompressType of Rocksdb.

  !image-2023-12-14-11-35-22-306.png!

  was:
Currently, RocksDBStateBackend does not support setting the compression level, 
and Snappy is used for compression by default. But we have some scenarios where 
compression will use a lot of CPU resources. Turning off compression can 
significantly reduce CPU overhead. So we may need to support a parameter for 
users to set the CompressType of Rocksdb.

  
!https://internal-api-drive-stream.larkoffice.com/space/api/box/stream/download/preview/ALADbWTMGoD6WexSFGecz2Olnrb/?preview_type=16!


> Suppor setting CompressType in RocksDBStateBackend
> --
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33819) Suppor setting CompressType in RocksDBStateBackend

2023-12-13 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-33819:
---
Attachment: image-2023-12-14-11-35-22-306.png

> Suppor setting CompressType in RocksDBStateBackend
> --
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   
> !https://internal-api-drive-stream.larkoffice.com/space/api/box/stream/download/preview/ALADbWTMGoD6WexSFGecz2Olnrb/?preview_type=16!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33819) Suppor setting CompressType in RocksDBStateBackend

2023-12-13 Thread Yue Ma (Jira)
Yue Ma created FLINK-33819:
--

 Summary: Suppor setting CompressType in RocksDBStateBackend
 Key: FLINK-33819
 URL: https://issues.apache.org/jira/browse/FLINK-33819
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Yue Ma
 Fix For: 1.19.0
 Attachments: image-2023-12-14-11-32-32-968.png

Currently, RocksDBStateBackend does not support setting the compression level, 
and Snappy is used for compression by default. But we have some scenarios where 
compression will use a lot of CPU resources. Turning off compression can 
significantly reduce CPU overhead. So we may need to support a parameter for 
users to set the CompressType of Rocksdb.

  
!https://internal-api-drive-stream.larkoffice.com/space/api/box/stream/download/preview/ALADbWTMGoD6WexSFGecz2Olnrb/?preview_type=16!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-20772) RocksDBValueState with TTL occurs NullPointerException when calling update(null) method

2023-12-13 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-20772:
---

Assignee: Zakelly Lan

> RocksDBValueState with TTL occurs NullPointerException when calling 
> update(null) method 
> 
>
> Key: FLINK-20772
> URL: https://issues.apache.org/jira/browse/FLINK-20772
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.2
> Environment: Flink version: 1.11.2
> Flink Cluster: Standalone cluster with 3 Job managers and Task managers on 
> CentOS 7
>Reporter: Seongbae Chang
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> beginner
>
> h2. Problem
>  * I use ValueState for my custom trigger and set TTL for these ValueState in 
> RocksDB backend environment.
>  * I found an error when I used this code. I know that 
> ValueState.update(null) works equally to ValueState.clear() in general. 
> Unfortunately, this error occurs after using TTL
> {code:java}
> // My Code
> ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
> {code}
>  * I tested this in Flink 1.11.2, but I think it would be a problem in upper 
> versions.
>  * Plus, I'm a beginner. So, if there is any problem in this discussion 
> issue, please give me advice about that. And I'll fix it! 
> {code:java}
> // Error Stacktrace
> Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error 
> while adding data to RocksDB}
>   ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding 
> data to RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>   at 
> org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:102)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:29)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
>   ... 11 more
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
>   at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
>   at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>   ... 18 more
> {code}
>  
> h2. Reason
>  * It relates to RocksDBValueState with TTLValueState
>  * In RocksDBValueState (as well as other types of ValueState), 
> *.update(null)* has to be caught by the null check in an if-clause. However, it 
> skips the null check and then tries to serialize the null value.
> {code:java}
> // 
> https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
> @Override
> public void update(V value) { 
> if (value == null) { 
> clear(); 
> return; 
> }
>  
> try { 
> backend.db.put(columnFamily, writeOptions, 
> serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value)); 
> } catch (Exception e) { 
> throw new FlinkRuntimeException("Error while adding data to RocksDB", 
> e);  
> }
> }{code}
>  *  This is because TtlValueState wraps the (null) value with the 
> LastAccessTime and makes a new TtlValue object with the null value.
> {code:java}
> // 
> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
> @Override
> public void update(T value) throws IOException { 
> accessCallback.run(); 
> original.update(wrapWithTs(value));
> }
> {code}
> {code:java}
> // 
> 

Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


liming30 commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1855059115

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33588][Flink-Runtime] Fix NullArgumentException of getQuantile method in DescriptiveStatisticsHistogramStatistics [flink]

2023-12-13 Thread via GitHub


zhutong6688 commented on PR #23915:
URL: https://github.com/apache/flink/pull/23915#issuecomment-1855054142

   @JingGe Okay, I will resubmit it this evening and squash the commits


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1855044542

   > Hi @masteryhx , thanks for the contribution and update!
   > 
   > I have reviewed this PR and left some comments. Please take a look in your 
free time, thanks~
   > 
   > Also, `The Schema Compatibility` related code is Flink’s infrastructure 
and is used by many components. I didn't change and read related code before 
this PR, so I'm not professional in this area. I have reviewed, but it would be 
great to have more committers who are familiar with this area to help review. 
It will be helpful for the bug-free.
   
   Thanks a lot for the detailed review.
   We may have some key problems remaining in the comments to discuss.
   I'll update the PR after we are on the same page about these problems.
   Also pinging @Zakelly @fredia here to help review and discuss together. 
Thanks a lot for your time!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426111500


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java:
##
@@ -1244,8 +1244,8 @@ void 
testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
 testKeyedValueStateUpgrade(
 initialAccessDescriptor, 
newAccessDescriptorAfterRestore))
 .satisfiesAnyOf(
-e -> 
assertThat(e).isInstanceOf(IllegalStateException.class),
-e -> 
assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+e -> 
assertThat(e).isInstanceOf(StateMigrationException.class),
+e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
 }

Review Comment:
   As discussed in 
[#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1426110489),
 I'd like to also add more state-level tests after adjusting, in a separate PR. 
WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426110489


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLo
  * program's serializer re-serializes the data, thus converting the format 
during the restore
  * operation.
  *
+ * @deprecated This method has been replaced by {@link 
TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
  * @param newSerializer the new serializer to check.
  * @return the serializer compatibility result.
  */
-TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-TypeSerializer newSerializer);
+@Deprecated
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializer newSerializer) {
+return 
newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+}
+
+/**
+ * Checks current serializer's compatibility to read data written by the 
prior serializer.
+ *
+ * When a checkpoint/savepoint is restored, this method checks whether 
the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the 
format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome 
can be that the
+ * serialization format is compatible, that the program's serializer needs 
to reconfigure itself
+ * (meaning to incorporate some information from the 
TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. 
In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, 
and the restoring
+ * program's serializer re-serializes the data, thus converting the format 
during the restore
+ * operation.
+ *
+ * This method must be implemented to clarify the compatibility. See 
FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(

Review Comment:
   Yes, you're right.
   These are all the root classes that call `resolveSchemaCompatibility` in the 
state module.
   So I'd like to prepare a separate PR to adjust them, as I mentioned before. 
   It should not affect the correctness of usage.
   WDYT?
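
   For illustration, an implementation of the new method on a user snapshot might look like the following. This is a sketch only, assuming the `resolveSchemaCompatibility(TypeSerializerSnapshot)` signature proposed in this PR; `MyStringSerializerSnapshot` is a made-up example built on Flink's `StringSerializer`:

   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
   import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
   import org.apache.flink.api.common.typeutils.base.StringSerializer;
   import org.apache.flink.core.memory.DataInputView;
   import org.apache.flink.core.memory.DataOutputView;

   import java.io.IOException;

   public class MyStringSerializerSnapshot implements TypeSerializerSnapshot<String> {

       @Override
       public int getCurrentVersion() {
           return 1;
       }

       @Override
       public void writeSnapshot(DataOutputView out) throws IOException {
           // nothing beyond the version to persist in this toy example
       }

       @Override
       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
               throws IOException {
           // nothing to read back in this toy example
       }

       @Override
       public TypeSerializer<String> restoreSerializer() {
           return StringSerializer.INSTANCE;
       }

       @Override
       public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(
               TypeSerializerSnapshot<String> oldSerializerSnapshot) {
           // New direction: the *new* snapshot decides whether it can read data
           // written under the *old* snapshot.
           return oldSerializerSnapshot instanceof MyStringSerializerSnapshot
                   ? TypeSerializerSchemaCompatibility.compatibleAsIs()
                   : TypeSerializerSchemaCompatibility.incompatible();
       }
   }
   ```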



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33754) Serialize QueryOperations into SQL

2023-12-13 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796517#comment-17796517
 ] 

Benchao Li commented on FLINK-33754:


[~dwysakowicz] Sorry to comment on a closed ticket. I saw you left out 
{{PlannerQueryOperation}} in the ticket, is this intentional? Or maybe you 
planned to add support for {{PlannerQueryOperation}} in another ticket? 

> Serialize QueryOperations into SQL
> --
>
> Key: FLINK-33754
> URL: https://issues.apache.org/jira/browse/FLINK-33754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33686][client] Reuse ClusterDescriptor in AbstractSessionClusterExecutor [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23925:
URL: https://github.com/apache/flink/pull/23925#issuecomment-1855022932

   
   ## CI report:
   
   * af8119eece05028b05a3e079a69c5359dee3f1e1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426108037


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLo
  * program's serializer re-serializes the data, thus converting the format 
during the restore
  * operation.
  *
+ * @deprecated This method has been replaced by {@link 
TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
  * @param newSerializer the new serializer to check.
  * @return the serializer compatibility result.
  */
-TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-TypeSerializer newSerializer);
+@Deprecated
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializer newSerializer) {
+return 
newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+}
+
+/**
+ * Checks current serializer's compatibility to read data written by the 
prior serializer.
+ *
+ * When a checkpoint/savepoint is restored, this method checks whether 
the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the 
format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome 
can be that the
+ * serialization format is compatible, that the program's serializer needs 
to reconfigure itself
+ * (meaning to incorporate some information from the 
TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration is needed. 
In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, 
and the restoring
+ * program's serializer re-serializes the data, thus converting the format 
during the restore
+ * operation.
+ *
+ * This method must be implemented to clarify the compatibility. See 
FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return 
oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   Thanks for the detailed review.
   IIUC, as long as users have implemented one of the methods (the old one or the new one), it should work well with the current PR.
   Sorry, maybe I missed your point. The case you described is a nested serializer, right?
   Whichever method `data1` and `data2` implement, the call should be one-sided; otherwise the logic would be wrong even without this PR.
   
   The reason we provide two default methods is that users cannot mix the migrated and non-migrated methods, as discussed in [#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1086625857)
   
   Just cc @Zakelly, as we also talked about this problem offline.
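
   To illustrate the one-sided call direction for nested serializers (hypothetical helper, not code from this PR): the outer snapshot can consistently resolve its nested parts through the new snapshot-based method only, e.g.:

   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
   import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

   /**
    * Hypothetical helper showing the one-sided call direction discussed here: the new
    * nested snapshot resolves compatibility against the old nested snapshot, always
    * going through the new snapshot-based method.
    */
   public final class NestedCompatibility {

       public static <T> TypeSerializerSchemaCompatibility<T> resolveNested(
               TypeSerializerSnapshot<T> nestedNewSnapshot,
               TypeSerializerSnapshot<T> nestedOldSnapshot) {
           // Always call the new method; the deprecated TypeSerializer-based
           // variant is never invoked directly by the outer snapshot.
           return nestedNewSnapshot.resolveSchemaCompatibility(nestedOldSnapshot);
       }

       private NestedCompatibility() {}
   }
   ```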



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33686) Reuse ClusterDescriptor in AbstractSessionClusterExecutor when executing jobs on the same cluster

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33686:
---
Labels: pull-request-available  (was: )

> Reuse ClusterDescriptor in AbstractSessionClusterExecutor when executing jobs 
> on the same cluster
> -
>
> Key: FLINK-33686
> URL: https://issues.apache.org/jira/browse/FLINK-33686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Multiple `RemoteExecutor` instances can reuse the same 
> `StandaloneClusterDescriptor` when executing jobs on the same running cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33686][client] Reuse ClusterDescriptor in AbstractSessionClusterExecutor [flink]

2023-12-13 Thread via GitHub


xiangyuf opened a new pull request, #23925:
URL: https://github.com/apache/flink/pull/23925

   
   ## What is the purpose of the change
   
   Reuse ClusterDescriptor in AbstractSessionClusterExecutor
   
   ## Brief change log
   
 - Add `cachedClusterDescriptors` in `AbstractSessionClusterExecutor`
   
   ## Verifying this change
   
 - Added Unit Test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33809] HiveCatalog load hivemetastore-site.xml [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23924:
URL: https://github.com/apache/flink/pull/23924#issuecomment-1855007709

   
   ## CI report:
   
   * a6d6662bc551b60b528c863217219379fa415b0d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1426092346


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##
@@ -164,6 +164,14 @@ public class CliFrontendParser {
 false,
 "Defines whether to trigger this checkpoint as a full 
one.");
 
+static final Option SAVEPOINT_DETACH_OPTION =

Review Comment:
   1. How about just using 'detached' here, aligned with DETACHED_OPTION? It should not conflict with the other option because it will only be checked in the savepoint and stop-with-savepoint commands, right? It will help us migrate in the future.
   2. I saw this could also be used for manual checkpoints. Of course, we could support that in another ticket/PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33809) HiveCatalog load hivemetastore.xml

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33809:
---
Labels: pull-request-available  (was: )

> HiveCatalog load hivemetastore.xml
> --
>
> Key: FLINK-33809
> URL: https://issues.apache.org/jira/browse/FLINK-33809
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
>Reporter: Bo Cui
>Priority: Major
>  Labels: pull-request-available
>
> [https://github.com/apache/flink/blob/3532f59cb9484a67e1b441e2875a26eb3691221f/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java#L266]
> Currently, HiveCatalog only loads hive-site.xml; we need to merge
> hivemetastore-site.xml into hive-site.xml, which is inconvenient.
> We can enhance HiveCatalog so that it can load hivemetastore-site.xml and supports
> 'properties.*'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33809] HiveCatalog load hivemetastore-site.xml [flink]

2023-12-13 Thread via GitHub


cuibo01 opened a new pull request, #23924:
URL: https://github.com/apache/flink/pull/23924

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-13 Thread via GitHub


masteryhx commented on PR #23253:
URL: https://github.com/apache/flink/pull/23253#issuecomment-1854996169

   > @masteryhx @Zakelly Following your constructive comments, I have 
rearranged the commits and rebased to the master branch, please have a look, 
many thanks~
   
   Thanks for the update.
   I saw some minor comments that haven't been resolved yet.
   Do you have any other concerns? We can discuss them in the individual comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1426080866


##
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##
@@ -188,6 +207,21 @@ CompletableFuture triggerSavepoint(
  */
 CompletableFuture triggerCheckpoint(JobID jobId, CheckpointType 
checkpointType);
 
+/**
+ * Triggers a detached savepoint for the job identified by the job id. The 
savepoint will be
+ * written to the given savepoint directory, or {@link
+ * 
org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it 
is null.
+ * Notice that a detached savepoint will return a savepoint trigger id instead of
+ * the path future, which means the client will return very quickly.
+ *
+ * @param jobId job id
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param formatType a binary format of the savepoint
+ * @return The savepoint trigger id
+ */
+CompletableFuture triggerDetachSavepoint(

Review Comment:
   Sorry for the confusion.
   I was just talking about the naming.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33723) Disallow triggering incremental checkpoint explicitly from REST API

2023-12-13 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33723.
--
Resolution: Fixed

merged 07d159bf into master

> Disallow triggering incremental checkpoint explicitly from REST API
> ---
>
> Key: FLINK-33723
> URL: https://issues.apache.org/jira/browse/FLINK-33723
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, when a job is configured to run with incremental checkpoint 
> disabled, user manually triggers an incremental checkpoint actually 
> triggering a full checkpoint. That is because the files from full checkpoint 
> cannot be shared with an incremental checkpoint. So it is better to remove 
> the "INCREMENTAL" option in triggering checkpoint from REST API to avoid 
> misunderstanding.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33723] Disallow triggering incremental checkpoint explicitly from REST API [flink]

2023-12-13 Thread via GitHub


masteryhx closed pull request #23853: [FLINK-33723] Disallow triggering 
incremental checkpoint explicitly from REST API
URL: https://github.com/apache/flink/pull/23853


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33779][table] Cleanup usage of deprecated BaseExpressions#cast [flink]

2023-12-13 Thread via GitHub


liuyongvs commented on PR #23895:
URL: https://github.com/apache/flink/pull/23895#issuecomment-1854975748

   hi @lsyldliu @xuyangzhong will you also help review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33778][table] Cleanup usage of deprecated TableConfig#setIdleS… [flink]

2023-12-13 Thread via GitHub


liuyongvs commented on PR #23894:
URL: https://github.com/apache/flink/pull/23894#issuecomment-1854976683

   hi @lsyldliu @xuyangzhong  will you also help review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33500][Runtime] Run storing the JobGraph an asynchronous operation [flink]

2023-12-13 Thread via GitHub


zhengzhili333 commented on code in PR #23880:
URL: https://github.com/apache/flink/pull/23880#discussion_r1426063623


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java:
##
@@ -37,6 +38,18 @@ public interface JobGraphWriter extends 
LocallyCleanableResource, GloballyCleana
  */
 void putJobGraph(JobGraph jobGraph) throws Exception;
 
+/**
+ * Adds the {@link JobGraph} instance and has the write operations performed
+ * asynchronously in the ioExecutor of the Dispatcher.
+ *
+ * @param jobGraph
+ * @param ioExecutor
+ * @return
+ * @throws Exception
+ */
+CompletableFuture putJobGraphAsync(JobGraph jobGraph, 
Optional ioExecutor)

Review Comment:
   Thank you for the reminder. Changing the interface violates the open/closed principle, but it is not necessary to maintain two interfaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33781) Cleanup usage of deprecated org.apache.flink.table.api.TableConfig#ctor()

2023-12-13 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796507#comment-17796507
 ] 

dalongliu commented on FLINK-33781:
---

Merged to master: bb02698dee078446500d94246acf6b162a2b7924

> Cleanup usage of deprecated org.apache.flink.table.api.TableConfig#ctor()
> -
>
> Key: FLINK-33781
> URL: https://issues.apache.org/jira/browse/FLINK-33781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33781][table] Cleanup usage of deprecated TableConfig#ctor [flink]

2023-12-13 Thread via GitHub


lsyldliu merged PR #23897:
URL: https://github.com/apache/flink/pull/23897


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33818] Implement restore tests for WindowDeduplicate node [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23923:
URL: https://github.com/apache/flink/pull/23923#issuecomment-1854828911

   
   ## CI report:
   
   * e7fdab8bf875d961bf1e29830864d9ee5a0f455f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33818) Implement restore tests for WindowDeduplicate node

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33818:
---
Labels: pull-request-available  (was: )

> Implement restore tests for WindowDeduplicate node
> --
>
> Key: FLINK-33818
> URL: https://issues.apache.org/jira/browse/FLINK-33818
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Bonnie Varghese
>Assignee: Bonnie Varghese
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33818] Implement restore tests for WindowDeduplicate node [flink]

2023-12-13 Thread via GitHub


bvarghese1 opened a new pull request, #23923:
URL: https://github.com/apache/flink/pull/23923

   
   
   ## What is the purpose of the change
   
   *Add restore tests for WindowDeduplicate node*
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
   - Added restore tests for WindowDeduplicate node which verifies the 
generated compiled plan with the saved compiled plan
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33818) Implement restore tests for WindowDeduplicate node

2023-12-13 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-33818:
---

 Summary: Implement restore tests for WindowDeduplicate node
 Key: FLINK-33818
 URL: https://issues.apache.org/jira/browse/FLINK-33818
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3

2023-12-13 Thread Sai Sharath Dandi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sai Sharath Dandi updated FLINK-33817:
--
Description: 
*Background*

 

The current Protobuf format 
[implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
 always sets ReadDefaultValues=False when using Proto3 version. This can cause 
severe performance degradation for large Protobuf schemas with OneOf fields as 
the entire generated code needs to be executed during deserialization even when 
certain fields are not present in the data to be deserialized and all the 
subsequent nested Fields can be skipped. Proto3 supports hasXXX() methods for 
checking field presence for non primitive types since Proto version 
[3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In the 
internal performance benchmarks in our company, we've seen almost 10x 
difference in performance when allowing to set ReadDefaultValues=False with 
proto3 version

 

*Solution*

 

Support using ReadDefaultValues=False when using Proto3 version. We need to be 
careful to check for field presence only on non-primitive types if 
ReadDefaultValues is false and version used is Proto3

  was:
*Background*

 

The current Protobuf format 
[implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
 always sets ReadDefaultValues=False when using Proto3 version. This can cause 
severe performance degradation for large Protobuf schemas with OneOf fields as 
the entire generated code needs to be executed during deserialization even when 
certain fields are not present in the data to be deserialized and all the 
subsequent nested Fields can be skipped. Proto3 supports hasXXX() methods for 
checking field presence for non primitive types since Proto version 
[3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In 
internal benchmarks in our company, we've seen almost 10x difference in 
performance when allowing to set ReadDefaultValues=False with proto3 version

 

*Solution*

 

Support using ReadDefaultValues=False when using Proto3 version. We need to be 
careful to check for field presence only on non-primitive types if 
ReadDefaultValues is false and version used is Proto3


> Allow ReadDefaultValues = False for non primitive types on Proto3
> -
>
> Key: FLINK-33817
> URL: https://issues.apache.org/jira/browse/FLINK-33817
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Priority: Major
>
> *Background*
>  
> The current Protobuf format 
> [implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
>  always sets ReadDefaultValues=False when using Proto3 version. This can 
> cause severe performance degradation for large Protobuf schemas with OneOf 
> fields as the entire generated code needs to be executed during 
> deserialization even when certain fields are not present in the data to be 
> deserialized and all the subsequent nested Fields can be skipped. Proto3 
> supports hasXXX() methods for checking field presence for non primitive types 
> since Proto version 
> [3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In 
> the internal performance benchmarks in our company, we've seen almost 10x 
> difference in performance when allowing to set ReadDefaultValues=False with 
> proto3 version
>  
> *Solution*
>  
> Support using ReadDefaultValues=False when using Proto3 version. We need to 
> be careful to check for field presence only on non-primitive types if 
> ReadDefaultValues is false and version used is Proto3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3

2023-12-13 Thread Sai Sharath Dandi (Jira)
Sai Sharath Dandi created FLINK-33817:
-

 Summary: Allow ReadDefaultValues = False for non primitive types 
on Proto3
 Key: FLINK-33817
 URL: https://issues.apache.org/jira/browse/FLINK-33817
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.18.0
Reporter: Sai Sharath Dandi


*Background*

 

The current Protobuf format 
[implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
 always sets ReadDefaultValues=False when using Proto3 version. This can cause 
severe performance degradation for large Protobuf schemas with OneOf fields as 
the entire generated code needs to be executed during deserialization even when 
certain fields are not present in the data to be deserialized and all the 
subsequent nested Fields can be skipped. Proto3 supports hasXXX() methods for 
checking field presence for non primitive types since Proto version 
[3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In 
internal benchmarks in our company, we've seen almost 10x difference in 
performance when allowing to set ReadDefaultValues=False with proto3 version

 

*Solution*

 

Support using ReadDefaultValues=False when using Proto3 version. We need to be 
careful to check for field presence only on non-primitive types if 
ReadDefaultValues is false and version used is Proto3
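
A minimal sketch of the presence check described above, written against the generic protobuf descriptor API rather than the generated code the Flink format actually emits (so this only illustrates the idea, not the proposed implementation):

{code:java}
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;

/** Skip absent non-primitive fields instead of materializing their default values. */
public final class ProtoPresenceSketch {

    public static Object readNullableMessageField(Message message, FieldDescriptor field) {
        if (field.getJavaType() == FieldDescriptor.JavaType.MESSAGE && !message.hasField(field)) {
            // The field is not present in the serialized bytes: nothing to convert,
            // and the whole nested subtree can be skipped.
            return null;
        }
        return message.getField(field);
    }

    private ProtoPresenceSketch() {}
}
{code}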



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425934480


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   Thanks for looking into this.
   Yep, you are right: since `keyNames` has the order information, we can reuse it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33641][test] Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures [flink]

2023-12-13 Thread via GitHub


snuyanzin commented on PR #23914:
URL: https://github.com/apache/flink/pull/23914#issuecomment-1854769170

   @Jiabao-Sun 
   >I tend to avoid too many CI failures through this PR until we find the root cause.
   >WDYT?
   
   I put a commit with the workaround I described above at https://github.com/apache/flink/pull/23917 and it is green.
   
   Moreover, I've scheduled it both in the Flink CI and in my own CI more than 5 times in total, and there is no failure...
   
   If there are no objections, we can merge it to avoid too many CI failures and continue looking for a more optimal solution in a calmer way.
   
   
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2678=results
   
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2676=results
   
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2675=results
   
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2679=results
   
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2681=results
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55493=results
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55487=results
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33804) Add Option to disable showing metrics in JobMananger UI

2023-12-13 Thread Lu Niu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796474#comment-17796474
 ] 

Lu Niu commented on FLINK-33804:


the whitelist will be a subset of metrics listed here 
[https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java#L4]
 

> Add Option to disable showing metrics in JobMananger UI
> ---
>
> Key: FLINK-33804
> URL: https://issues.apache.org/jira/browse/FLINK-33804
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Lu Niu
>Priority: Major
>
> Flink allows users to view metric in JobMananger UI. However there are 2 
> problems we found:
>  # The JobManager is required to aggregate metrics from all task managers. 
> When the metric cardinality is quite high, this process can trigger a 
> JobManager Full GC and slow response time.
>  # Flink user cases in prod usually have their own dashboard to view metrics. 
> so this feature sometimes is not useful.
> In light of this, we propose to add option to disable this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [ FLINK-33603][FileSystems] shade guava in gs-fs filesystem [flink]

2023-12-13 Thread via GitHub


JingGe commented on PR #23489:
URL: https://github.com/apache/flink/pull/23489#issuecomment-1854677694

   Hi folks, thanks for driving this.
   
   @MartijnVisser my understanding is that your version won't work, since there is still a `guava 27.0-jre` in the classpath.
   
   @singhravidutt: https://github.com/apache/flink/pull/23920 from @MartijnVisser actually shows the right direction. We don't need to exclude guava in `google-cloud-storage` and then build an explicit guava dependency. Afaik your version should also work, but it adds the extra effort of building a guava dependency instead of leveraging the dependency `google-cloud-storage` already has; i.e. if the next version of `google-cloud-storage` needs a new guava version, we would have to change the guava dependency accordingly again, and only a few people know about it.
   
   I would suggest a solution combining your thoughts plus a small suggestion from me:
   
   1. Exclude guava in `flink-fs-hadoop-shaded` (thought from @singhravidutt).
   2. Remove the new guava dependency created in this pom (thought from @MartijnVisser); 1+2 make sure there is no guava in the classpath.
   3. Remove the guava exclusion in `google-cloud-storage`, i.e. enable the GCS built-in guava dependency, to make sure `google-cloud-storage` has the right guava version in the classpath (thought from @MartijnVisser).
   4. Remove the guava exclusion in `org.apache.flink:flink-fs-hadoop-shaded`, as I commented above.
   
   Please correct me if I am wrong. Looking forward to your thoughts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33804) Add Option to disable showing metrics in JobMananger UI

2023-12-13 Thread Lu Niu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796455#comment-17796455
 ] 

Lu Niu commented on FLINK-33804:


The problem we want to address:

In default setting, the JobManager is required to aggregate metrics from all 
task managers to power the metrics in UI. When the metric cardinality is quite 
high, this process can trigger a JobManager Full GC and slow response time. 

There are several options:
Option 1: The issue at hand can be mitigated by setting 
metrics.fetcher.update-interval=0. However, a problem arises in the JobManager 
UI where metrics like "Byte Received" keep loading indefinitely. This can lead 
to confusion for users.

Option 2: To address this, we can introduce a whitelist of metrics. 
Additionally, we can add an option that, when enabled, will only allow the 
selected metrics to report to the JobManager. This will ensure that the UI, 
including the overview page and subtask page, continues to function properly.

Option 3: An alternative approach is to follow a similar path as in option 2. 
However, instead of introducing a new feature flag, we can repurpose the 
existing metrics.fetcher.update-interval flag. When 
metrics.fetcher.update-interval is set to 0, the whitelist feature will be 
automatically activated.
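
For reference, a small sketch of what option 1 looks like from the user side today (illustrative only; the class and main method are made up, the config key already exists):

{code:java}
import org.apache.flink.configuration.Configuration;

public class DisableMetricFetcherExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Setting the interval to 0 disables the JobManager-side metric fetcher, which
        // stops the aggregation described above but leaves the UI metric widgets loading.
        config.setString("metrics.fetcher.update-interval", "0");
        // ... pass this configuration to the cluster at startup
    }
}
{code}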
 
 
 

> Add Option to disable showing metrics in JobMananger UI
> ---
>
> Key: FLINK-33804
> URL: https://issues.apache.org/jira/browse/FLINK-33804
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Lu Niu
>Priority: Major
>
> Flink allows users to view metric in JobMananger UI. However there are 2 
> problems we found:
>  # The JobManager is required to aggregate metrics from all task managers. 
> When the metric cardinality is quite high, this process can trigger a 
> JobManager Full GC and slow response time.
>  # Flink user cases in prod usually have their own dashboard to view metrics. 
> so this feature sometimes is not useful.
> In light of this, we propose to add option to disable this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33804) Add Option to disable showing metrics in JobMananger UI

2023-12-13 Thread Lu Niu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796455#comment-17796455
 ] 

Lu Niu edited comment on FLINK-33804 at 12/13/23 8:36 PM:
--

[~martijnvisser] 
The problem we want to address:

In default setting, the JobManager is required to aggregate metrics from all 
task managers to power the metrics in UI. When the metric cardinality is quite 
high, this process can trigger a JobManager Full GC and slow response time. 

There are several options:
Option 1: The issue at hand can be mitigated by setting 
metrics.fetcher.update-interval=0. However, a problem arises in the JobManager 
UI where metrics like "Byte Received" keep loading indefinitely. This can lead 
to confusion for users.

Option 2: To address this, we can introduce a whitelist of metrics. 
Additionally, we can add an option that, when enabled, will only allow the 
selected metrics to report to the JobManager. This will ensure that the UI, 
including the overview page and subtask page, continues to function properly.

Option 3: An alternative approach is to follow a similar path as in option 2. 
However, instead of introducing a new feature flag, we can repurpose the 
existing metrics.fetcher.update-interval flag. When 
metrics.fetcher.update-interval is set to 0, the whitelist feature will be 
automatically activated.
 
 
 


was (Author: qqibrow):
The problem we want to address:

In default setting, the JobManager is required to aggregate metrics from all 
task managers to power the metrics in UI. When the metric cardinality is quite 
high, this process can trigger a JobManager Full GC and slow response time. 

There are several options:
Option 1: The issue at hand can be mitigated by setting 
metrics.fetcher.update-interval=0. However, a problem arises in the JobManager 
UI where metrics like "Byte Received" keep loading indefinitely. This can lead 
to confusion for users.

Option 2: To address this, we can introduce a whitelist of metrics. 
Additionally, we can add an option that, when enabled, will only allow the 
selected metrics to report to the JobManager. This will ensure that the UI, 
including the overview page and subtask page, continues to function properly.

Option 3: An alternative approach is to follow a similar path as in option 2. 
However, instead of introducing a new feature flag, we can repurpose the 
existing metrics.fetcher.update-interval flag. When 
metrics.fetcher.update-interval is set to 0, the whitelist feature will be 
automatically activated.
 
 
 

> Add Option to disable showing metrics in JobMananger UI
> ---
>
> Key: FLINK-33804
> URL: https://issues.apache.org/jira/browse/FLINK-33804
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Lu Niu
>Priority: Major
>
> Flink allows users to view metric in JobMananger UI. However there are 2 
> problems we found:
>  # The JobManager is required to aggregate metrics from all task managers. 
> When the metric cardinality is quite high, this process can trigger a 
> JobManager Full GC and slow response time.
>  # Flink user cases in prod usually have their own dashboard to view metrics. 
> so this feature sometimes is not useful.
> In light of this, we propose to add option to disable this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [ FLINK-33603][FileSystems] shade guava in gs-fs filesystem [flink]

2023-12-13 Thread via GitHub


JingGe commented on code in PR #23489:
URL: https://github.com/apache/flink/pull/23489#discussion_r1425851930


##
flink-filesystems/flink-gs-fs-hadoop/pom.xml:
##
@@ -188,6 +212,29 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-fs-hadoop-shaded
+   


Review Comment:
   IIUC, we don't need to exclude the shaded guava classes, since they will not cause problems as long as the right guava version is on the classpath.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-13 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796391#comment-17796391
 ] 

Piotr Nowojski commented on FLINK-33734:


{quote}

Yes, since multiple subtasks will reuse unaligned checkpoint files after the 
ISSUE is completed, merging handles between multiple subtasks can further 
reduce redundant data. But this may require changing the way the checkpoint 
metadata objects are organized. And this optimization is constant level, but 
merging handles within subtask can reduce the number of file paths from n^2 to 
n. So I'm not sure if merging handles between subtasks is worth it at this 
stage.

{quote}

+1 to merging within a subtask first. I doubt that we will need to merge state handles across subtasks, as there is already a lot of communication between the JM and each subtask, but we can evaluate it later if it proves to still be an issue.
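
As a rough illustration of the "merge within a subtask" idea (hypothetical stand-in types; the real change would operate on InputChannelStateHandle/ResultSubpartitionStateHandle), the merge is essentially a group-by on the shared delegate file so each path is stored once per subtask:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class MergeWithinSubtaskSketch {

    /** Stand-in for a per-channel handle that references a shared spill file. */
    public static final class ChannelHandle {
        final String delegateFilePath;
        final int channelIdx;

        ChannelHandle(String delegateFilePath, int channelIdx) {
            this.delegateFilePath = delegateFilePath;
            this.channelIdx = channelIdx;
        }
    }

    /** Group one subtask's handles by delegate file, so each path is serialized once. */
    public static Map<String, List<ChannelHandle>> mergeWithinSubtask(List<ChannelHandle> handles) {
        return handles.stream().collect(Collectors.groupingBy(h -> h.delegateFilePath));
    }

    private MergeWithinSubtaskSketch() {}
}
{code}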

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


AncyRominus closed pull request #732: [FLINK-33632] Adding custom flink mutator
URL: https://github.com/apache/flink-kubernetes-operator/pull/732


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425570925


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   I did a more sophisticated query with more than one resolvedPredicate and 
multiple pushdownParams in one of them.   
   
![image](https://github.com/apache/flink-connector-jdbc/assets/39792797/8a360200-61cc-4217-ab21-13eacf7e1b1e)
   
   I am thinking I need to replace the ? characters with the pushdownParams in order, while avoiding any question marks that might appear inside the key names, i.e. within the quotes. @snuyanzin, I assume this is what you meant by "take into account key names".
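
   Something along these lines might work as a starting point; this is a hypothetical helper (not existing connector code) that substitutes the pushdown parameters for the '?' placeholders in order while skipping any '?' inside quoted identifiers or string literals:

   ```java
   import java.util.List;

   public final class PredicatePlaceholderResolver {

       /** Replace '?' placeholders with the given parameter values, in order. */
       public static String resolvePlaceholders(String predicate, List<String> params) {
           StringBuilder out = new StringBuilder();
           int paramIdx = 0;
           Character openQuote = null; // non-null while inside a quoted identifier/literal

           for (char c : predicate.toCharArray()) {
               if (openQuote == null && (c == '\'' || c == '"' || c == '`')) {
                   openQuote = c;
               } else if (openQuote != null && c == openQuote) {
                   openQuote = null;
               }
               if (c == '?' && openQuote == null && paramIdx < params.size()) {
                   out.append(params.get(paramIdx++)); // naive: assumes params are already SQL-safe
               } else {
                   out.append(c);
               }
           }
           return out.toString();
       }

       private PredicatePlaceholderResolver() {}
   }
   ```

   It deliberately ignores escaped quotes, so it is only meant to show the idea.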

   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425570925


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   I did a more sophisticated query with more than one resolvedPredicate and 
multiple pushdownParams in one of them.   
   
![image](https://github.com/apache/flink-connector-jdbc/assets/39792797/8a360200-61cc-4217-ab21-13eacf7e1b1e)
   
   I am thinking I need to replace the ? characters with the pushdownParams in order. @snuyanzin, is this what you meant by "take into account key names"?

   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-13 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796372#comment-17796372
 ] 

Jeyhun Karimov commented on FLINK-33756:


Hi [~jhughes] I had a chance to look at the issue. I share my findings below.

So, when we enable 
{{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the 
following optimized execution plan is produced:


{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, 
window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], 
window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 
s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS 
window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
  +- LocalWindowAggregate(groupBy=[name], 
window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 
s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) 
AS sum$1, slice_end('w$) AS $window_end])
 +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS 
$f5_0, $f4 AS $f6_0])
+- GlobalWindowAggregate(groupBy=[name, $f5, $f6], 
window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 
s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS 
$f4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name, $f5, $f6]])
  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], 
window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 
s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, 
COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, 
slice_end('w$) AS $slice_end])
 +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS 
$g_1, ($e = 2) AS $g_2, rowtime])
+- Expand(projects=[{name, double, int, $f5, null AS 
$f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, 
rowtime}])
   +- Calc(select=[name, double, int, 
MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, 
Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[int, double, name, ts], metadata=[], 
watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], 
watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])

{code}


As we can see, there are two window operators (both with {{Local-Global 
optimization}}). (As a reminder, the missing record is "+I[b, 3.0, 1, 
2020-10-10T00:00:31, 2020-10-10T00:00:41]".)

As we can see from the schema of the second {{LocalWindowAggregate}}, it uses 
{{window_start}} and {{window_end}} to calculate the {{CUMULATE}} windows. At this 
point (at the second {{LocalWindowAggregate}}), our "missing" record looks like 
"+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)", so we have already 
lost the original event time of the record. 


As a result, the flaky behaviour happens because of the calling order between 
{{SlicingWindowOperator::processWatermark}}->{{AbstractWindowAggProcessor::advanceProgress}}
 and {{SlicingWindowOperator::processElement}}:

- If {{processWatermark}} is called before {{processElement}}, then 
{{currentProgress}} is updated to {{1602288041000}}. In this case, once 
{{processElement}} is called afterwards, it considers the window already 
fired and drops the element.

- If {{processElement}} is called before {{processWatermark}}, then the 
record is processed as expected. 

Is this something expected? WDYT?
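To make the ordering concrete, here is a minimal illustrative sketch (simplified, 
not the actual {{SlicingWindowOperator}}/{{AbstractWindowAggProcessor}} code; the 
names only mirror the description above):

{code}
// Illustrative only: simplified model of the race described above.
public class WindowFiringRaceSketch {

    // mirrors AbstractWindowAggProcessor#currentProgress
    private long currentProgress = Long.MIN_VALUE;

    /** Watermarks advance the progress; windows ending at or before it are fired. */
    public void processWatermark(long watermark) {
        if (watermark > currentProgress) {
            currentProgress = watermark; // e.g. advanced to 1602288041000
        }
    }

    /** Returns false when the element targets a window considered already fired. */
    public boolean processElement(long windowEnd) {
        if (windowEnd <= currentProgress) {
            return false; // dropped: the "missing record" case
        }
        return true; // accumulated and emitted when the window fires
    }
}
{code}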


> Missing record with CUMULATE/HOP windows using an optimization
> --
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
>  Issue Type: Bug
>Reporter: Jim Hughes
>Priority: Major
>
> I have seen an optimization cause a window fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to 
> true, 
> the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or 
> CUMULATE window with an offset, a record can be sent which causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
>  modifies the `WindowAggregateJsonITCase` to demonstrate the case.  
>  
> The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
> `testDistinctSplitEnabled` tests the above configurations and shows that one 
> record is missing from the output.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1425609941


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   Actually I am not sure whether we will get the warn logs if we don't delete 
it
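   
   A usage sketch of the helper above, just for illustration (the legacy prefix 
value `kubernetes.operator.` and the concrete key names are assumptions here, 
not taken from this diff):
   
   ```java
   // Assumes LEGACY_CONF_PREFIX is "kubernetes.operator." and that
   // migrateOldConfigKeys behaves as in the diff above.
   Configuration config = new Configuration();
   config.setString("kubernetes.operator.job.autoscaler.enabled", "true");
   config.setString("kubernetes.operator.job.autoscaler.metrics.window", "5m");
   config.setString("job.autoscaler.metrics.window", "10m"); // new key already set
   
   Configuration migrated = AutoScalerOptions.migrateOldConfigKeys(config);
   // "job.autoscaler.enabled" is now "true" (copied from the legacy key),
   // "job.autoscaler.metrics.window" stays "10m" (the existing new key wins),
   // and every "kubernetes.operator."-prefixed key has been removed, so no
   // fallback-key warning should be logged for them anymore.
   ```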



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33632) Add custom mutator plugin

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33632:
---
Labels: pull-request-available  (was: )

> Add custom mutator plugin
> -
>
> Key: FLINK-33632
> URL: https://issues.apache.org/jira/browse/FLINK-33632
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Not a Priority
>  Labels: pull-request-available
>
> Currently users have the ability to provide custom validators to the 
> operator. It would be great if we followed the same pattern to provide custom 
> mutators



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


AncyRominus opened a new pull request, #733:
URL: https://github.com/apache/flink-kubernetes-operator/pull/733

   
   
   ## What is the purpose of the change
   
   Adding Custom Flink Mutator
   
   ## Brief change log
   
   No change in logs
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33686) Reuse ClusterDescriptor in AbstractSessionClusterExecutor when executing jobs on the same cluster

2023-12-13 Thread xiangyu feng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiangyu feng updated FLINK-33686:
-
Summary: Reuse ClusterDescriptor in AbstractSessionClusterExecutor when 
executing jobs on the same cluster  (was: Reuse StandaloneClusterDescriptor in 
RemoteExecutor when executing jobs on the same cluster)

> Reuse ClusterDescriptor in AbstractSessionClusterExecutor when executing jobs 
> on the same cluster
> -
>
> Key: FLINK-33686
> URL: https://issues.apache.org/jira/browse/FLINK-33686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: xiangyu feng
>Priority: Major
>
> Multiple `RemoteExecutor` instances can reuse the same 
> `StandaloneClusterDescriptor` when executing jobs on the same running cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33807] Update JUnit to 5.10.1 [flink]

2023-12-13 Thread via GitHub


snuyanzin commented on PR #23917:
URL: https://github.com/apache/flink/pull/23917#issuecomment-1854237160

   interesting, retrying a successful build does not work...
   will submit the same commit with a different comment


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425570925


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   I did a more sophisticated query with more than one resolvedPredicate and 
multiple pushdownParams in one of them.   
   
![image](https://github.com/apache/flink-connector-jdbc/assets/39792797/8a360200-61cc-4217-ab21-13eacf7e1b1e)
   
   I am thinking I need to replace the ? characters with the pushdownParams in 
order. @snuyanzin is this what you meant by "account key names"?  

   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425382471


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   @snuyanzin I am reading JdbcFilterPushdownPreparedStatementVisitor and it 
looks like it only attempts to push down simple join expressions, in which case 
this code is ok (as that was my assumption). I will test to check that my 
assumption is correct and that a complex join will not be pushed down. If this 
code needs to handle complex joins, it would need to be more sophisticated. 



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 

Re: [PR] [FLINK-33807] Update JUnit to 5.10.1 [flink]

2023-12-13 Thread via GitHub


snuyanzin commented on PR #23917:
URL: https://github.com/apache/flink/pull/23917#issuecomment-1854127737

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33611) Add the ability to reuse variable names across different split method scopes

2023-12-13 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-33611:
--

Assignee: Sai Sharath Dandi

> Add the ability to reuse variable names across different split method scopes
> 
>
> Key: FLINK-33611
> URL: https://issues.apache.org/jira/browse/FLINK-33611
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Assignee: Sai Sharath Dandi
>Priority: Major
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the 
> ability to split the generated code to improve the performance for large 
> Protobuf schemas. However, this is still not sufficient to support some 
> larger protobuf schemas as the generated code exceeds the java constant pool 
> size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] 
> and we can see errors like "Too many constants" when trying to compile the 
> generated code. 
> *Solution*
> Since we already have the split code functionality already introduced, the 
> main proposal here is to now reuse the variable names across different split 
> method scopes. This will greatly reduce the constant pool size. One more 
> optimization is to only split the last code segment also only when the size 
> exceeds split threshold limit. Currently, the last segment of the generated 
> code is always being split which can lead to too many split methods and thus 
> exceed the constant pool size limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1854091789

   
   ## CI report:
   
   * 7c02b054dfcf713a87e92344bbc95d0d8bd12393 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


liming30 opened a new pull request, #23922:
URL: https://github.com/apache/flink/pull/23922

   
   
   
   ## What is the purpose of the change
   
[FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs 
when the task exited.
   
   
   ## Brief change log
   
   When the task exits, delete all rocksdb log files with the same prefix in 
the relocated directory.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest#testCleanRelocatedDbLogs
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.

2023-12-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33798:
---
Labels: pull-request-available  (was: )

> Automatically clean up rocksdb logs when the task failover.
> ---
>
> Key: FLINK-33798
> URL: https://issues.apache.org/jira/browse/FLINK-33798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Ming Li
>Priority: Major
>  Labels: pull-request-available
>
> Since FLINK-24785 relocates rocksdb log, multiple rocksdb logs will be 
> created under the flink log directory, but they are not cleaned up during 
> task failover, resulting in a large number of rocksdb logs under the flink 
> log directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2023-12-13 Thread via GitHub


Zakelly commented on PR #23509:
URL: https://github.com/apache/flink/pull/23509#issuecomment-1854062776

   @masteryhx  @Myasuka Thanks for your review! I have addressed your comments, 
would you please take another look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2023-12-13 Thread via GitHub


Zakelly commented on code in PR #23509:
URL: https://github.com/apache/flink/pull/23509#discussion_r1425468192


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -228,6 +228,32 @@ public class CheckpointingOptions {
 + "in a Flink supported filesystem. The 
storage path must be accessible from all participating processes/nodes"
 + "(i.e. all TaskManagers and 
JobManagers).");
 
+/**
+ * Whether to create sub-directories named by job id to store the data 
files and meta data of
+ * checkpoints. The default value is true to enable user could run several 
jobs with the same
+ * checkpoint directory at the same time. If this value is set to false, 
pay attention not to
+ * run several jobs with the same directory simultaneously.
+ */
+@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
+public static final ConfigOption CREATE_CHECKPOINT_SUB_DIS =
+ConfigOptions.key("state.checkpoints.create-subdir")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+Description.builder()
+.text(
+"Whether to create sub-directories 
named by job id under the '%s' to store the data files and meta data "
++ "of checkpoints. The 
default value is true to enable user could run several jobs with the same "
++ "checkpoint directory at 
the same time. If this value is set to false, pay attention not to "
++ "run several jobs with 
the same directory simultaneously. ",
+
TextElement.code(CHECKPOINTS_DIRECTORY.key()))
+.linebreak()
+.text(
+"WARNING: This is an advanced 
configuration. If set to false, users must ensure that no multiple jobs are run 
"

Review Comment:
   I'm afraid there is some misunderstanding here. The text here is trying to 
tell the user to keep this directory for a single job's use, no matter what the 
recovery mode is.
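   
   For illustration, a minimal sketch of what that single-job setup could look 
like from the user side (the option key is taken from the diff above; the 
directory value is just an example):
   
   ```java
   // Sketch: dedicate the checkpoint directory to exactly one job and disable
   // the per-job-id sub-directory, regardless of the recovery mode.
   Configuration conf = new Configuration();
   conf.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints/my-single-job");
   conf.setBoolean("state.checkpoints.create-subdir", false);
   ```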



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2023-12-13 Thread via GitHub


Zakelly commented on code in PR #23509:
URL: https://github.com/apache/flink/pull/23509#discussion_r1425459440


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java:
##
@@ -72,6 +73,7 @@ public MemoryBackendCheckpointStorageAccess(
 JobID jobId,
 @Nullable Path checkpointsBaseDirectory,
 @Nullable Path defaultSavepointLocation,
+boolean createCheckpointSubDirs,

Review Comment:
   I'd rather keep a single constructor and not provide such a constructor, 
since it is more friendly for further development. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2023-12-13 Thread via GitHub


Zakelly commented on code in PR #23509:
URL: https://github.com/apache/flink/pull/23509#discussion_r1425456803


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java:
##
@@ -141,6 +141,12 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
  */
 private final int writeBufferSize;
 
+/**
+ * Switch to create checkpoint sub-directory with name of jobId. A value 
of 'undefined' means
+ * not yet configured, in which case the default will be used.
+ */
+private TernaryBoolean createCheckpointSubDirs = TernaryBoolean.UNDEFINED;

Review Comment:
   Good point, I removed the TernaryBoolean here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425444399


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   I just tested it and it fails. I need to take another look at the fix so that 
it can handle complex joins like the one you mention. 
   Great catch :-) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


AncyRominus opened a new pull request, #732:
URL: https://github.com/apache/flink-kubernetes-operator/pull/732

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request adds a new feature to periodically create 
and maintain savepoints through the `FlinkDeployment` custom resource.)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Periodic savepoint trigger is introduced to the custom resource*
 - *The operator checks on reconciliation whether the required time has 
passed*
 - *The JobManager's dispose savepoint API is used to clean up obsolete 
savepoints*
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425382471


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   @snuyanzin I am reading JdbcFilterPushdownPreparedStatementVisitor and it 
looks like it only attempts to push down simple join expressions, in which case 
this code is ok (as that was my assumption). I will test to check that my 
assumption is correct and that a complex join will not be pushed down. If this 
code needs to handle complex joins, it would need to be much more sophisticated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425382471


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   @snuyanzin I am reading JdbcFilterPushdownPreparedStatementVisitor and it 
looks like it only attempts to push down simple join expressions, in which case 
this code is ok (as that was my assumption). I will test to check that my 
assumption is correct and that a complex join will not be pushed down. If this 
code needs to handle complex joins, it would need to be much more sophisticated, 
I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33795] Add new config to forbid autoscaling in certain periods of a day [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


flashJd commented on code in PR #728:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1425382922


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Stabilization period in which no new scaling will 
be executed");
 
+public static final ConfigOption> FORBID_PERIOD =
+autoScalerConfig("forbid.periods")
+.stringType()
+.asList()
+.defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods"))
+.withDescription(
+"A (semicolon-separated) list of certain times of 
the day during which autoscaling is forbidden, 
10:00:00-11:00:00;21:30:00-22:30:00 for example");

Review Comment:
   I checked PR 
https://github.com/apache/flink-kubernetes-operator/pull/656, which introduced 
`class CronExpression` from log4j-core (which in turn copied it from Quartz).
   
   1.  So can we make the `excluded periods` config a cron expression string and 
use its `isSatisfiedBy(final Date date)` method to judge whether we are in an 
excluded period (see the sketch below)?
   2.  Since a standardized cron string doesn't contain a time zone, should we 
just use the default time zone or pass it in some other way?
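   
   A minimal sketch of point 1, using the log4j-core class directly just for 
illustration (which copy of `CronExpression` the operator would end up using, 
and how the time zone gets configured, are exactly the open questions above):
   
   ```java
   import java.text.ParseException;
   import java.util.Date;
   import java.util.TimeZone;
   
   import org.apache.logging.log4j.core.util.CronExpression;
   
   // Sketch only: returns true when "now" falls into an excluded period
   // described by a cron expression, with an explicitly supplied time zone.
   public class ExcludedPeriodCheck {
       public static boolean inExcludedPeriod(String cronString, TimeZone zone, Date now)
               throws ParseException {
           CronExpression cron = new CronExpression(cronString);
           cron.setTimeZone(zone); // point 2: pass the zone in rather than defaulting
           return cron.isSatisfiedBy(now);
       }
   }
   ```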



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2023-12-13 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1425382471


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +98,70 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
-for (int i = 0; i < keyNames.length; i++) {
+
+for (int i = 0; i < context.getKeys().length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+String param = this.pushdownParams[i].toString();
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ */
+conditions[i] = resolvePredicateParam(resolvedPredicate, 
param);

Review Comment:
   @snuyanzin I am reading JdbcFilterPushdownPreparedStatementVisitor and it 
looks like it only attempts to push down simple join expressions, in which case 
this code is ok (as that was my assumption). I will test to confirm my 
understanding is correct. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33767] Implement restore tests for TemporalJoin node [flink]

2023-12-13 Thread via GitHub


dawidwys commented on code in PR #23916:
URL: https://github.com/apache/flink/pull/23916#discussion_r1425373388


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.TemporalTableFunction;
+
+/** Test step for registering a (temporary) (system or catalog) function. */
+public final class TemporalFunctionTestStep implements TestStep {
+
+/** Whether function should be temporary or not. */
+enum FunctionPersistence {
+TEMPORARY,
+PERSISTENT
+}
+
+/** Whether function should be persisted in a catalog or not. */
+enum FunctionBehavior {
+SYSTEM,
+CATALOG
+}
+
+public final FunctionPersistence persistence;
+public final FunctionBehavior behavior;
+public final String name;
+public final String table;
+public final Expression timeAttribute;
+public final Expression primaryKey;
+
+TemporalFunctionTestStep(
+FunctionPersistence persistence,
+FunctionBehavior behavior,
+String name,
+String table,
+Expression timeAttribute,
+Expression primaryKey) {
+this.persistence = persistence;
+this.behavior = behavior;
+this.name = name;
+this.table = table;
+this.timeAttribute = timeAttribute;
+this.primaryKey = primaryKey;
+}
+
+@Override
+public TestKind getKind() {
+return TestKind.TEMPORAL_FUNCTION;
+}
+
+public void apply(TableEnvironment env) {
+TemporalTableFunction function =
+env.from(table).createTemporalTableFunction(timeAttribute, 
primaryKey);
+if (behavior == FunctionBehavior.SYSTEM) {
+if (persistence == FunctionPersistence.TEMPORARY) {
+env.createTemporarySystemFunction(name, function);
+
+} else {
+throw new UnsupportedOperationException("System functions must 
be temporary.");

Review Comment:
   I wouldn't add options which are not possible. Temporal functions can only 
ever be temporary.  Other than that I'd say it's fine.



##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.TemporalTableFunction;
+
+/** Test step for registering a (temporary) (system or catalog) function. */
+public final class TemporalFunctionTestStep implements TestStep {
+
+/** Whether function should be temporary or not. */
+enum FunctionPersistence {
+TEMPORARY,
+PERSISTENT
+}
+
+/** Whether function should be persisted in a catalog or not. */
+enum FunctionBehavior {
+SYSTEM,
+CATALOG
+}
+
+public final FunctionPersistence persistence;
+public final FunctionBehavior behavior;
+public final String name;
+public final String table;
+

[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure

2023-12-13 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796276#comment-17796276
 ] 

Jim Hughes commented on FLINK-26644:


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55469=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf]
 

> python 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies 
> failed on azure
> ---
>
> Key: FLINK-26644
> URL: https://issues.apache.org/jira/browse/FLINK-26644
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 
> === FAILURES 
> ===
> 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _
> 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = 
>   testMethod=test_generate_stream_graph_with_dependencies>
> 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def 
> test_generate_stream_graph_with_dependencies(self):
> 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = 
> os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir)
> 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = 
> os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
> 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with 
> open(python_file_path, 'w') as f:
> 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def 
> add_two(a):\nreturn a + 2")
> 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env
> 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 
> env.add_python_file(python_file_path)
> 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6856254Z Mar 14 18:50:24 def plus_two_map(value):
> 2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from 
> test_stream_dependency_manage_lib import add_two
> 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], 
> add_two(value[1])
> 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i):
> 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with 
> open("data/data.txt", 'r') as f:
> 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], 
> i[1] + int(f.read())
> 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = 
> env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1),
> 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24  
>  ('e', 2)],
> 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24  
> type_info=Types.ROW([Types.STRING(),
> 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24  
>  Types.INT()]))
> 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 
> from_collection_source.name("From Collection")
> 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = 
> from_collection_source.key_by(lambda x: x[1], key_type=Types.INT())
> 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = 
> keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3)
> 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = 
> plus_two_map_stream.map(add_from_file).name("Add From File Map")
> 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = 
> add_from_file_map.add_sink(self.test_sink).name("Test Sink")
> 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 
> test_stream_sink.set_parallelism(4)
> 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = 
> os.path.join(self.tempdir, "archive_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 
> os.mkdir(archive_dir_path)
> 2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with 
> open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
> 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3")
> 

[jira] [Updated] (FLINK-33815) Add tests against jdk17 for pulsar connector

2023-12-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33815:
---
Fix Version/s: pulsar-4.2.0

> Add tests against jdk17 for pulsar connector
> 
>
> Key: FLINK-33815
> URL: https://issues.apache.org/jira/browse/FLINK-33815
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33815) Add tests against jdk17 for pulsar connector

2023-12-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33815:
---
Affects Version/s: pulsar-4.0.1

> Add tests against jdk17 for pulsar connector
> 
>
> Key: FLINK-33815
> URL: https://issues.apache.org/jira/browse/FLINK-33815
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.1
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33815) Add tests against jdk17 for pulsar connector

2023-12-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-33815.

Resolution: Fixed

Implemented in main(4.2): 8bfbe33f86a45b4757a9cd1f19f2ede5da771487

> Add tests against jdk17 for pulsar connector
> 
>
> Key: FLINK-33815
> URL: https://issues.apache.org/jira/browse/FLINK-33815
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.1
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33815][Connector / Pulsar] Add tests against jdk17 for pulsar connector [flink-connector-pulsar]

2023-12-13 Thread via GitHub


leonardBang merged PR #71:
URL: https://github.com/apache/flink-connector-pulsar/pull/71


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33815][Connector / Pulsar] Add tests against jdk17 for pulsar connector [flink-connector-pulsar]

2023-12-13 Thread via GitHub


snuyanzin commented on PR #71:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/71#issuecomment-1853797497

   @leonardBang @tisonkun could you please have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33454) Adding tls configuration to IngressSpec

2023-12-13 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796210#comment-17796210
 ] 

dongwoo.kim commented on FLINK-33454:
-

[~ryanvanhuuksloot] sorry for the late reply, thanks for your efforts :)

> Adding tls configuration to IngressSpec
> ---
>
> Key: FLINK-33454
> URL: https://issues.apache.org/jira/browse/FLINK-33454
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Assignee: Ryan van Huuksloot
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Hello, I want to propose a new configuration parameter for IngressSpec.
> Currently the Flink Kubernetes operator creates the ingress resource as we define it, but it 
> doesn't support a TLS configuration to secure the ingress. 
> How about adding a tls parameter on IngressSpec?
> *IngressSpec*
> tls: IngressTLS
> *IngressTLSSpec*
> Hosts: List
> SecretName: String 
> If we can reach an agreement I'll be glad to take on the implementation.
> Thanks in advance.
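
For illustration, a minimal Java sketch of how the proposed fields could be modeled as operator spec classes. The class and field names below follow the proposal quoted above and are assumptions, not the API that was eventually merged for kubernetes-operator-1.8.0:

```java
import java.util.List;

/** Hypothetical shape of the proposed TLS entry (field names mirror the proposal above). */
class IngressTLSSpec {
    /** Hosts covered by the certificate, e.g. "flink.example.com". */
    private List<String> hosts;

    /** Name of the Kubernetes secret that holds tls.crt / tls.key. */
    private String secretName;

    List<String> getHosts() { return hosts; }
    void setHosts(List<String> hosts) { this.hosts = hosts; }
    String getSecretName() { return secretName; }
    void setSecretName(String secretName) { this.secretName = secretName; }
}

/** IngressSpec would then gain one additional field, roughly: */
class IngressSpecWithTls {
    // ...existing ingress fields stay as-is...
    private List<IngressTLSSpec> tls; // copied into the generated Ingress resource's spec.tls
}
```

The operator would then only need to map these entries onto the generated Ingress' `spec.tls`, mirroring how Kubernetes' own IngressTLS block works.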



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33641][test] Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures [flink]

2023-12-13 Thread via GitHub


snuyanzin commented on PR #23914:
URL: https://github.com/apache/flink/pull/23914#issuecomment-1853781175

   I tested with JUnit 5.10.1 and I think it also introduced some changes.
   If you look at PR https://github.com/apache/flink/pull/23917, which contains only the JUnit bump 5.9.1 -> 5.10.1, the CI failure is also different from what we have in master:
   ```
   2023-12-13T01:01:30.0309316Z Dec 13 01:01:29 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
   2023-12-13T01:01:30.0310427Z Dec 13 01:01:29 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   2023-12-13T01:01:30.0312635Z Dec 13 01:01:29 Suppressed: java.nio.file.NoSuchFileException: /tmp/junit8675027143221640473/b26755eb2623b363024e04d5db7543aa/chk-3
   2023-12-13T01:01:30.0313874Z Dec 13 01:01:29 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   2023-12-13T01:01:30.0314987Z Dec 13 01:01:29 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   2023-12-13T01:01:30.0316077Z Dec 13 01:01:29 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   2023-12-13T01:01:30.0317253Z Dec 13 01:01:29 at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
   2023-12-13T01:01:30.0318483Z Dec 13 01:01:29 at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
   2023-12-13T01:01:30.0319812Z Dec 13 01:01:29 at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
   2023-12-13T01:01:30.0320982Z Dec 13 01:01:29 at java.nio.file.Files.readAttributes(Files.java:1737)
   2023-12-13T01:01:30.0322005Z Dec 13 01:01:29 at java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
   2023-12-13T01:01:30.0323049Z Dec 13 01:01:29 at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
   2023-12-13T01:01:30.0324067Z Dec 13 01:01:29 at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
   2023-12-13T01:01:30.0325253Z Dec 13 01:01:29 at java.nio.file.Files.walkFileTree(Files.java:2706)
   2023-12-13T01:01:30.0326179Z Dec 13 01:01:29 at java.nio.file.Files.walkFileTree(Files.java:2742)
   2023-12-13T01:01:30.0326926Z Dec 13 01:01:29 ... 40 more
   2023-12-13T01:01:30.0328421Z Dec 13 01:01:29 Suppressed: java.nio.file.NoSuchFileException: /tmp/junit8675027143221640473/b26755eb2623b363024e04d5db7543aa/chk-3
   2023-12-13T01:01:30.0329745Z Dec 13 01:01:29 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   2023-12-13T01:01:30.0330947Z Dec 13 01:01:29 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   2023-12-13T01:01:30.0332078Z Dec 13 01:01:29 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   2023-12-13T01:01:30.0333192Z Dec 13 01:01:29 at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
   2023-12-13T01:01:30.0334389Z Dec 13 01:01:29 at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
   2023-12-13T01:01:30.0335445Z Dec 13 01:01:29 at java.nio.file.Files.delete(Files.java:1126)
   2023-12-13T01:01:30.0336348Z Dec 13 01:01:29 at java.nio.file.Files.walkFileTree(Files.java:2672)
   2023-12-13T01:01:30.0337105Z Dec 13 01:01:29 ... 41 more
   ```
   
   Maybe they fixed something between these versions.
   My statement about the checkpoint failure applies to JUnit 5.10.1 (which is not in master yet), since with the current master I cannot reproduce the problem locally.
   
   I would propose to go with the newer JUnit to avoid having to fix this again after the upgrade.
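
For illustration, a minimal sketch, not the actual StreamingWithStateTestBase change, of a cleanup helper that tolerates exactly this kind of race, where a concurrent checkpoint cleanup removes a chk-N directory while the test's temporary directory is being deleted; all class and method names here are made up:

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class LenientDirectoryCleanup {

    /** Deletes {@code root} recursively, ignoring entries that vanish concurrently. */
    static void deleteQuietly(Path root) throws IOException {
        if (Files.notExists(root)) {
            return;
        }
        Files.walkFileTree(
                root,
                new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                            throws IOException {
                        Files.deleteIfExists(file); // no-op if checkpoint cleanup got here first
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult visitFileFailed(Path file, IOException exc) {
                        // e.g. NoSuchFileException when a chk-N directory vanished mid-walk
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                            throws IOException {
                        try {
                            Files.deleteIfExists(dir);
                        } catch (DirectoryNotEmptyException | NoSuchFileException ignored) {
                            // a concurrent writer re-populated or removed the directory
                        }
                        return FileVisitResult.CONTINUE;
                    }
                });
    }
}
```

Whichever variant lands, suppressing only these file-system races keeps genuine cleanup bugs visible.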


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425246326


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map queryAggregatedVertexMetrics(
                             EmptyRequestBody.getInstance())
                     .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(
+                                                        restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   That’s not how this works, unfortunately: if any of the requested metrics is not found, an empty response comes back from Flink.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map queryAggregatedVertexMetrics(
                             EmptyRequestBody.getInstance())
                     .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(
+                                                        restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   That’s not how this works, unfortunately: if any of the requested metrics is not found, an empty response comes back from Flink.
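
To make the consequence concrete, here is a self-contained sketch of the probe-and-cache pattern that this all-or-nothing endpoint behaviour forces on the collector; the generic type and method names are illustrative, not the PR's actual code:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
 * Sketch: the first poll per job probes with the full metric-name list (GC names
 * included); if the endpoint answers empty, the job is cached as "no GC metrics"
 * and later polls fall back to the smaller list.
 */
final class GcMetricProbe<J> {

    private final Map<J, Boolean> jobsWithGcMetrics = new ConcurrentHashMap<>();

    <M> Map<String, M> query(
            J jobKey,
            List<String> namesWithGc,
            List<String> namesWithoutGc,
            BiFunction<J, List<String>, Map<String, M>> restQuery) {

        boolean hasGcMetrics =
                jobsWithGcMetrics.computeIfAbsent(
                        jobKey, k -> !restQuery.apply(k, namesWithGc).isEmpty());

        // The endpoint is all-or-nothing: without this fallback, every subsequent
        // poll for a job lacking GC metrics would also come back empty.
        return restQuery.apply(jobKey, hasGcMetrics ? namesWithGc : namesWithoutGc);
    }
}
```

Caching the probe result per job matters because the probe costs an extra REST round trip, and whether GC metrics are exposed is unlikely to change while the same job is running.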



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


