Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong merged PR #2167: URL: https://github.com/apache/fluss/pull/2167 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2645173454
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+@Override
+void testCountPushDown(boolean partitionTable) throws Exception {
+String tableName = partitionTable ? preparePartitionedLogTable() :
prepareLogTable();
+int expectedRows = partitionTable ? 10 : 5;
+// normal scan
+String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+assertThat(tEnv.explainSql(query))
+.contains(
+String.format(
+"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
++ "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction(), "
++ "fields=[count1$0])",
+tableName));
Review Comment:
@vamossagar12 I have talked with Jark. We don't care whether `id` is projected
when the count is pushed down (we just calculate the count from the log offset diff).
I have modified it.
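To make the "count by log offset diff" remark concrete, here is a minimal sketch. It assumes each log bucket exposes a start and an end offset and that every offset in between corresponds to exactly one row. The class name, method name, and map layout are hypothetical and are not taken from the Fluss code base.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: not the Fluss implementation.
final class OffsetDiffCount {

    /**
     * Sums (endOffset - startOffset) over all buckets.
     * Each value is a two-element array: {startOffset, endOffset}.
     */
    static long countRows(Map<Integer, long[]> offsetsByBucket) {
        long total = 0;
        for (long[] range : offsetsByBucket.values()) {
            total += range[1] - range[0];
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> offsets = new LinkedHashMap<>();
        offsets.put(0, new long[] {0L, 3L}); // bucket 0 holds 3 rows
        offsets.put(1, new long[] {2L, 4L}); // bucket 1 holds 2 rows
        System.out.println(countRows(offsets));
    }
}
```

Because no row is read, which columns the scan projects has no effect on the result, which is the point made in the comment above.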
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3688981556
> Thanks @vamossagar12 and @loserwang1024 for this great work.
>
> However, it seems `Flink21MultipleParameterToolTest` and `Flink21MaterializedTableITCase` were not moved to Flink 2.2.
Thanks for the review. I addressed the comments and added the two missing tests. PTAL.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2645049561
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog22Test.java:
##
@@ -59,4 +46,30 @@ protected ResolvedSchema createSchema() {
DefaultIndex.newIndex(
"INDEX_first_third", Arrays.asList("first",
"third";
}
+
+protected CatalogMaterializedTable newCatalogMaterializedTable(
+ResolvedSchema resolvedSchema,
+CatalogMaterializedTable.RefreshMode refreshMode,
+Map options) {
+CatalogMaterializedTable origin =
+CatalogMaterializedTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+.comment("test comment")
+.options(options)
+.partitionKeys(Collections.emptyList())
+.definitionQuery("select first, second, third from t")
+.freshness(IntervalFreshness.of("5",
IntervalFreshness.TimeUnit.SECOND))
+.logicalRefreshMode(
+refreshMode ==
CatalogMaterializedTable.RefreshMode.CONTINUOUS
+?
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS
+:
CatalogMaterializedTable.LogicalRefreshMode.FULL)
+.refreshMode(refreshMode)
+
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+.build();
+return new ResolvedCatalogMaterializedTable(
+origin,
+resolvedSchema,
+refreshMode,
+IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
Review Comment:
I removed the overriding `newCatalogMaterializedTable` method from
`Flink22CatalogTest`, as its implementation was the same as the one in the base
class.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2645046885
##
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/FlinkSinkAdapter.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+
+/**
+ * Flink sink adapter which hide the different version of createWriter method.
+ *
+ * TODO: remove this class when no longer support all the Flink 1.x series.
+ */
+public abstract class FlinkSinkAdapter implements Sink {
Review Comment:
Done.
##
fluss-test-coverage/pom.xml:
##
@@ -68,12 +68,6 @@
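 <scope>compile</scope>
-<dependency>
-<groupId>org.apache.fluss</groupId>
-<artifactId>fluss-flink-2.1</artifactId>
-<version>${project.version}</version>
-<scope>compile</scope>
-</dependency>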
Review Comment:
Added.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644996563
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
Thank you! Resolving.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2645020768
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+@Override
+void testCountPushDown(boolean partitionTable) throws Exception {
+String tableName = partitionTable ? preparePartitionedLogTable() :
prepareLogTable();
+int expectedRows = partitionTable ? 10 : 5;
+// normal scan
+String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+assertThat(tEnv.explainSql(query))
+.contains(
+String.format(
+"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
++ "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction(), "
++ "fields=[count1$0])",
+tableName));
Review Comment:
Yes. Since this is a behaviour change caused by upstream changes in Flink 2.2,
how about we add a TODO here? If a later Flink 2.2 release fixes this, the test
may fail again, which gives us an opportunity to fix this behaviour or remove
the override.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644996563
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
Thank you. If there are no takers for #2231, I can also take it up.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644736929
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+@Override
+void testCountPushDown(boolean partitionTable) throws Exception {
+String tableName = partitionTable ? preparePartitionedLogTable() :
prepareLogTable();
+int expectedRows = partitionTable ? 10 : 5;
+// normal scan
+String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+assertThat(tEnv.explainSql(query))
+.contains(
+String.format(
+"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
++ "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction(), "
++ "fields=[count1$0])",
+tableName));
Review Comment:
> this override was only introduced for Flink 2.2 to accommodate the removal of the project=[id] field from the plan string.

This seems to be a bug in 2.2.0: https://issues.apache.org/jira/browse/FLINK-38832
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644734876
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
I created https://github.com/apache/fluss/issues/2231 as a follow-up issue
to improve the tests.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644725981
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
Sorry for the confusion, I mixed things up. These two tests were actually
introduced in Flink 2.2. Please ignore my previous comment.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2644704177
##
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/FlinkSinkAdapter.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+
+/**
+ * Flink sink adapter which hide the different version of createWriter method.
+ *
+ * TODO: remove this class when no longer support all the Flink 1.x series.
+ */
+public abstract class FlinkSinkAdapter implements Sink {
Review Comment:
Rename to `SinkAdapter` to avoid naming confusion with `FlinkSink`.
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+@Override
+void testCountPushDown(boolean partitionTable) throws Exception {
+String tableName = partitionTable ? preparePartitionedLogTable() :
prepareLogTable();
+int expectedRows = partitionTable ? 10 : 5;
+// normal scan
+String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+assertThat(tEnv.explainSql(query))
+.contains(
+String.format(
+"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
++ "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction(), "
++ "fields=[count1$0])",
+tableName));
Review Comment:
Let’s avoid overriding the entire test method if possible. Full overrides
can create maintenance friction, for example when the base method is updated but
the override isn’t synchronized accordingly.
As I understand it, this override was only introduced for Flink 2.2 to
accommodate the removal of the `project=[id]` field from the plan string.
Instead, we can maintain compatibility by extracting the `TableSourceScan(`
line from the plan and asserting that this line contains:
`aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]],
fields=[count1$0]`
to verify that the `COUNT` aggregation has been correctly pushed down.
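The suggestion above could be sketched roughly as follows. This is only an illustration: the class and method names are hypothetical, and the sample plan strings are made up for the example, so they may not match the exact text that `explainSql` returns in any Flink version. The expected fragment is the one quoted in the comment.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper; not part of the Fluss test utilities.
final class PlanScanLineCheck {

    // Fragment quoted in the review comment.
    static final String COUNT_PUSH_DOWN =
            "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]], fields=[count1$0]";

    /** Returns the first plan line containing "TableSourceScan(", trimmed, if any. */
    static Optional<String> tableSourceScanLine(String plan) {
        return Arrays.stream(plan.split("\\R"))
                .map(String::trim)
                .filter(line -> line.contains("TableSourceScan("))
                .findFirst();
    }

    /** True when the scan line carries the pushed-down COUNT fragment. */
    static boolean countPushedDown(String plan) {
        return tableSourceScanLine(plan)
                .map(line -> line.contains(COUNT_PUSH_DOWN))
                .orElse(false);
    }

    public static void main(String[] args) {
        // Made-up plans: one with a project=[id] entry, one without it.
        String withProject =
                "== Optimized Execution Plan ==\n"
                        + "TableSourceScan(table=[[testcatalog, defaultdb, t]], project=[id], "
                        + COUNT_PUSH_DOWN + ")\n";
        String withoutProject =
                "== Optimized Execution Plan ==\n"
                        + "TableSourceScan(table=[[testcatalog, defaultdb, t]], "
                        + COUNT_PUSH_DOWN + ")\n";
        // Both variants pass the same check, so no per-version override is needed.
        System.out.println(countPushedDown(withProject));
        System.out.println(countPushedDown(withoutProject));
    }
}
```

In the base test, the same check would be an assertion on the extracted line rather than on the whole plan string, which keeps a single test method valid whether or not the projection appears in the plan.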
##
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog22Test.java:
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong commented on code in PR #2167: URL: https://github.com/apache/fluss/pull/2167#discussion_r2643836266 ## website/src/pages/downloads.md: ## @@ -9,24 +9,24 @@ Repo: https://downloads.apache.org/incubator/fluss/helm-chart ## Apache Fluss 0.8.0 -| Artifact | Signature | SHA | -||-|-| -| [Fluss Binary Release](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz) | [.asc](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz.sha512) | -| [Fluss Source Release](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz) | [.asc](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz.sha512) | -| [Fluss Helm Chart](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz) | [.asc](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz.sha512) | +| Artifact | Signature | SHA | +||---|-| +| [Fluss Binary Release](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz) | [.asc](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-bin.tgz.sha512) | +| [Fluss Source Release](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz) | 
[.asc](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/fluss-0.8.0-incubating/fluss-0.8.0-incubating-src.tgz.sha512) | +| [Fluss Helm Chart](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz) | [.asc](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz.asc) | [.sha512](https://downloads.apache.org/incubator/fluss/helm-chart/0.8.0-incubating/fluss-0.8.0-incubating.tgz.sha512) | | [Fluss Tiering Service Jar](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-tiering/0.8.0-incubating/fluss-flink-tiering-0.8.0-incubating.jar) | [.asc](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-tiering/0.8.0-incubating/fluss-flink-tiering-0.8.0-incubating.jar.asc) | [.sha1](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-tiering/0.8.0-incubating/fluss-flink-tiering-0.8.0-incubating.jar.sha1) | -| [Connector Jar for Flink 2.1](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-2.1/0.8.0-incubating/fluss-flink-2.1-0.8.0-incubating.jar) | [.asc](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-2.1/0.8.0-incubating/fluss-flink-2.1-0.8.0-incubating.jar.asc) | [.sha1](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-2.1/0.8.0-incubating/fluss-flink-2.1-0.8.0-incubating.jar.sha1) | -| [Connector Jar for Flink 1.20](https://repo1.maven.org
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3686603883
@leonardBang @wuchong The Flink 2.1 module has been upgraded to Flink 2.2. Please help review.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3686455485
OK, thanks for the changes, @loserwang1024. LGTM!
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
wuchong commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3686252724
+1 to delete Flink 2.1.
We can do this in this PR, making support for Flink 2.2 an upgrade from Flink 2.1 to Flink 2.2.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
leonardBang commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3686235814
> Currently, the `flink` test takes almost 45 minutes and the `flink-lake` test takes almost 39 minutes. Should we delete an old module?
>
> @wuchong @leonardBang, WDYT?

+1 to deleting the 2.1 module and adding support for 2.2. From my observation, Flink 2.0 and 2.1 are rarely used by users, maybe because they are not stable enough yet as releases of Flink's new major version. Flink 2.2 should be more stable than them.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3686056133
Image: https://github.com/user-attachments/assets/8717e75f-530f-422e-a99f-fa6612cccf79
Currently, the `flink` test takes almost 45 minutes and the `flink-lake` test takes almost 39 minutes. Should we delete an old module?
@wuchong @leonardBang, WDYT?
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3685721898
@vamossagar12 I have made some adjustments to the code; you can add them to your commit:
1. Merge Flink21Catalog and Flink22Catalog into FlinkCatalog.
2. Add flink-2.2 to the Flink CI tests.
3. Fix the wrong test.

https://github.com/loserwang1024/fluss/tree/hongshun-union-flink22
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3680462471
There are some test failures, will take a look.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3678357668
> I am also considering whether we need to move Flink21CatalogFactory to fluss-flink-common. Then Flink 2.1 and 2.2 can share it, and there is no need for so much duplicated code.
>
> FlinkCatalogFactory
>
> * Flink 1.18
> * Flink 1.19
> * Flink 1.20
> * Flink 2.0
>
> FlinkCatalogGreeterThan21Factory
>
> * Flink 2.1
> * Flink 2.2
>
> Flink below 2.0 won't use the FlinkCatalogGreeterThan21Factory code, so no problem will occur.

Thanks, I was thinking of doing it in a subsequent PR. This PR also doesn't create an adapter layer for Flink 2.2; we can adopt a similar approach for 2.1 in the common code. Let me know what you think.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3663525822
I am also considering whether we need to move Flink21CatalogFactory to fluss common. Then Flink 2.1 and 2.2 can share it, and there is no need for so much duplicated code.

FlinkCatalogFactory
* Flink 1.18
* Flink 1.19
* Flink 1.20
* Flink 2.0

FlinkCatalogGreeterThan21Factory
* Flink 2.1
* Flink 2.2

Flink below 2.0 won't use the FlinkCatalogGreeterThan21Factory code, so no problem will occur.
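The split proposed above could look roughly like the sketch below. It is only an illustration under stated assumptions: every class name in it (`SketchCatalog`, `SketchCatalogFactory`, `SketchCatalogFor21PlusFactory`, `FactorySelectionSketch`) is hypothetical and stands in for the Fluss classes, and the version-string switch is a simplification, since in the real modules each Flink version module would presumably register its own factory rather than pick one at runtime.

```java
import java.util.List;

/** Hypothetical stand-in for the catalog a factory creates. */
class SketchCatalog {
    final String name;
    final boolean usesNewerApi; // true only for the Flink 2.1+ variant

    SketchCatalog(String name, boolean usesNewerApi) {
        this.name = name;
        this.usesNewerApi = usesNewerApi;
    }
}

/** Stand-in for the factory shared by Flink 1.18, 1.19, 1.20 and 2.0. */
class SketchCatalogFactory {
    SketchCatalog createCatalog(String name) {
        return new SketchCatalog(name, false);
    }
}

/** Stand-in for a factory kept in the common module but used only by Flink 2.1 and 2.2. */
class SketchCatalogFor21PlusFactory extends SketchCatalogFactory {
    @Override
    SketchCatalog createCatalog(String name) {
        return new SketchCatalog(name, true);
    }
}

class FactorySelectionSketch {
    /** Picks a factory from a "major.minor" version string (illustration only). */
    static SketchCatalogFactory factoryFor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        boolean atLeast21 = major > 2 || (major == 2 && minor >= 1);
        return atLeast21 ? new SketchCatalogFor21PlusFactory() : new SketchCatalogFactory();
    }

    public static void main(String[] args) {
        for (String version : List.of("1.18", "1.19", "1.20", "2.0", "2.1", "2.2")) {
            System.out.println(
                    version + " -> " + factoryFor(version).getClass().getSimpleName());
        }
    }
}
```

The point of the layout is that older modules never reference the 2.1+ factory, so code that depends on newer Flink APIs is never loaded for them.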
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on code in PR #2167:
URL: https://github.com/apache/fluss/pull/2167#discussion_r2625491545
##
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/catalog/Flink22Catalog.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+import org.apache.fluss.metadata.TableInfo;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** A {@link FlinkCatalog} used for Flink 2.2. */
+public class Flink22Catalog extends FlinkCatalog {
+
+    public Flink22Catalog(
+            String name,
+            String defaultDatabase,
+            String bootstrapServers,
+            ClassLoader classLoader,
+            Map<String, String> securityConfigs) {
+        super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
Review Comment:
Please add the fix from https://github.com/apache/fluss/pull/2152
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3663513712
Yes, sorry, I haven't found the time. I will do this in the next couple of days.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
loserwang1024 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3663506290
Image: https://github.com/user-attachments/assets/d09dd47d-c796-49ee-be38-dd87c88c227a
@vamossagar12, please format the code with `mvn spotless:apply`. We can also compile before committing with `mvn clean install -DskipTests`.
Re: [PR] [flink] Adding changes for flink 2.2 [fluss]
vamossagar12 commented on PR #2167:
URL: https://github.com/apache/fluss/pull/2167#issuecomment-3650844915
@leonardBang, I made the changes.
