Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


RocMarshal commented on PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2016325794

   Thank you @snuyanzin for the review.
   I made some changes based on your comments~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1536549280


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source reader to read data from jdbc splits.
+ *
+ * @param <T> The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader<T>
+implements SplitReader<RecordAndOffset<T>, JdbcSourceSplit>, ResultTypeQueryable<T> {
+
+private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+private final Configuration config;
+@Nullable private JdbcSourceSplit currentSplit;
+private final Queue<JdbcSourceSplit> splits;
+private final TypeInformation<T> typeInformation;
+private final JdbcConnectionProvider connectionProvider;
+private transient Connection connection;
+private transient PreparedStatement statement;
+private transient ResultSet resultSet;
+
+private final ResultExtractor<T> resultExtractor;
+protected boolean hasNextRecordCurrentSplit;
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final int splitReaderFetchBatchSize;
+
+private final int resultSetType;
+private final int resultSetConcurrency;
+private final int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set autoCommit mode.
+private final Boolean autoCommit;
+private int currentSplitOffset;
+
+private final SourceReaderContext context;
+
+public JdbcSourceSplitReader(
+SourceReaderContext context,
+Configuration config,
+TypeInformation<T> typeInformation,
+JdbcConnectionProvider connectionProvider,
+DeliveryGuarantee deliveryGuarantee,
+ResultExtractor<T> resultExtractor) {
+this.context = Preconditions.checkNotNull(context);
+this.config = Preconditions.checkNotNull(config);
+this.typeInformation = Preconditions.checkNotNull(typeInformation);
+this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+this.resultSetType = config.getInteger(RESULTSET_TYPE);
+
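For context, the core of what such a split reader does per split is batched paging over a JDBC ResultSet: pull up to READER_FETCH_BATCH_SIZE rows per fetch call and track an offset so a checkpointed split can be resumed (the role of currentSplitOffset and hasNextRecordCurrentSplit above). A minimal standalone sketch in plain JDBC (the H2 in-memory URL, the query, and the batch size of 4 are illustrative assumptions, not the PR's actual code):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BatchedJdbcReadSketch {

    // Pulls at most batchSize rows from the ResultSet; an empty result means
    // the split is exhausted.
    static List<String> fetchBatch(ResultSet rs, int batchSize) throws SQLException {
        List<String> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && rs.next()) {
            batch.add(rs.getString(1)); // a real ResultExtractor would map the whole row
        }
        return batch;
    }

    public static void main(String[] args) throws SQLException {
        // Assumes the H2 driver on the classpath; any JDBC source works the same way.
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
                PreparedStatement stmt =
                        conn.prepareStatement("SELECT X FROM SYSTEM_RANGE(1, 10)")) {
            stmt.setFetchSize(1024); // what RESULTSET_FETCH_SIZE configures
            try (ResultSet rs = stmt.executeQuery()) {
                int offset = 0; // what currentSplitOffset tracks for resumption
                List<String> batch;
                while (!(batch = fetchBatch(rs, 4)).isEmpty()) {
                    offset += batch.size();
                    System.out.println("offset " + offset + ": " + batch);
                }
            }
        }
    }
}
{code}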

[jira] [Updated] (FLINK-33461) Support streaming related semantics for the new jdbc source

2024-03-22 Thread RocMarshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-33461:
---
Summary: Support streaming related semantics for the new jdbc source  (was: 
Support stream related semantics for the new jdbc source)

> Support streaming related semantics for the new jdbc source
> ---
>
> Key: FLINK-33461
> URL: https://issues.apache.org/jira/browse/FLINK-33461
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: RocMarshal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34923) Behavioral discrepancy between `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`

2024-03-22 Thread Chloe He (Jira)
Chloe He created FLINK-34923:


 Summary: Behavioral discrepancy between 
`TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`
 Key: FLINK-34923
 URL: https://issues.apache.org/jira/browse/FLINK-34923
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Chloe He


I found that there is some behavioral discrepancy between 
`TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`.

A minimal reproducible example:
{code:java}
SELECT `value` FROM (VALUES (CAST(ARRAY[ROW(1, 2), ROW(2, 2)] AS ARRAY<ROW<a INT, b INT>>))) AS `t`(`value`) {code}
This throws
{code:java}
File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/table.py:943,
 in Table.to_pandas(self)
939 import pytz
940 timezone = pytz.timezone(
941 
self._j_table.getTableEnvironment().getConfig().getLocalTimeZone().getId())
942 serializer = ArrowSerializer(
--> 943 create_arrow_schema(self.get_schema().get_field_names(),
944 self.get_schema().get_field_data_types()),
945 self.get_schema().to_row_data_type(),
946 timezone)
947 import pyarrow as pa
948 table = 
pa.Table.from_batches(serializer.load_from_iterator(batches_iterator))

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2194,
 in create_arrow_schema(field_names, field_types)
   2190 """
   2191 Create an Arrow schema with the specified filed names and types.
   2192 """
   2193 import pyarrow as pa
-> 2194 fields = [pa.field(field_name, to_arrow_type(field_type), 
field_type._nullable)
   2195   for field_name, field_type in zip(field_names, field_types)]
   2196 return pa.schema(fields)

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2194,
 in <listcomp>(.0)
   2190 """
   2191 Create an Arrow schema with the specified filed names and types.
   2192 """
   2193 import pyarrow as pa
-> 2194 fields = [pa.field(field_name, to_arrow_type(field_type), 
field_type._nullable)
   2195   for field_name, field_type in zip(field_names, field_types)]
   2196 return pa.schema(fields)

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2316,
 in to_arrow_type(data_type)
   2314 elif isinstance(data_type, ArrayType):
   2315 if type(data_type.element_type) in [LocalZonedTimestampType, 
RowType]:
-> 2316 raise ValueError("%s is not supported to be used as the element 
type of ArrayType." %
   2317  data_type.element_type)
   2318 return pa.list_(to_arrow_type(data_type.element_type))
   2319 elif isinstance(data_type, RowType):

ValueError: ROW is not supported to be used as the element type of ArrayType. 
{code}
when I tried to execute it with `TableEnvironment.sql_query()`, but it works 
when executed with `TableEnvironment.execute_sql()`:
{code:java}
+----+--------------------------------+
| op |                          value |
+----+--------------------------------+
| +I |               [(1, 2), (2, 2)] |
+----+--------------------------------+ {code}





Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-22 Thread via GitHub


jnh5y commented on PR #23886:
URL: https://github.com/apache/flink/pull/23886#issuecomment-2016043996

   @flinkbot run azure





Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-22 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1536282860


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateEventTimeRestoreTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecWindowAggregate}. */
+public class WindowAggregateEventTimeRestoreTest extends RestoreTestBase {
+
+public WindowAggregateEventTimeRestoreTest() {
+super(StreamExecWindowAggregate.class);
+}
+
+@Override
+public List<TableTestProgram> programs() {
+return Arrays.asList(
+WindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_EVENT_TIME,

Review Comment:
   Ah, this was a mistake on my part.  Corrected.






Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-22 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1536282061


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, 
"a"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, 
null),
+Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), 
"Comment#3", "b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d"),
+Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), 
"Comment#6", "c"),
+Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), 
"Comment#7", "d")
+};
+
+static final SourceTestStep SOURCE =
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"a_int INT",
+"b_double DOUBLE",
+"c_float FLOAT",
+"d_bigdec DECIMAL(10, 2)",
+"`comment` STRING",
+"name STRING",
+"`rowtime` AS TO_TIMESTAMP(`ts`)",
+"`proctime` AS PROCTIME()",
+"WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA)
+.build();
+
+static final String[] TUMBLE_EVENT_TIME_BEFORE_ROWS = {
+"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
+"+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]",
+"+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]",
+"+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]"
+};
+
+public static final String[] TUMBLE_EVENT_TIME_AFTER_ROWS = {
+"+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 1]",
+"+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 7, 0]",
+"+I[a, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 10, 1]",
+"+I[c, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 12, 1]",
+"+I[d, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 2, 24, 2]"
+};
+
+static final TableTestProgram GROUP_TUMBLE_WINDOW_EVENT_TIME =
+getTableTestProgram(
+"window-aggregate-tumble-event-time",
+"validates group by using 

Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-03-22 Thread via GitHub


liyubin117 commented on PR #24555:
URL: https://github.com/apache/flink/pull/24555#issuecomment-2015888093

   @wuchong Hi, CI passed. Could you please take a look? Thanks very much :)





[jira] [Created] (FLINK-34922) Exception History should support multiple Global failures

2024-03-22 Thread Panagiotis Garefalakis (Jira)
Panagiotis Garefalakis created FLINK-34922:
--

 Summary: Exception History should support multiple Global failures
 Key: FLINK-34922
 URL: https://issues.apache.org/jira/browse/FLINK-34922
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Panagiotis Garefalakis


Before source coordinators were introduced, global failures were rare and only 
triggered by the JM, which ensured they happened once per failure. Since this 
has changed, we should adjust accordingly and support multiple global failures 
as part of the exception history.

Relevant discussion under: 
https://github.com/apache/flink/pull/23440#pullrequestreview-1701775436





[jira] [Closed] (FLINK-34643) JobIDLoggingITCase failed

2024-03-22 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-34643.
-
Resolution: Fixed

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=b78d9d30-509a-5cea-1fef-db7abaa325ae&l=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=8209





[jira] [Commented] (FLINK-34643) JobIDLoggingITCase failed

2024-03-22 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829969#comment-17829969
 ] 

Roman Khachatryan commented on FLINK-34643:
---

Merged into master as 
ed4d6f091f27ffc778cbb6de6a3fa19251277bdc..4edafcc8b0b96920036a1afaaa37ae87b77668ed.

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=b78d9d30-509a-5cea-1fef-db7abaa325ae&l=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=8209





Re: [PR] [FLINK-34643] Fix concurrency issue in LoggerAuditingExtension [flink]

2024-03-22 Thread via GitHub


rkhachatryan merged PR #24550:
URL: https://github.com/apache/flink/pull/24550





Re: [PR] [FLINK-34643] Fix concurrency issue in LoggerAuditingExtension [flink]

2024-03-22 Thread via GitHub


rkhachatryan commented on PR #24550:
URL: https://github.com/apache/flink/pull/24550#issuecomment-2015730627

   Thanks for reviewing! Merging





[jira] [Closed] (FLINK-34920) ZooKeeperLeaderRetrievalConnectionHandlingTest fails with Exit Code 2

2024-03-22 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba closed FLINK-34920.
---
Resolution: Duplicate

> ZooKeeperLeaderRetrievalConnectionHandlingTest fails with Exit Code 2
> -
>
> Key: FLINK-34920
> URL: https://issues.apache.org/jira/browse/FLINK-34920
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> [https://github.com/apache/flink/actions/runs/8384423618/job/22961979482#step:10:8939]
> {code:java}
> [ERROR] Process Exit Code: 2
> [ERROR] Crashed tests:
> [ERROR] 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalConnectionHandlingTest
> [ERROR]   at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456)
>  {code}
>  





Re: [PR] [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain [flink]

2024-03-22 Thread via GitHub


flinkbot commented on PR #24556:
URL: https://github.com/apache/flink/pull/24556#issuecomment-2015394415

   
   ## CI report:
   
   * 0f3dc72c9cb266ec4bb3119f67eb39fd64418700 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Comment Edited] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-22 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829918#comment-17829918
 ] 

Matthias Pohl edited comment on FLINK-33816 at 3/22/24 3:51 PM:


I created the [1.19 backport|https://github.com/apache/flink/pull/24556]. Is 
this also affecting 1.18? Based on the git history I would assume so.


was (Author: mapohl):
I created the 1.19 backport. Is this also affecting 1.18? Based on the git 
history I would assume so.

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.20.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> 9425Error: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> 9426Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> 9428Dec 12 14:39:01 
> 9429Dec 12 14:39:01 Expecting value to be true but was false
> 9430Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> 9431Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> 9432Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> 9433Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> 9434Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}





[jira] [Commented] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-22 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829918#comment-17829918
 ] 

Matthias Pohl commented on FLINK-33816:
---

I created the 1.19 backport. Is this also affecting 1.18? Based on the git 
history I would assume so.

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.20.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> 9425Error: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> 9426Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> 9428Dec 12 14:39:01 
> 9429Dec 12 14:39:01 Expecting value to be true but was false
> 9430Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> 9431Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> 9432Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> 9433Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> 9434Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}





[PR] [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain [flink]

2024-03-22 Thread via GitHub


XComp opened a new pull request, #24556:
URL: https://github.com/apache/flink/pull/24556

   1.19 backport for PR #24016





[jira] [Updated] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-22 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-33816:
--
Fix Version/s: 1.20.0
   (was: 2.0.0)

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.20.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> 9425Error: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> 9426Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> 9428Dec 12 14:39:01 
> 9429Dec 12 14:39:01 Expecting value to be true but was false
> 9430Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> 9431Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> 9432Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> 9433Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> 9434Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}





[jira] [Comment Edited] (FLINK-34643) JobIDLoggingITCase failed

2024-03-22 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829876#comment-17829876
 ] 

Matthias Pohl edited comment on FLINK-34643 at 3/22/24 3:44 PM:


* 
[https://github.com/apache/flink/actions/runs/8375475096/job/22933386950#step:10:7849]
 * 
[https://github.com/apache/flink/actions/runs/8384698540/job/22962603273#step:10:8296]
 * 
https://github.com/apache/flink/actions/runs/8384423503/job/22961956846#step:10:7958


was (Author: ryanskraba):
* 
[https://github.com/apache/flink/actions/runs/8375475096/job/22933386950#step:10:7849]
 * 
[https://github.com/apache/flink/actions/runs/8384698540/job/22962603273#step:10:8296]
 * 
https://github.com/apache/flink/actions/runs/8375475096/job/22933386950#step:10:7849

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=b78d9d30-509a-5cea-1fef-db7abaa325ae&l=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=8209





[jira] [Updated] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-03-22 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-18476:
--
Affects Version/s: 1.20.0

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.
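A shell alias such as {{python = python3}} is invisible to a spawned process, which is why the alias does not help the test. A sketch of probing the PATH for an interpreter instead of hardcoding {{/usr/bin/python}} (illustrative only, not the test's or PyFlink's actual resolution logic):

{code:java}
import java.io.IOException;

public class PythonProbeSketch {
    public static void main(String[] args) throws InterruptedException {
        // Try candidates in order instead of assuming a fixed binary path.
        for (String candidate : new String[] {"python3", "python"}) {
            try {
                Process p = new ProcessBuilder(candidate, "--version").start();
                if (p.waitFor() == 0) {
                    System.out.println("using interpreter: " + candidate);
                    return;
                }
            } catch (IOException e) {
                // candidate not on PATH (aliases are not visible here); try the next one
            }
        }
        System.out.println("no python interpreter found");
    }
}
{code}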





[jira] [Updated] (FLINK-34919) WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST server

2024-03-22 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34919:
--
Component/s: Runtime / Coordination

> WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST 
> server
> 
>
> Key: FLINK-34919
> URL: https://issues.apache.org/jira/browse/FLINK-34919
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58482&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=8641]
> {code:java}
> Mar 22 04:12:50 04:12:50.260 [INFO] Running 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.318 s <<< FAILURE! -- in 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs
>  -- Time elapsed: 0.303 s <<< ERROR!
> Mar 22 04:12:50 java.net.BindException: Could not start rest endpoint on any 
> port in port range 8081
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:286)
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs(WebMonitorEndpointTest.java:69)
> Mar 22 04:12:50   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 22 04:12:50   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 22 04:12:50  {code}
> This was noted as a symptom of FLINK-22980, but doesn't have the same failure.
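The log suggests the configured port range is just the single fixed port 8081, so any concurrent process holding that port reproduces the failure. A minimal illustration of the collision (hypothetical, unrelated to the test's actual setup):

{code:java}
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortContentionSketch {
    public static void main(String[] args) throws IOException {
        try (ServerSocket first = new ServerSocket(8081)) {
            // A second bind on the same port fails the way the test run did.
            try (ServerSocket second = new ServerSocket(8081)) {
                System.out.println("unreachable");
            } catch (BindException e) {
                System.out.println("Could not bind: " + e.getMessage());
            }
        }
    }
}
{code}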





[jira] [Updated] (FLINK-34919) WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST server

2024-03-22 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34919:
--
Affects Version/s: 1.19.0

> WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST 
> server
> 
>
> Key: FLINK-34919
> URL: https://issues.apache.org/jira/browse/FLINK-34919
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58482&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=8641]
> {code:java}
> Mar 22 04:12:50 04:12:50.260 [INFO] Running 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.318 s <<< FAILURE! -- in 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs
>  -- Time elapsed: 0.303 s <<< ERROR!
> Mar 22 04:12:50 java.net.BindException: Could not start rest endpoint on any 
> port in port range 8081
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:286)
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs(WebMonitorEndpointTest.java:69)
> Mar 22 04:12:50   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 22 04:12:50   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 22 04:12:50  {code}
> This was noted as a symptom of FLINK-22980, but doesn't have the same failure.





Re: [PR] [FLINK-34526][runtime] Actively disconnect the TM in RM to reduce restart time [flink]

2024-03-22 Thread via GitHub


qinf commented on code in PR #24539:
URL: https://github.com/apache/flink/pull/24539#discussion_r1535713733


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##
@@ -1179,6 +1179,14 @@ protected Optional<WorkerType> closeTaskManagerConnection(
 
slotManager.unregisterTaskManager(workerRegistration.getInstanceID(), cause);
 clusterPartitionTracker.processTaskExecutorShutdown(resourceID);
 
+jobManagerRegistrations
+.values()
+.forEach(
+jobManagerRegistration ->
+jobManagerRegistration
+.getJobManagerGateway()
+.disconnectTaskManager(resourceID, cause));

Review Comment:
   @1996fanrui I have added a unit test.
   
   






Re: [PR] [FLINK-34409][ci] Enables (most) tests that were disabled for the AdaptiveScheduler due to missing features [flink]

2024-03-22 Thread via GitHub


XComp commented on PR #24285:
URL: https://github.com/apache/flink/pull/24285#issuecomment-2015251788

   I created FLINK-34921 to cover the one test failure that should be 
unrelated to this PR's change. The second test failure is related to FLINK-34643.





[jira] [Created] (FLINK-34921) SystemProcessingTimeServiceTest fails due to missing output

2024-03-22 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34921:
-

 Summary: SystemProcessingTimeServiceTest fails due to missing 
output
 Key: FLINK-34921
 URL: https://issues.apache.org/jira/browse/FLINK-34921
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.20.0
Reporter: Matthias Pohl


This PR's CI build with {{AdaptiveScheduler}} enabled failed:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58476&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=11224

{code}
"ForkJoinPool-61-worker-25" #863 daemon prio=5 os_prio=0 tid=0x7f8c19eba000 
nid=0x60a5 waiting on condition [0x7f8bc2cf9000]
Mar 21 17:19:42java.lang.Thread.State: WAITING (parking)
Mar 21 17:19:42 at sun.misc.Unsafe.park(Native Method)
Mar 21 17:19:42 - parking to wait for  <0xd81959b8> (a 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask)
Mar 21 17:19:42 at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Mar 21 17:19:42 at 
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
Mar 21 17:19:42 at 
java.util.concurrent.FutureTask.get(FutureTask.java:191)
Mar 21 17:19:42 at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest$$Lambda$1443/1477662666.call(Unknown
 Source)
Mar 21 17:19:42 at 
org.assertj.core.api.ThrowableAssert.catchThrowable(ThrowableAssert.java:63)
Mar 21 17:19:42 at 
org.assertj.core.api.AssertionsForClassTypes.catchThrowable(AssertionsForClassTypes.java:892)
Mar 21 17:19:42 at 
org.assertj.core.api.Assertions.catchThrowable(Assertions.java:1366)
Mar 21 17:19:42 at 
org.assertj.core.api.Assertions.assertThatThrownBy(Assertions.java:1210)
Mar 21 17:19:42 at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture(SystemProcessingTimeServiceTest.java:92)
{code}





Re: [PR] [release] Adjust website for Kubernetes operator 1.8.0 release [flink-web]

2024-03-22 Thread via GitHub


mxm merged PR #726:
URL: https://github.com/apache/flink-web/pull/726





Re: [PR] [release] Adjust website for Kubernetes operator 1.8.0 release [flink-web]

2024-03-22 Thread via GitHub


mxm commented on PR #726:
URL: https://github.com/apache/flink-web/pull/726#issuecomment-2015226003

   @1996fanrui I'm going to merge. Please feel free to comment on the PR. We 
can still correct any mistakes or further improve the post!





Re: [PR] [FLINK-34902][table] Fix column mismatch IndexOutOfBoundsException [flink]

2024-03-22 Thread via GitHub


jeyhunkarimov commented on PR #24552:
URL: https://github.com/apache/flink/pull/24552#issuecomment-2015187722

   Thanks a lot for the review @twalthr 





[jira] [Closed] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-03-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-32513.
---
Fix Version/s: 1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

master: 8dcb0ae9063b66af1d674b7b0b3be76b6d752692
release-1.19: 5ec4bf2f18168001b5cbb9012f331d3405228516
release-1.18: 940b3bbda5b10abe3a41d60467d33fd424c7dae6

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes a very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 
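The shape of that stack explains the freeze: each getTransitivePredecessors() call concatenates its inputs' full predecessor lists without de-duplication, so transformations reachable through several paths (e.g. via TwoInputTransformation) are re-walked and copied repeatedly, and the intermediate lists grow combinatorially. A self-contained sketch of the effect, together with the usual visited-set fix (illustrative, not Flink's code):

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TransitivePredecessorsSketch {

    static class Node {
        final List<Node> inputs = new ArrayList<>();
    }

    // Naive version: the result list doubles at every level of a diamond chain.
    static List<Node> naive(Node n) {
        List<Node> result = new ArrayList<>();
        result.add(n);
        for (Node input : n.inputs) {
            result.addAll(naive(input)); // re-walks shared subgraphs
        }
        return result;
    }

    // De-duplicated version: each node is visited once, linear in graph size.
    static void dedup(Node n, Set<Node> visited) {
        if (visited.add(n)) {
            for (Node input : n.inputs) {
                dedup(input, visited);
            }
        }
    }

    public static void main(String[] args) {
        Node prev = new Node();
        for (int i = 0; i < 20; i++) { // 20 two-input ops, both inputs shared
            Node op = new Node();
            op.inputs.add(prev);
            op.inputs.add(prev);
            prev = op;
        }
        Set<Node> visited = new LinkedHashSet<>();
        dedup(prev, visited);
        System.out.println("deduplicated nodes: " + visited.size()); // 21
        System.out.println("naive list size: " + naive(prev).size()); // 2^21 - 1
    }
}
{code}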

Re: [PR] [FLINK-34505][table] Migrate WindowGroupReorderRule to java. [flink]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #24375:
URL: https://github.com/apache/flink/pull/24375#discussion_r1535579353


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Planner rule that reorders over window groups so that groups with the same
+ * shuffle keys and order keys are placed together.
+ */
+@Value.Enclosing
+public class WindowGroupReorderRule
+extends RelRule<WindowGroupReorderRule.WindowGroupReorderRuleConfig> {
+
+public static final WindowGroupReorderRule INSTANCE =
+WindowGroupReorderRule.WindowGroupReorderRuleConfig.DEFAULT.toRule();
+
+private WindowGroupReorderRule(WindowGroupReorderRuleConfig config) {
+super(config);
+}
+
+@Override
+public boolean matches(RelOptRuleCall call) {
+LogicalWindow window = call.rel(0);
+return window.groups.size() > 1;
+}
+
+@Override
+public void onMatch(RelOptRuleCall call) {
+LogicalWindow window = call.rel(0);
+RelNode input = call.rel(1);
+List<Group> oldGroups = new ArrayList<>(window.groups);
+List<Group> sequenceGroups = new ArrayList<>(window.groups);
+
+sequenceGroups.sort(
+(o1, o2) -> {
+int keyComp = o1.keys.compareTo(o2.keys);
+if (keyComp == 0) {
+return compareRelCollation(o1.orderKeys, o2.orderKeys);
+} else {
+return keyComp;
+}
+});
+
+if (!sequenceGroups.equals(oldGroups) && 
!Lists.reverse(sequenceGroups).equals(oldGroups)) {
+int offset = input.getRowType().getFieldCount();
+List<int[]> aggTypeIndexes = new ArrayList<>();
+for (Group group : oldGroups) {
+int aggCount = group.aggCalls.size();
+int[] typeIndexes = new int[aggCount];
+for (int i = 0; i < aggCount; i++) {
+typeIndexes[i] = offset + i;
+}
+offset += aggCount;
+aggTypeIndexes.add(typeIndexes);
+}
+
+offset = input.getRowType().getFieldCount();
+List<Integer> mapToOldTypeIndexes =
+IntStream.range(0, 
offset).boxed().collect(Collectors.toList());
+for (Group newGroup : sequenceGroups) {
+int aggCount = newGroup.aggCalls.size();
+int oldIndex = oldGroups.indexOf(newGroup);
+offset += aggCount;
+for (int aggIndex = 0; aggIndex < aggCount; aggIndex++) {
+
mapToOldTypeIndexes.add(aggTypeIndexes.get(oldIndex)[aggIndex]);
+}
+}
+
+List<Map.Entry<String, RelDataType>> newFieldList =
+mapToOldTypeIndexes.stream()
+.map(index -> 
window.getRowType().getFieldList().get(index))
+.collect(Collectors.toList());
+RelDataType intermediateRowType =
+
window.getCluster().getTypeFactory().createStructType(newFieldList);
+LogicalWindow newLogicalWindow =
+LogicalWindow.create(
+window.getCluster().getPlanner().emptyTraitSet(),
+input,
+

Re: [PR] [FLINK-34505][table] Migrate WindowGroupReorderRule to java. [flink]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #24375:
URL: https://github.com/apache/flink/pull/24375#discussion_r1535578497


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Planner rule that reorders the over window groups so that groups with the same
+ * shuffle keys and order keys are placed together.
+ */
+@Value.Enclosing
+public class WindowGroupReorderRule
+extends RelRule<WindowGroupReorderRule.WindowGroupReorderRuleConfig> {
+
+public static final WindowGroupReorderRule INSTANCE =
+
WindowGroupReorderRule.WindowGroupReorderRuleConfig.DEFAULT.toRule();
+
+private WindowGroupReorderRule(WindowGroupReorderRuleConfig config) {
+super(config);
+}
+
+@Override
+public boolean matches(RelOptRuleCall call) {
+LogicalWindow window = call.rel(0);
+return window.groups.size() > 1;
+}
+
+@Override
+public void onMatch(RelOptRuleCall call) {
+LogicalWindow window = call.rel(0);
+RelNode input = call.rel(1);
+List<Group> oldGroups = new ArrayList<>(window.groups);
+List<Group> sequenceGroups = new ArrayList<>(window.groups);
+
+sequenceGroups.sort(
+(o1, o2) -> {
+int keyComp = o1.keys.compareTo(o2.keys);
+if (keyComp == 0) {
+return compareRelCollation(o1.orderKeys, o2.orderKeys);
+} else {
+return keyComp;
+}
+});
+
+if (!sequenceGroups.equals(oldGroups) && 
!Lists.reverse(sequenceGroups).equals(oldGroups)) {
+int offset = input.getRowType().getFieldCount();
+List<int[]> aggTypeIndexes = new ArrayList<>();
+for (Group group : oldGroups) {
+int aggCount = group.aggCalls.size();
+int[] typeIndexes = new int[aggCount];
+for (int i = 0; i < aggCount; i++) {
+typeIndexes[i] = offset + i;
+}
+offset += aggCount;
+aggTypeIndexes.add(typeIndexes);
+}
+
+offset = input.getRowType().getFieldCount();
+List<Integer> mapToOldTypeIndexes =
+IntStream.range(0, 
offset).boxed().collect(Collectors.toList());
+for (Group newGroup : sequenceGroups) {
+int aggCount = newGroup.aggCalls.size();
+int oldIndex = oldGroups.indexOf(newGroup);
+offset += aggCount;
+for (int aggIndex = 0; aggIndex < aggCount; aggIndex++) {
+
mapToOldTypeIndexes.add(aggTypeIndexes.get(oldIndex)[aggIndex]);
+}
+}
+
+List<Map.Entry<String, RelDataType>> newFieldList =
+mapToOldTypeIndexes.stream()
+.map(index -> 
window.getRowType().getFieldList().get(index))
+.collect(Collectors.toList());
+RelDataType intermediateRowType =
+
window.getCluster().getTypeFactory().createStructType(newFieldList);
+LogicalWindow newLogicalWindow =
+LogicalWindow.create(
+window.getCluster().getPlanner().emptyTraitSet(),
+input,
+

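The quoted diff is truncated before the compareRelCollation helper that the comparator above calls. A hypothetical sketch of such a helper, assuming a field-by-field lexicographic comparison (the PR's actual implementation may differ):

```java
// Hypothetical: compare two RelCollations field by field (field index, then
// sort direction), falling back to the number of collation fields.
private static int compareRelCollation(RelCollation o1, RelCollation o2) {
    List<RelFieldCollation> fields1 = o1.getFieldCollations();
    List<RelFieldCollation> fields2 = o2.getFieldCollations();
    int size = Math.min(fields1.size(), fields2.size());
    for (int i = 0; i < size; i++) {
        RelFieldCollation f1 = fields1.get(i);
        RelFieldCollation f2 = fields2.get(i);
        int comp = Integer.compare(f1.getFieldIndex(), f2.getFieldIndex());
        if (comp == 0) {
            comp = f1.getDirection().compareTo(f2.getDirection());
        }
        if (comp != 0) {
            return comp;
        }
    }
    return Integer.compare(fields1.size(), fields2.size());
}
```
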
[jira] [Commented] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829877#comment-17829877
 ] 

Ryan Skraba commented on FLINK-33816:
-

(1.19) 
[https://github.com/apache/flink/actions/runs/8384423618/job/22962033523#step:10:9703]

We might want to backport this to 1.19!  Also: the fix version here might be 
1.20.

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 2.0.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> Error: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> Dec 12 14:39:01 
> Dec 12 14:39:01 Expecting value to be true but was false
> Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34643) JobIDLoggingITCase failed

2024-03-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829876#comment-17829876
 ] 

Ryan Skraba commented on FLINK-34643:
-

* 
[https://github.com/apache/flink/actions/runs/8375475096/job/22933386950#step:10:7849]
 * 
[https://github.com/apache/flink/actions/runs/8384698540/job/22962603273#step:10:8296]
 * 
https://github.com/apache/flink/actions/runs/8375475096/job/22933386950#step:10:7849

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=b78d9d30-509a-5cea-1fef-db7abaa325ae=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3=712ade8c-ca16-5b76-3acd-14df33bc1cb1=8209



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34920) ZooKeeperLeaderRetrievalConnectionHandlingTest fails with Exit Code 2

2024-03-22 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-34920:

Summary: ZooKeeperLeaderRetrievalConnectionHandlingTest fails with Exit 
Code 2  (was: ZooKeeperLeaderRetrievalConnectionHandlingTest )

> ZooKeeperLeaderRetrievalConnectionHandlingTest fails with Exit Code 2
> -
>
> Key: FLINK-34920
> URL: https://issues.apache.org/jira/browse/FLINK-34920
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> [https://github.com/apache/flink/actions/runs/8384423618/job/22961979482#step:10:8939]
> {code:java}
> [ERROR] Process Exit Code: 2
> [ERROR] Crashed tests:
> [ERROR] 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalConnectionHandlingTest
> [ERROR]   at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456)
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34920) ZooKeeperLeaderRetrievalConnectionHandlingTest

2024-03-22 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-34920:
---

 Summary: ZooKeeperLeaderRetrievalConnectionHandlingTest 
 Key: FLINK-34920
 URL: https://issues.apache.org/jira/browse/FLINK-34920
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.1
Reporter: Ryan Skraba


[https://github.com/apache/flink/actions/runs/8384423618/job/22961979482#step:10:8939]
{code:java}
[ERROR] Process Exit Code: 2
[ERROR] Crashed tests:
[ERROR] 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalConnectionHandlingTest
[ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456)
 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34707) Update japicmp configuration

2024-03-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-34707.
---
Resolution: Fixed

1.19: 6eeae5fe6c9c48ee1e7546f26decbac429f248e1
master: a8e0936f1c40893907e79144233f65d7cd682184

> Update japicmp configuration
> 
>
> Key: FLINK-34707
> URL: https://issues.apache.org/jira/browse/FLINK-34707
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION" {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34707) Update japicmp configuration

2024-03-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-34707:

Reporter: Lincoln Lee  (was: Sergey Nuyanzin)

> Update japicmp configuration
> 
>
> Key: FLINK-34707
> URL: https://issues.apache.org/jira/browse/FLINK-34707
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION" {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34706) Promote release 1.19

2024-03-22 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829244#comment-17829244
 ] 

lincoln lee edited comment on FLINK-34706 at 3/22/24 1:05 PM:
--

# (/) Website pull request to [list the 
release|http://flink.apache.org/downloads.html] merged
 # (/) Release announced on the user@ mailing list: [[announcement 
link|https://lists.apache.org/thread/sofmxytbh6y20nwot1gywqqc2lqxn4hm]]
 # (/) Blog post published, if applicable:[ blog 
post|https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/]
 # (/) Release recorded in [reporter.apache.org: 
https://reporter.apache.org/addrelease.html?flink|https://reporter.apache.org/addrelease.html?flink]
 # (/) Release announced on social media: 
[Twitter|https://twitter.com/ApacheFlink/status/1638839542403981312?ref_src=twsrc%5Etfw%7Ctwcamp%5Etweetembed%7Ctwterm%5E1638839542403981312%7Ctwgr%5E7f3046f67668cf3ebbd929ef126a32473db2a1b5%7Ctwcon%5Es1_c10_url=https%3A%2F%2Fpublish.twitter.com%2F%3Fquery%3Dhttps3A2F2Ftwitter.com2FApacheFlink2Fstatus2F1638839542403981312widget%3DTweet]
 # (/) Completion declared on the dev@ [mailing list 
|https://lists.apache.org/thread/z8sfwlppsodcyng62c584n76b69b16fc]
 # (/) Update Homebrew: 
[https://docs.brew.sh/How-To-Open-a-Homebrew-Pull-Request] (seems to be done 
automatically - for both minor and major 
releases): [https://formulae.brew.sh/formula/apache-flink#default]
 # (/) No need to update quickstart scripts in {{{}flink-web{}}}, under the 
{{q/}} directory (they already use global version variables) 
 # (/) Updated the japicmp configuration: Done in 
https://issues.apache.org/jira/browse/FLINK-34707
 # (/) Update the list of previous version in {{docs/config.toml}} on the 
master branch: Done in [https://github.com/apache/flink/pull/24548]
 # (/) Set {{show_outdated_warning: true}} in {{docs/config.toml}} in the 
branch of the _previous_ Flink version:  (for 1.17) 
[https://github.com/apache/flink/pull/24547]
 # (/) Update stable and master alias in 
[https://github.com/apache/flink/blob/master/.github/workflows/docs.yml] Done 
in 
[a6a4667|https://github.com/apache/flink/commit/a6a4667202a0f89fe63ff4f2e476c0200ec66e63]
   
[8ee552a|https://github.com/apache/flink/commit/8ee552a326e9fbcad1df5cfc1abb23ac2cdd56af]


was (Author: lincoln.86xy):
# (/) Website pull request to [list the 
release|http://flink.apache.org/downloads.html] merged
 # (/) Release announced on the user@ mailing list: [[announcement 
link|https://lists.apache.org/thread/sofmxytbh6y20nwot1gywqqc2lqxn4hm]]
 # (/) Blog post published, if applicable:[ blog 
post|https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/]
 # (/) Release recorded in [reporter.apache.org: 
https://reporter.apache.org/addrelease.html?flink|https://reporter.apache.org/addrelease.html?flink]
 # (/) Release announced on social media: 
[Twitter|https://twitter.com/ApacheFlink/status/1638839542403981312?ref_src=twsrc%5Etfw%7Ctwcamp%5Etweetembed%7Ctwterm%5E1638839542403981312%7Ctwgr%5E7f3046f67668cf3ebbd929ef126a32473db2a1b5%7Ctwcon%5Es1_c10_url=https%3A%2F%2Fpublish.twitter.com%2F%3Fquery%3Dhttps3A2F2Ftwitter.com2FApacheFlink2Fstatus2F1638839542403981312widget%3DTweet]
 # (/) Completion declared on the dev@ [mailing list 
|https://lists.apache.org/thread/z8sfwlppsodcyng62c584n76b69b16fc]
 # (/) Update Homebrew: 
[https://docs.brew.sh/How-To-Open-a-Homebrew-Pull-Request] (seems to be done 
automatically - for both minor and major 
releases): [https://formulae.brew.sh/formula/apache-flink#default]
 # (/) No need to update quickstart scripts in {{{}flink-web{}}}, under the 
{{q/}} directory (they already use global version variables) 
 #  Updated the japicmp configuration: Done in 
https://issues.apache.org/jira/browse/FLINK-34707
 # (/) Update the list of previous version in {{docs/config.toml}} on the 
master branch: Done in [https://github.com/apache/flink/pull/24548]
 # (/) Set {{show_outdated_warning: true}} in {{docs/config.toml}} in the 
branch of the _previous_ Flink version:  (for 1.17) 
[https://github.com/apache/flink/pull/24547]
 # (/) Update stable and master alias in 
[https://github.com/apache/flink/blob/master/.github/workflows/docs.yml] Done 
in 
[a6a4667|https://github.com/apache/flink/commit/a6a4667202a0f89fe63ff4f2e476c0200ec66e63]
   
[8ee552a|https://github.com/apache/flink/commit/8ee552a326e9fbcad1df5cfc1abb23ac2cdd56af]

> Promote release 1.19
> 
>
> Key: FLINK-34706
> URL: https://issues.apache.org/jira/browse/FLINK-34706
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.0
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
>
> Once the release has been 

Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-22 Thread via GitHub


zhuzhurk closed pull request #24475: [FLINK-32513][core] Add predecessor caching
URL: https://github.com/apache/flink/pull/24475


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535501299


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source split reader to read data from JDBC splits.
+ *
+ * @param <T> The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader<T>
+implements SplitReader<RecordAndOffset<T>, JdbcSourceSplit>, 
ResultTypeQueryable<T> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+private final Configuration config;
+@Nullable private JdbcSourceSplit currentSplit;
+private final Queue<JdbcSourceSplit> splits;
+private final TypeInformation<T> typeInformation;
+private final JdbcConnectionProvider connectionProvider;
+private transient Connection connection;
+private transient PreparedStatement statement;
+private transient ResultSet resultSet;
+
+private final ResultExtractor<T> resultExtractor;
+protected boolean hasNextRecordCurrentSplit;
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final int splitReaderFetchBatchSize;
+
+private final int resultSetType;
+private final int resultSetConcurrency;
+private final int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+private final Boolean autoCommit;
+private int currentSplitOffset;
+
+private final SourceReaderContext context;
+
+public JdbcSourceSplitReader(
+SourceReaderContext context,
+Configuration config,
+TypeInformation<T> typeInformation,
+JdbcConnectionProvider connectionProvider,
+DeliveryGuarantee deliveryGuarantee,
+ResultExtractor<T> resultExtractor) {
+this.context = Preconditions.checkNotNull(context);
+this.config = Preconditions.checkNotNull(config);
+this.typeInformation = Preconditions.checkNotNull(typeInformation);
+this.connectionProvider = 
Preconditions.checkNotNull(connectionProvider);
+this.resultSetType = config.getInteger(RESULTSET_TYPE);
+

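For context on the three ResultSet options the constructor above reads from its Configuration, here is a small self-contained sketch of what they control at the plain java.sql level. The Derby URL and the books table are placeholders, not taken from the PR:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Plain-JDBC sketch (placeholder URL and table): resultSetType,
// resultSetConcurrency and resultSetFetchSize map directly onto these calls.
public class ResultSetOptionsSketch {
    public static void main(String[] args) throws SQLException {
        try (Connection conn =
                DriverManager.getConnection("jdbc:derby:memory:demo;create=true")) {
            try (PreparedStatement stmt =
                    conn.prepareStatement(
                            "SELECT id FROM books WHERE author = ?",
                            ResultSet.TYPE_FORWARD_ONLY,   // resultSetType
                            ResultSet.CONCUR_READ_ONLY)) { // resultSetConcurrency
                stmt.setFetchSize(1024); // resultSetFetchSize: rows per round trip
                stmt.setString(1, "Kumar");
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getLong("id"));
                    }
                }
            }
        }
    }
}
```
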
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535498978


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * A tool used to build a {@link JdbcSource} quickly.
+ *
+ * <pre>
+ * JdbcSource<Row> source = JdbcSource.<Row>builder()
+ *   .setSql(validSql)
+ *   .setResultExtractor(new RowResultExtractor())
+ *   .setDBUrl(dbUrl)
+ *   .setDriverName(driverName)
+ *   .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *   .setPassword(password)
+ *   .setUsername(username)
+ *   .build();
+ * </pre>
+ *
+ * In order to query the JDBC source in parallel, you need to provide a 
parameterized query
+ * template (i.e. a valid {@link PreparedStatement}) and a {@link 
JdbcParameterValuesProvider} which
+ * provides binding values for the query parameters. E.g.:
+ *
+ * <pre>
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JdbcSource<Row> jdbcSource = JdbcSource.<Row>builder()
+ *  .setResultExtractor(new RowResultExtractor())
+ *  .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *  .setPassword(password)
+ *  .setUsername(username)
+ * .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+ * .setDBUrl("jdbc:derby:memory:ebookshop")
+ * .setSql("select * from books WHERE author = ?")
+ * .setJdbcParameterValuesProvider(new 
JdbcGenericParameterValuesProvider(queryParameters))
+ *  .build();
+ * </pre>
+ *
+ * @see Row
+ * @see JdbcParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ * @see JdbcSource
+ */
+@PublicEvolving
+public class JdbcSourceBuilder<OUT> {
+
+public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+private final Configuration configuration;
+
+private int splitReaderFetchBatchSize;
+private int resultSetType;
+private int resultSetConcurrency;
+private int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+private Boolean autoCommit;
+
+// TODO It would need a builder method to render after introducing 
streaming semantic.

Review Comment:
   should we fix this TODO first?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the

Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

2024-03-22 Thread via GitHub


snuyanzin commented on PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-2014972105

   Thanks for taking a look @RocMarshal 
   
   if there are no objections, I'm going to merge it in the coming days


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [release] Adjust website for Kubernetes operator 1.8.0 release [flink-web]

2024-03-22 Thread via GitHub


mxm commented on code in PR #726:
URL: https://github.com/apache/flink-web/pull/726#discussion_r1535474197


##
docs/content/posts/2024-03-21-release-kubernetes-operator-1.8.0.md:
##
@@ -0,0 +1,159 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.8.0 Release Announcement"
+date: "2024-03-21T18:00:00.000Z"
+authors:
+- mxm:
+  name: "Maximilian Michels"
+  twitter: "stadtlegende"
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- 1996fanrui:
+  name: "Rui Fan"
+  twitter: "1996fanrui"
+aliases:
+- /news/2024/03/21/release-kubernetes-operator-1.8.0.html
+---
+
+The Apache Flink community is excited to announce the release of Flink 
Kubernetes Operator 1.8.0!
+
+The release includes many improvements to the operator core, the autoscaler, 
and introduces new features
+like TaskManager memory auto-tuning.
+
+We encourage you to [download the 
release](https://flink.apache.org/downloads.html) and share your experience 
with the
+community through the Flink [mailing 
lists](https://flink.apache.org/community.html#mailing-lists) or
+[JIRA](https://issues.apache.org/jira/browse/flink)! We're looking forward to 
your feedback!
+
+## Highlights
+
+### Flink Autotuning

Review Comment:
   I had thought about that as well but decided to market it as a more general 
feature which hosts various functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [release] Adjust website for Kubernetes operator 1.8.0 release [flink-web]

2024-03-22 Thread via GitHub


gyfora commented on code in PR #726:
URL: https://github.com/apache/flink-web/pull/726#discussion_r1535467703


##
docs/content/posts/2024-03-21-release-kubernetes-operator-1.8.0.md:
##
@@ -0,0 +1,159 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.8.0 Release Announcement"
+date: "2024-03-21T18:00:00.000Z"
+authors:
+- mxm:
+  name: "Maximilian Michels"
+  twitter: "stadtlegende"
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- 1996fanrui:
+  name: "Rui Fan"
+  twitter: "1996fanrui"
+aliases:
+- /news/2024/03/21/release-kubernetes-operator-1.8.0.html
+---
+
+The Apache Flink community is excited to announce the release of Flink 
Kubernetes Operator 1.8.0!
+
+The release includes many improvements to the operator core, the autoscaler, 
and introduces new features
+like TaskManager memory auto-tuning.
+
+We encourage you to [download the 
release](https://flink.apache.org/downloads.html) and share your experience 
with the
+community through the Flink [mailing 
lists](https://flink.apache.org/community.html#mailing-lists) or
+[JIRA](https://issues.apache.org/jira/browse/flink)! We're looking forward to 
your feedback!
+
+## Highlights
+
+### Flink Autotuning

Review Comment:
   Should we call this `Flink Memory Autotuning` ?



##
docs/content/posts/2024-03-21-release-kubernetes-operator-1.8.0.md:
##
@@ -0,0 +1,159 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.8.0 Release Announcement"
+date: "2024-03-21T18:00:00.000Z"
+authors:
+- mxm:
+  name: "Maximilian Michels"
+  twitter: "stadtlegende"
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- 1996fanrui:
+  name: "Rui Fan"
+  twitter: "1996fanrui"
+aliases:
+- /news/2024/03/21/release-kubernetes-operator-1.8.0.html
+---
+
+The Apache Flink community is excited to announce the release of Flink 
Kubernetes Operator 1.8.0!
+
+The release includes many improvements to the operator core, the autoscaler, 
and introduces new features
+like TaskManager memory auto-tuning.
+
+We encourage you to [download the 
release](https://flink.apache.org/downloads.html) and share your experience 
with the
+community through the Flink [mailing 
lists](https://flink.apache.org/community.html#mailing-lists) or
+[JIRA](https://issues.apache.org/jira/browse/flink)! We're looking forward to 
your feedback!
+
+## Highlights
+
+### Flink Autotuning
+
+We're excited to announce our latest addition to the autoscaling module: Flink 
Autotuning.
+
+Flink Autotuning complements Flink Autoscaling by auto-adjusting critical 
settings of the Flink configuration.
+For this release, we support auto-configuring Flink memory which is a huge 
source of pain for users. Flink has
+various memory pools (e.g. heap memory, network memory, state backend memory, 
JVM metaspace) which all need to be
+assigned fractions of the available memory upfront in order for a Flink job to 
run properly.
+
+Assigning too little memory results in pipeline failures, which is why most 
users end up assigning way too much memory.
+Based on our experience, we've seen that heap memory is at least 50% 
over-provisioned, even after using Flink Autoscaling.
+The reason is that Flink Autoscaling is primarily CPU-driven to optimize 
pipeline throughput, but doesn't change the
+ratio between CPU/Memory on the containers.
+
+Resource savings are nice to have, but the real power of Flink Autotuning is 
the reduced time to production.
+
+With Flink Autoscaling and Flink Autotuning, all users need to do is set a max 
memory size for the TaskManagers, just
+like they would normally configure TaskManager memory. Flink Autotuning then 
automatically adjusts the various memory
+pools and brings down the total container memory size. It does that by 
observing the actual max memory usage on the
TaskManagers or by calculating the exact number of network buffers required for 
the job topology. The adjustments are
+made together with Flink Autoscaling, so there is no extra downtime involved.
+
+Flink Autotuning can be enabled by setting:
+
+```
+# Autoscaling needs to be enabled
+job.autoscaler.enabled: true
+# Turn on Autotuning
+job.autoscaler.memory.tuning.enabled: true
+```
+
+In the future, we are planning to auto-tune more aspects of the Flink 
configuration, e.g. the number of task slots.
+Another room for improvement is how managed memory is configured. If none is 
used, it will be set to zero. If managed
+memory is used, it will be kept constant. We also added an option to add all 
saved memory to the managed memory. This
+is beneficial when running with RocksDB to maximize performance.
+
+### Improved Accuracy of Autoscaling Metrics
+
+So far, Flink Autoscaling relied on sampling scaling metrics within the 
current metric window. The resulting accuracy
+depended on the number of samples and the sampling interval. For this release, 
whenever possible, we use Flink's
+accumulated metrics which provide cumulative counters of metrics like records 
processed or time 

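As a concrete illustration of the managed-memory paragraph quoted above, the combined flags might look as follows. The first two keys are quoted from the post itself; the name of the third is an assumption and should be checked against the operator documentation:

```
# Sketch: autoscaling plus memory tuning, directing saved memory to managed
# memory for RocksDB. The last key name is assumed, not confirmed here.
job.autoscaler.enabled: true
job.autoscaler.memory.tuning.enabled: true
job.autoscaler.memory.tuning.maximize-managed-memory: true
```
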
Re: [PR] [release] Adjust website for Kubernetes operator 1.8.0 release [flink-web]

2024-03-22 Thread via GitHub


mxm commented on PR #726:
URL: https://github.com/apache/flink-web/pull/726#issuecomment-2014919701

   @1996fanrui @gyfora I added the release post.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-22 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2014912924

   one more idea
   some vendors allow calculating intersections for an arbitrary number of arrays, 
e.g.
   Clickhouse[1]
   [1] 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34663) flink-opensearch connector Unable to parse response body for Response

2024-03-22 Thread wael shehata (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829840#comment-17829840
 ] 

wael shehata commented on FLINK-34663:
--

Thanks [~reta]  for your reply ... all the best regards.

> flink-opensearch connector Unable to parse response body for Response
> -
>
> Key: FLINK-34663
> URL: https://issues.apache.org/jira/browse/FLINK-34663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: 1.18.1
> Environment: Docker-Compose:
> Flink 1.18.1 - Java11
> OpenSearch 2.12.0
> Flink-Sql-Opensearch-connector (flink 1.18.1 → Os 1.3)
>Reporter: wael shehata
>Priority: Major
> Attachments: image-2024-03-14-00-10-40-982.png
>
>
> I'm trying to use the flink-sql-opensearch connector to sink stream data to 
> OpenSearch via Flink …
> After submitting the Job to the Flink cluster successfully, the job runs 
> normally for 30 seconds and creates the index with data … then it fails with the 
> following message:
> _*org.apache.flink.util.FlinkRuntimeException: Complete bulk has failed… 
> Caused by: java.io.IOException: Unable to parse response body for Response*_
> _*{requestLine=POST /_bulk?timeout=1m HTTP/1.1, 
> host=[http://172.20.0.6:9200|http://172.20.0.6:9200/], response=HTTP/1.1 200 
> OK}*_
> at 
> org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
> at 
> org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
> at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
> at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
> at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
> at 
> org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
> at 
> org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
> at 
> org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
> at 
> org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> at 
> org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
> at 
> org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
> at 
> org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> at 
> org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> at 
> org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> at 
> org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> at 
> org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> at 
> org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> at 
> org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
> … 1 more
> *Caused by: java.lang.NullPointerException*
> *at java.base/java.util.Objects.requireNonNull(Unknown Source)*
> *at org.opensearch.action.DocWriteResponse.(DocWriteResponse.java:140)*
> *at org.opensearch.action.index.IndexResponse.(IndexResponse.java:67) …*
> It seems that this error is common but without any solution …
> the flink connector, despite being built for OpenSearch 1.3, still 
> works for sending data and creating indexes in OpenSearch 2.12.0 … but this error 
> persists with all OpenSearch versions greater than 1.13 …
> *Opensearch support reply was:*
> *"this is unexpected, could you please create an issue here [1], the issue is 
> caused by _type property that has been removed in 2.x"*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34919) WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST server

2024-03-22 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-34919:

Priority: Critical  (was: Major)

> WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST 
> server
> 
>
> Key: FLINK-34919
> URL: https://issues.apache.org/jira/browse/FLINK-34919
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58482=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8641]
> {code:java}
> Mar 22 04:12:50 04:12:50.260 [INFO] Running 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 0.318 s <<< FAILURE! -- in 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
> Mar 22 04:12:50 04:12:50.609 [ERROR] 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs
>  -- Time elapsed: 0.303 s <<< ERROR!
> Mar 22 04:12:50 java.net.BindException: Could not start rest endpoint on any 
> port in port range 8081
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:286)
> Mar 22 04:12:50   at 
> org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs(WebMonitorEndpointTest.java:69)
> Mar 22 04:12:50   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 22 04:12:50   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 22 04:12:50   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 22 04:12:50  {code}
> This was noted as a symptom of FLINK-22980, but doesn't have the same failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-22 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2014868333

   > Looks good to me. Thanks for addressing all the comments! @jeyhunkarimov 
Would you squash the last two commits and rebase the changes onto the latest 
Flink master branch?
   
   Thanks a lot @zhuzhurk for your reviews. Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2024-03-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829830#comment-17829830
 ] 

Ryan Skraba commented on FLINK-28440:
-

1.20 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58481=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=8875]

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.18.0, 1.19.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> stale-assigned, test-stability
> Fix For: 1.20.0
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> 

[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-03-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829831#comment-17829831
 ] 

Ryan Skraba commented on FLINK-18476:
-

1.20 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58481=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=21983]

We should add 1.20 to the {{Affects version(s)}}

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got python in Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34643) JobIDLoggingITCase failed

2024-03-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829832#comment-17829832
 ] 

Ryan Skraba commented on FLINK-34643:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58481=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=8247

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=b78d9d30-509a-5cea-1fef-db7abaa325ae=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187=logs=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3=712ade8c-ca16-5b76-3acd-14df33bc1cb1=8209



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-03-22 Thread via GitHub


flinkbot commented on PR #24555:
URL: https://github.com/apache/flink/pull/24555#issuecomment-2014858684

   
   ## CI report:
   
   * 4eeb7f45e18f5e50f62e2bf22281278c5a872511 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-22 Thread via GitHub


dawidwys commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2014857862

   Before I review the code let's settle on the behaviour first.
   
   @MartijnVisser What is your opinion on how the function should behave? 
Especially in the context of 
https://github.com/apache/flink/pull/23173#discussion_r1491044219 and handling 
duplicates.
   
   What should be the output of: `[1, 1, 1, 2] INTERSECT [1, 1, 2]`?
   1. [1,2] - Spark/Databricks/Presto
   2. [1,1,2] - Snowflake
   3. [1, 1, 1, 2] - (as far as I can tell the current behaviour of the PR)
   
   * 
[Snowflake](https://docs.snowflake.com/en/sql-reference/functions/array_intersection#usage-notes)
 has multi-set semantics.
   > If one array has N copies of a value, and the other array has M copies of 
the same value, then the number of copies in the returned array is the smaller 
of N or M. For example, if N is 4 and M is 2, then the returned value contains 
2 copies.
   
   * 
[Databricks](https://docs.databricks.com/en/sql/language-manual/functions/array_intersect.html#returns)
 deduplicates result 
([Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.array_intersect.html)
 I presume has the same behaviour)
   > An ARRAY of matching type to array1 with no duplicates and elements 
contained in both array1 and array2.
   
   * 
[Presto](https://prestodb.io/docs/current/functions/array.html#array_intersect) 
does the same as Spark:
   > Returns an array of the elements in the intersection of x and y, without 
duplicates.
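   
   For concreteness, here is a small sketch in plain Java (not Flink's 
implementation, just an illustration of the two candidate semantics on the 
example above; the class and method names are made up):
   
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ArrayIntersectSemantics {

    // Multi-set semantics (Snowflake): keep min(N, M) copies of each value.
    static List<Integer> intersectMultiset(List<Integer> a, List<Integer> b) {
        Map<Integer, Integer> remaining = new HashMap<>();
        b.forEach(v -> remaining.merge(v, 1, Integer::sum));
        List<Integer> result = new ArrayList<>();
        for (Integer v : a) {
            // merge returns the decremented count; negative means b is exhausted
            if (remaining.merge(v, -1, Integer::sum) >= 0) {
                result.add(v);
            }
        }
        return result;
    }

    // Set semantics (Spark/Databricks/Presto): deduplicated result.
    static List<Integer> intersectDistinct(List<Integer> a, List<Integer> b) {
        return a.stream().distinct().filter(b::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> x = List.of(1, 1, 1, 2);
        List<Integer> y = List.of(1, 1, 2);
        System.out.println(intersectMultiset(x, y)); // [1, 1, 2]
        System.out.println(intersectDistinct(x, y)); // [1, 2]
    }
}
```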


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34919) WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST server

2024-03-22 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-34919:
--------------------------------

 Summary: WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs 
fails starting REST server
 Key: FLINK-34919
 URL: https://issues.apache.org/jira/browse/FLINK-34919
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Ryan Skraba


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58482=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8641]
{code:java}
Mar 22 04:12:50 04:12:50.260 [INFO] Running 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
Mar 22 04:12:50 04:12:50.609 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 0.318 s <<< FAILURE! -- in 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
Mar 22 04:12:50 04:12:50.609 [ERROR] 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs
 -- Time elapsed: 0.303 s <<< ERROR!
Mar 22 04:12:50 java.net.BindException: Could not start rest endpoint on any 
port in port range 8081
Mar 22 04:12:50 at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:286)
Mar 22 04:12:50 at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs(WebMonitorEndpointTest.java:69)
Mar 22 04:12:50 at java.lang.reflect.Method.invoke(Method.java:498)
Mar 22 04:12:50 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Mar 22 04:12:50  {code}
This was noted as a symptom of FLINK-22980, but doesn't have the same failure.
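
One common remedy for this class of port-conflict flakiness, sketched here as 
an assumption rather than the fix this ticket will necessarily take, is to let 
the test bind the REST endpoint to an ephemeral port:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

final class RestPortTestConfig {
    /** Let the OS pick a free port instead of insisting on 8081. */
    static Configuration withEphemeralRestPort() {
        Configuration config = new Configuration();
        config.set(RestOptions.BIND_PORT, "0"); // "0" means any free port
        return config;
    }
}
{code}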



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-03-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24939:
-----------------------------------
Labels: pull-request-available  (was: )

> Support 'SHOW CREATE CATALOG' syntax
> ------------------------------------
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
>
> SHOW CREATE CATALOG ;
>  
> `Catalog` is playing a more and more important role in Flink; it would be 
> great to get detailed information about an existing catalog



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-03-22 Thread via GitHub


liyubin117 opened a new pull request, #24555:
URL: https://github.com/apache/flink/pull/24555

   ## What is the purpose of the change
   
   Support `SHOW CREATE CATALOG` syntax to output the catalog name and relevant 
properties, which allows you to easily reuse the created catalogs.
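   
   A hedged usage sketch (the catalog name and the printed output are 
illustrative, not taken from this PR):
   
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCreateCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("CREATE CATALOG cat2 WITH ('type' = 'generic_in_memory')");
        // Prints the reusable DDL, e.g.:
        // CREATE CATALOG `cat2` WITH ('type' = 'generic_in_memory')
        tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
    }
}
```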
   
   ## Brief change log
   
   * SHOW CREATE CATALOG catalog_name
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
   flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34902][table] Fix column mismatch IndexOutOfBoundsException [flink]

2024-03-22 Thread via GitHub


twalthr commented on code in PR #24552:
URL: https://github.com/apache/flink/pull/24552#discussion_r1535390991


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##
@@ -234,13 +234,23 @@ object PreValidateReWriter {
 
 call.getKind match {
   case SqlKind.SELECT =>
+if (
+  targetPosition
+.size() != 0 && call.asInstanceOf[SqlSelect].getSelectList.size() 
!= targetPosition

Review Comment:
   to improve code readability:
   - you can store `call.asInstanceOf[SqlSelect]` in a local var because it's 
needed again below
   - also `targetPosition.isEmpty` is shorter
   
   This should make the `if` more readable and fit onto one line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-22 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829822#comment-17829822
 ] 

Danny Cranmer commented on FLINK-34044:
---------------------------------------

Merged commit 
[{{35be16b}}|https://github.com/apache/flink-connector-aws/commit/35be16bfb5696f7fc1b261b6ab4f9cd95c403ed0]
 into apache:main 
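
For context, a minimal sketch of the defensive-copy idea behind the fix 
(illustrative only; the class and method names below are made up, and the real 
change is in PR #132):
{code:java}
import java.util.HashMap;
import java.util.Map;

final class OptionsCopySketch {
    /**
     * ResolvedCatalogTable.getOptions() may return an unmodifiable map, so any
     * option re-mapping must first take a mutable copy instead of mutating the
     * original in place.
     */
    static Map<String, String> mutableCopy(Map<String, String> tableOptions) {
        return new HashMap<>(tableOptions);
    }
}
{code}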

> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.2.0
>Reporter: Tilman Krokotsch
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> When trying to create a Kinesis Stream Sink in Table API via a 
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at 
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at 
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and 
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
>   public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
> tEnv.createTemporaryTable(
> "exampleTable", 
> TableDescriptor.forConnector("datagen").schema(a).build());
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region", "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable` 
> used when re-mapping the deprecated Kinesis options in 
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-34044:
----------------------------------
Fix Version/s: aws-connector-4.3.0

> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.2.0
>Reporter: Tilman Krokotsch
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> When trying to create a Kinesis Stream Sink in Table API via a 
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at 
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at 
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and 
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
>   public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
> tEnv.createTemporaryTable(
> "exampleTable", 
> TableDescriptor.forConnector("datagen").schema(a).build());
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region", "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable` 
> used when re-mapping the deprecated Kinesis options in 
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-34044.
-----------------------------------
Resolution: Fixed

> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.2.0
>Reporter: Tilman Krokotsch
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> When trying to create a Kinesis Stream Sink in Table API via a 
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at 
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at 
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and 
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
>   public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
> tEnv.createTemporaryTable(
> "exampleTable", 
> TableDescriptor.forConnector("datagen").schema(a).build());
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region", "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable` 
> used when re-mapping the deprecated Kinesis options in 
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34044] Copy dynamic table options before mapping deprecated configs [flink-connector-aws]

2024-03-22 Thread via GitHub


dannycranmer merged PR #132:
URL: https://github.com/apache/flink-connector-aws/pull/132


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34918) Introduce the support of catalog for comments

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34918:


 Summary: Introduce the support of catalog for comments
 Key: FLINK-34918
 URL: https://issues.apache.org/jira/browse/FLINK-34918
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


We propose to introduce a `getComment()` method in `CatalogDescriptor`, for 
the following reasons.

1. For the sake of design consistency: this follows the design of FLIP-295 [1], 
which introduced the `CatalogStore` component. `CatalogDescriptor` includes the 
name and the attributes, both of which are used to describe the catalog, so a 
`comment` can be added smoothly.

2. It extends an existing class rather than adding a new method to an existing 
interface. In particular, the `Catalog` interface is a core interface used by a 
series of important components such as `CatalogFactory`, `CatalogManager` and 
`FactoryUtil`, and is implemented by a large number of connectors such as JDBC, 
Paimon, and Hive. Adding methods to it would greatly increase the 
implementation complexity and, more importantly, the cost of iteration, 
maintenance, and verification.
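
For illustration only, a rough sketch of the proposed shape; the constructor 
and the {{Optional}} return type below are assumptions, not the final API:
{code:java}
import java.util.Optional;

import javax.annotation.Nullable;

import org.apache.flink.configuration.Configuration;

/** Hypothetical sketch; the real API is defined in the FLIP. */
public final class CatalogDescriptorSketch {
    private final String catalogName;
    private final Configuration configuration;
    @Nullable private final String comment; // the newly proposed field

    private CatalogDescriptorSketch(
            String catalogName, Configuration configuration, @Nullable String comment) {
        this.catalogName = catalogName;
        this.configuration = configuration;
        this.comment = comment;
    }

    /** The proposed accessor: empty if no comment was supplied. */
    public Optional<String> getComment() {
        return Optional.ofNullable(comment);
    }
}
{code}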



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34918) Introduce the support of Catalog for comments

2024-03-22 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34918:
-----------------------------
Summary: Introduce the support of Catalog for comments  (was: Introduce the 
support of catalog for comments)

> Introduce the support of Catalog for comments
> ---------------------------------------------
>
> Key: FLINK-34918
> URL: https://issues.apache.org/jira/browse/FLINK-34918
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Priority: Major
>
> We propose to introduce a `getComment()` method in `CatalogDescriptor`, for 
> the following reasons.
> 1. For the sake of design consistency: this follows the design of FLIP-295 
> [1], which introduced the `CatalogStore` component. `CatalogDescriptor` 
> includes the name and the attributes, both of which are used to describe the 
> catalog, so a `comment` can be added smoothly.
> 2. It extends an existing class rather than adding a new method to an 
> existing interface. In particular, the `Catalog` interface is a core 
> interface used by a series of important components such as `CatalogFactory`, 
> `CatalogManager` and `FactoryUtil`, and is implemented by a large number of 
> connectors such as JDBC, Paimon, and Hive. Adding methods to it would 
> greatly increase the implementation complexity and, more importantly, the 
> cost of iteration, maintenance, and verification.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34917) Support enhanced `CREATE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34917:


 Summary: Support enhanced `CREATE CATALOG` syntax
 Key: FLINK-34917
 URL: https://issues.apache.org/jira/browse/FLINK-34917
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li
 Attachments: image-2024-03-22-18-31-59-632.png

{{IF NOT EXISTS}} clause: If the catalog already exists, nothing happens.

{{COMMENT}} clause: An optional string literal. The description for the 
catalog.

NOTICE: we just need to introduce the '[IF NOT EXISTS]' and '[COMMENT]' 
clauses in the 'CREATE CATALOG' statement.

!image-2024-03-22-18-31-59-632.png|width=795,height=87!
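
For illustration, how the enhanced statement might look once implemented; the 
clause order here is an assumption based on the ticket text, not final syntax:
{code:java}
import org.apache.flink.table.api.TableEnvironment;

final class CreateCatalogSketch {
    static void createCatalog(TableEnvironment tEnv) {
        // Proposed FLIP-436 clauses (assumed; not available in released Flink yet):
        tEnv.executeSql(
                "CREATE CATALOG IF NOT EXISTS cat2 "
                        + "COMMENT 'demo catalog' "
                        + "WITH ('type' = 'generic_in_memory')");
    }
}
{code}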



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34916) Support `ALTER CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34916:


 Summary: Support `ALTER CATALOG` syntax
 Key: FLINK-34916
 URL: https://issues.apache.org/jira/browse/FLINK-34916
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li
 Attachments: image-2024-03-22-18-30-33-182.png

Set one or more properties in the specified catalog. If a particular property 
is already set in the catalog, override the old value with the new one.

!image-2024-03-22-18-30-33-182.png|width=736,height=583!
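
For illustration, the proposed statement as it might be issued from the Table 
API; the exact syntax follows the attached screenshot and is an assumption here:
{code:java}
import org.apache.flink.table.api.TableEnvironment;

final class AlterCatalogSketch {
    static void alterCatalog(TableEnvironment tEnv) {
        // Proposed FLINK-34916 syntax (assumed; not yet available in released Flink):
        tEnv.executeSql("ALTER CATALOG cat2 SET ('default-database' = 'db_new')");
    }
}
{code}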



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34915) Introduce `DESCRIBE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34915:


 Summary: Introduce `DESCRIBE CATALOG` syntax
 Key: FLINK-34915
 URL: https://issues.apache.org/jira/browse/FLINK-34915
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
not actually available yet. We can complete the syntax in this FLIP. 
{{Flink SQL> describe catalog cat2;}}
{{+--------------------------+---------------------------+}}
{{| catalog_description_item | catalog_description_value |}}
{{+--------------------------+---------------------------+}}
{{|                     Name |                      cat2 |}}
{{|                     Type |         generic_in_memory |}}
{{|                  Comment |                           |}}
{{+--------------------------+---------------------------+}}
{{3 rows in set}}
 
{{Flink SQL> describe catalog extended cat2;}}
{{+--------------------------+--------------------------------------------------+}}
{{| catalog_description_item |                        catalog_description_value |}}
{{+--------------------------+--------------------------------------------------+}}
{{|                     Name |                                             cat2 |}}
{{|                     Type |                                generic_in_memory |}}
{{|                  Comment |                                                  |}}
{{|               Properties | (default-database,db), (type,generic_in_memory) |}}
{{+--------------------------+--------------------------------------------------+}}
{{4 rows in set}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-34044:
--------------------------------------

Assignee: Ahmed Hamdy

> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS
>Affects Versions: aws-connector-4.2.0
>Reporter: Tilman Krokotsch
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> When trying to create a Kinesis Stream Sink in Table API via a 
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at 
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at 
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and 
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
>   public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
> tEnv.createTemporaryTable(
> "exampleTable", 
> TableDescriptor.forConnector("datagen").schema(a).build());
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region", "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable` 
> used when re-mapping the deprecated Kinesis options in 
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34915) Support `DESCRIBE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34915:
-----------------------------
Description: 
Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
not actually available yet. We can complete the syntax in this FLIP. 

!image-2024-03-22-18-29-00-454.png|width=561,height=374!

  was:
Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
not actually available yet. We can complete the syntax in this FLIP. 
{{Flink SQL> describe catalog cat2;}}
{{+--------------------------+---------------------------+}}
{{| catalog_description_item | catalog_description_value |}}
{{+--------------------------+---------------------------+}}
{{|                     Name |                      cat2 |}}
{{|                     Type |         generic_in_memory |}}
{{|                  Comment |                           |}}
{{+--------------------------+---------------------------+}}
{{3 rows in set}}
 
{{Flink SQL> describe catalog extended cat2;}}
{{+--------------------------+--------------------------------------------------+}}
{{| catalog_description_item |                        catalog_description_value |}}
{{+--------------------------+--------------------------------------------------+}}
{{|                     Name |                                             cat2 |}}
{{|                     Type |                                generic_in_memory |}}
{{|                  Comment |                                                  |}}
{{|               Properties | (default-database,db), (type,generic_in_memory) |}}
{{+--------------------------+--------------------------------------------------+}}
{{4 rows in set}}


> Support `DESCRIBE CATALOG` syntax
> ---------------------------------
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-03-22-18-29-00-454.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually available yet. We can complete the syntax in this FLIP. 
> !image-2024-03-22-18-29-00-454.png|width=561,height=374!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34915) Support `DESCRIBE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34915:
-----------------------------
Attachment: image-2024-03-22-18-29-00-454.png

> Support `DESCRIBE CATALOG` syntax
> ---------------------------------
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-03-22-18-29-00-454.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually available yet. We can complete the syntax in this FLIP. 
> {{Flink SQL> describe catalog cat2;}}
> {{+--------------------------+---------------------------+}}
> {{| catalog_description_item | catalog_description_value |}}
> {{+--------------------------+---------------------------+}}
> {{|                     Name |                      cat2 |}}
> {{|                     Type |         generic_in_memory |}}
> {{|                  Comment |                           |}}
> {{+--------------------------+---------------------------+}}
> {{3 rows in set}}
>  
> {{Flink SQL> describe catalog extended cat2;}}
> {{+--------------------------+--------------------------------------------------+}}
> {{| catalog_description_item |                        catalog_description_value |}}
> {{+--------------------------+--------------------------------------------------+}}
> {{|                     Name |                                             cat2 |}}
> {{|                     Type |                                generic_in_memory |}}
> {{|                  Comment |                                                  |}}
> {{|               Properties | (default-database,db), (type,generic_in_memory) |}}
> {{+--------------------------+--------------------------------------------------+}}
> {{4 rows in set}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34915) Support `DESCRIBE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34915:
-----------------------------
Summary: Support `DESCRIBE CATALOG` syntax  (was: Introduce `DESCRIBE 
CATALOG` syntax)

> Support `DESCRIBE CATALOG` syntax
> ---------------------------------
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-03-22-18-29-00-454.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually available yet. We can complete the syntax in this FLIP. 
> {{Flink SQL> describe catalog cat2;}}
> {{+--------------------------+---------------------------+}}
> {{| catalog_description_item | catalog_description_value |}}
> {{+--------------------------+---------------------------+}}
> {{|                     Name |                      cat2 |}}
> {{|                     Type |         generic_in_memory |}}
> {{|                  Comment |                           |}}
> {{+--------------------------+---------------------------+}}
> {{3 rows in set}}
>  
> {{Flink SQL> describe catalog extended cat2;}}
> {{+--------------------------+--------------------------------------------------+}}
> {{| catalog_description_item |                        catalog_description_value |}}
> {{+--------------------------+--------------------------------------------------+}}
> {{|                     Name |                                             cat2 |}}
> {{|                     Type |                                generic_in_memory |}}
> {{|                  Comment |                                                  |}}
> {{|               Properties | (default-database,db), (type,generic_in_memory) |}}
> {{+--------------------------+--------------------------------------------------+}}
> {{4 rows in set}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-03-22 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-24939:
-----------------------------
Parent: FLINK-34914
Issue Type: Sub-task  (was: Improvement)

> Support 'SHOW CREATE CATALOG' syntax
> ------------------------------------
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>
> SHOW CREATE CATALOG ;
>  
> `Catalog` is playing a more and more important role in Flink; it would be 
> great to get detailed information about an existing catalog



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34914) FLIP-436: Introduce Catalog-related Syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34914:


 Summary: FLIP-436: Introduce Catalog-related Syntax
 Key: FLINK-34914
 URL: https://issues.apache.org/jira/browse/FLINK-34914
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


Umbrella issue for: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535341562


##
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/DataStream.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for all streams.
+ *
+ * Note: This is only used for internal implementation. It must not leak to 
user-facing API.
+ */
+@Internal

Review Comment:
   I have removed this annotation and renamed this class to 
`AbstractDataStream`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34913][metrics][state] Fix ConcurrentModificationException in SubTaskInitializationMetricsBuilder.addDurationMetric [flink]

2024-03-22 Thread via GitHub


flinkbot commented on PR #24554:
URL: https://github.com/apache/flink/pull/24554#issuecomment-2014773571

   
   ## CI report:
   
   * 744aafbb68656da6ab4c3558409cbad51b641038 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on PR #24422:
URL: https://github.com/apache/flink/pull/24422#issuecomment-2014774547

   Thanks @xintongsong for the review! I have addressed all the comments. 
   
   Sorry for the rebase; we had a conflict with the master branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34913:
-----------------------------------
Labels: pull-request-available  (was: )

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -------------------------------------------------------------------------
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> The following failures can occur during a job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug cannot happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
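
Both stack traces above go through {{HashMap.compute}} from different threads; 
a minimal sketch of the usual remedy (the actual fix is in the linked PR and 
may differ):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InitializationMetricsSketch {
    // ConcurrentHashMap.merge is safe under concurrent callers, unlike
    // HashMap.compute, which the traces above fail in.
    private final Map<String, Long> durationMetrics = new ConcurrentHashMap<>();

    void addDurationMetric(String name, long durationMillis) {
        durationMetrics.merge(name, durationMillis, Long::sum);
    }
}
{code}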



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34913][metrics][state] Fix ConcurrentModificationException in SubTaskInitializationMetricsBuilder.addDurationMetric [flink]

2024-03-22 Thread via GitHub


pnowojski opened a new pull request, #24554:
URL: https://github.com/apache/flink/pull/24554

   ## Verifying this change
   
   This change is currently not tested, as the code path is permanently 
disabled while waiting for a FRocksDB release. In the future it will be covered 
by existing ITCases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535343197


##
flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.v2;

Review Comment:
   Renamed.



##
flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.v2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */
+@Experimental
+public final class SourceUtils {

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535341354


##
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentFactory.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.process.api.ExecutionEnvironment;
+
+/** Factory class for execution environments. */
+@FunctionalInterface
+@Experimental

Review Comment:
   Good catch, fixed.



##
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/DataStream.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for all streams.
+ *
+ * Note: This is only used for internal implementation. It must not leak to 
user-facing API.
+ */
+@Internal

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535343533


##
flink-core/src/main/java/org/apache/flink/api/connector/v2/SinkUtils.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.v2;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Utils to convert a sink-v2 based sink to a DataStream v2 Sink. */
+@Experimental
+public class SinkUtils {

Review Comment:
   Renamed.



##
flink-core/src/main/java/org/apache/flink/api/connector/v2/SinkUtils.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.v2;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Utils to convert a sink-v2 based sink to a DataStream v2 Sink. */
+@Experimental
+public class SinkUtils {
+public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) {

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535342882


##
flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.v2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */
+@Experimental
+public final class SourceUtils {
+public static <T> Source<T> wrapSource(

Review Comment:
   Yes, I somehow forgot it :) Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2014755359

   @Zakelly @masteryhx Thanks for the detailed review, I updated the PR 
according to your suggestions. Please take a look again.
   
   > I'm wandering is it possible to add DirectoryStreamStateHandle at some 
point before reporting to JM, instead of providing it within each 
SegmentFileStateHandle ?
   
   Yes, I've stored the `DirectoryStreamStateHandle` of each subtask in 
`FileMergingSnapshotManager`. Now, they are only created once when the job 
starts.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535341028


##
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The implementation of {@link ExecutionEnvironment}. */

Review Comment:
   Makes sense! I have pointed this out in both the Javadoc of this class and 
the `newInstance` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535339803


##
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.api;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+
+/**
+ * This is the context in which a program is executed.
+ *
+ * The environment provides methods to create a DataStream and control the 
job execution.
+ */
+@Experimental
+public interface ExecutionEnvironment {
+/**
+ * Get the execution environment instance.
+ *
+ * @return A {@link ExecutionEnvironment} instance.
+ */
+static ExecutionEnvironment getExecutionEnvironment() throws 
ReflectiveOperationException {

Review Comment:
   Renamed to `getInstance`.
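   
   A hedged sketch of what the renamed factory could look like; the reflective lookup is only an assumption suggested by the `throws ReflectiveOperationException` clause, and `newInstance` is the method name mentioned elsewhere in this thread:
   
   static ExecutionEnvironment getInstance() throws ReflectiveOperationException {
       // Load the implementation class from the non-API module at runtime,
       // so the API module carries no compile-time dependency on it.
       return (ExecutionEnvironment)
               Class.forName("org.apache.flink.process.impl.ExecutionEnvironmentImpl")
                       .getMethod("newInstance")
                       .invoke(null);
   }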



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535339503


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java:
##
@@ -418,6 +430,62 @@ void testSnapshotEmpty() throws Exception {
 assertThat(stateHandle).isNull();
 }
 
+@Test
+void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws 
Exception {

Review Comment:
   I added `testSegmentStateHandleStateRegister` and 
`testFireMergingOperatorStateRegister` in `SharedStateRegistryTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535339356


##
pom.xml:
##
@@ -2366,6 +2366,7 @@ under the License.
 <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
 <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
 <exclude>org.apache.flink.api.common.functions.RuntimeContext</exclude>
+<exclude>org.apache.flink.api.common.functions.Function</exclude>

Review Comment:
   Good suggestion. I have created FLINK-34899 and FLINK-34900 to track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535338210


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle 
implements StreamStateHandle {
+
+private static final long serialVersionUID = -6453596108675892492L;
+
+public DirectoryStreamStateHandle(@Nonnull Path directory, long 
directorySize) {
+super(directory, directorySize);
+}
+
+@Override
+public FSDataInputStream openInputStream() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public Optional<byte[]> asBytesIfInMemory() {
+return Optional.empty();
+}
+
+@Override
+public PhysicalStateHandleID getStreamStateHandleID() {
+return new PhysicalStateHandleID(getDirectory().toString());
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+return getDirectory().equals(that.getDirectory());
+}
+
+@Override
+public String toString() {
+return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + 
'}';
+}
+
+public static DirectoryStreamStateHandle forPathWithSize(@Nonnull Path 
directory) {
+long size;
+try {
+size = FileUtils.getDirectoryFilesSize(directory);
+} catch (IOException e) {
+size = 0L;
+}
+return new DirectoryStreamStateHandle(directory, size);
+}
+
+public static SharedStateRegistryKey createStateRegistryKey(

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34548][API] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-03-22 Thread via GitHub


reswqa commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1535336409


##
flink-process-function-parent/pom.xml:
##
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-parent</artifactId>

Review Comment:
   After an offline discussion, we concluded the parent module is unnecessary. 
I have un-nested the module structure and renamed `flink-process-function` and 
`flink-process-function-api` to `flink-datastream` and `flink-datastream-api` 
respectively.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535337848


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java:
##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements 
StreamStateHandle {
  * @param scope The state's scope, whether it is exclusive or shared.
  */
 public SegmentFileStateHandle(
-Path filePath, long startPos, long stateSize, 
CheckpointedStateScope scope) {
+Path directoryPath,
+Path filePath,
+long startPos,
+long stateSize,
+CheckpointedStateScope scope) {
 this.filePath = filePath;
 this.stateSize = stateSize;
 this.startPos = startPos;
 this.scope = scope;
+this.directoryStateHandle =
+DirectoryStreamStateHandle.forPathWithSize(
+new File(directoryPath.getPath()).toPath());

Review Comment:
   As noted above, I've stored the `DirectoryStreamStateHandle` of each subtask 
in `FileMergingSnapshotManager`. Now, they are only created once when the job 
starts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34912) Replace all occurrences of com.ververica in the project with org.apache.flink

2024-03-22 Thread Xiao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829813#comment-17829813
 ] 

Xiao Huang edited comment on FLINK-34912 at 3/22/24 9:53 AM:
-

There is a PR that has already done this:

[[hotfix] Change old com.ververica dependency to flink by xleoken · Pull 
Request #3110 · apache/flink-cdc 
(github.com)|https://github.com/apache/flink-cdc/pull/3110]


was (Author: shawnhx):
[[hotfix] Change old com.ververica dependency to flink by xleoken · Pull 
Request #3110 · apache/flink-cdc 
(github.com)|https://github.com/apache/flink-cdc/pull/3110]

> Replace all occurrences of com.ververica in the project with org.apache.flink
> -
>
> Key: FLINK-34912
> URL: https://issues.apache.org/jira/browse/FLINK-34912
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Xiao Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34912) Replace all occurrences of com.ververica in the project with org.apache.flink

2024-03-22 Thread Xiao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829813#comment-17829813
 ] 

Xiao Huang commented on FLINK-34912:


[[hotfix] Change old com.ververica dependency to flink by xleoken · Pull 
Request #3110 · apache/flink-cdc 
(github.com)|https://github.com/apache/flink-cdc/pull/3110]

> Replace all occurrences of com.ververica in the project with org.apache.flink
> -
>
> Key: FLINK-34912
> URL: https://issues.apache.org/jira/browse/FLINK-34912
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Xiao Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [build] fix inconsistent Kafka shading among cdc connectors [flink-cdc]

2024-03-22 Thread via GitHub


Shawn-Hx commented on code in PR #2988:
URL: https://github.com/apache/flink-cdc/pull/2988#discussion_r1535319684


##
flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-tidb-cdc/pom.xml:
##
@@ -60,6 +60,12 @@ under the License.
 
 
 
+<relocation>
+    <pattern>org.apache.kafka</pattern>
+    <shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka</shadedPattern>

Review Comment:
   > I found that no shaded pattern has been migrated from 
`com.ververica.cdc.connectors.shaded` yet. Maybe we should do it all together 
in another PR?
   
   See https://github.com/apache/flink-cdc/pull/3110.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34912][cdc] replace all com.ververica with org.apache.flink [flink-cdc]

2024-03-22 Thread via GitHub


Shawn-Hx commented on PR #3188:
URL: https://github.com/apache/flink-cdc/pull/3188#issuecomment-2014716080

   > Hi @Shawn-Hx, this is redundant. #3110
   
   Thanks, this PR is now closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34912][cdc] replace all com.ververica with org.apache.flink [flink-cdc]

2024-03-22 Thread via GitHub


Shawn-Hx closed pull request #3188: [FLINK-34912][cdc] replace all 
com.ververica with org.apache.flink
URL: https://github.com/apache/flink-cdc/pull/3188


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34912][cdc] replace all com.ververica with org.apache.flink [flink-cdc]

2024-03-22 Thread via GitHub


charlesy6 commented on PR #3188:
URL: https://github.com/apache/flink-cdc/pull/3188#issuecomment-2014706899

   Hi @Shawn-Hx, this is redundant: https://github.com/apache/flink-cdc/pull/3110


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34526][runtime] Actively disconnect the TM in RM to reduce restart time [flink]

2024-03-22 Thread via GitHub


qinf commented on PR #24539:
URL: https://github.com/apache/flink/pull/24539#issuecomment-2014625860

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34913:
---
Description: 
The following failures can occur during a job's recovery when using clip & 
ingest. This code path is currently not available, so the bug cannot happen 
for users.

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}


  was:
The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}
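
A minimal standalone reproduction of this failure mode (plain JDK, no Flink involved): two threads calling HashMap.compute concurrently can trip the map's internal modCount check, matching the HashMap.compute frame in the traces above; a ConcurrentHashMap (or external synchronization) avoids it.

{noformat}
import java.util.HashMap;
import java.util.Map;

public class ComputeRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> metrics = new HashMap<>();
        // Fix: replace with new java.util.concurrent.ConcurrentHashMap<>()
        Runnable writer = () -> {
            for (int i = 0; i < 100_000; i++) {
                // Unsynchronized compute() on a HashMap may throw
                // ConcurrentModificationException or silently corrupt the map.
                metrics.compute("metric-" + (i % 1_000), (k, v) -> v == null ? 1L : v + 1L);
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("entries: " + metrics.size());
    }
}
{noformat}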



> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.1
>
>
> The following failures can occur during a job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug cannot happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> 

[jira] [Updated] (FLINK-34912) Replace all occurrences of com.ververica in the project with org.apache.flink

2024-03-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34912:
---
Labels: pull-request-available  (was: )

> Replace all occurrences of com.ververica in the project with org.apache.flink
> -
>
> Key: FLINK-34912
> URL: https://issues.apache.org/jira/browse/FLINK-34912
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Xiao Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >