[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

2022-05-09 Thread GitBox


alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868882875


##
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+@Test
+public void testWrite(@TempDir File tempDir) throws Exception {

Review Comment:
   I had the misconception that the tests would then share the directory, but this is only the case if it's declared as static. I'll change it.
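
For reference, a minimal JUnit 5 sketch (class and field names are made up) of the scoping difference discussed here: a static `@TempDir` field is created once and shared by every test in the class, while an instance field or test-method parameter gets a fresh directory per test.

```java
import java.io.File;
import java.nio.file.Path;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirScopeExample {

    // Shared: created once for the class and reused by all test methods.
    @TempDir static Path sharedDir;

    // Per-test: a fresh directory is created for every test method.
    @TempDir File perTestDir;

    @Test
    void eachTestSeesItsOwnPerTestDir() {
        // sharedDir is the same instance for every test in this class,
        // while perTestDir is unique to this invocation.
    }
}
```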



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fapaul commented on pull request #19661: [FLINK-27487][kafka] Only forward measurable Kafka metrics and ignore others

2022-05-09 Thread GitBox


fapaul commented on PR #19661:
URL: https://github.com/apache/flink/pull/19661#issuecomment-1122003731

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #199: [FLINK-27551] Update status manually instead of relying on updatecontrol

2022-05-09 Thread GitBox


gyfora opened a new pull request, #199:
URL: https://github.com/apache/flink-kubernetes-operator/pull/199

   This PR reworks how the status of Flink resources is updated in Kubernetes. This is necessary due to 
https://github.com/java-operator-sdk/java-operator-sdk/issues/1198
   
   Based on an offline discussion with the JOSDK team, this seems to be the safest short-term solution.
   
   Caching the latest status is necessary to avoid a race condition between spec updates and the status patches that we make ourselves.
   
   Tests have been updated with some init logic to enable patching through the Kubernetes client, and the reliance on mutating the status has been removed.
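
To illustrate the caching idea mentioned above (this is not the operator's actual code; the class and method names are invented for the sketch), the core of it is simply to remember the last status we wrote per resource, so that a freshly read resource carrying a newer spec cannot hand us back a stale status:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch of caching the latest recorded status per resource. */
class StatusCache<S> {

    private final Map<String, S> lastRecordedStatus = new ConcurrentHashMap<>();

    /** Remember the status we just patched for the given resource key (namespace/name). */
    void record(String resourceKey, S status) {
        lastRecordedStatus.put(resourceKey, status);
    }

    /** Prefer the cached status over the possibly stale one read from the API server. */
    S latest(String resourceKey, S statusFromApiServer) {
        return lastRecordedStatus.getOrDefault(resourceKey, statusFromApiServer);
    }
}
```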


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27543) Hide column statistics inside the file format writer.

2022-05-09 Thread Zheng Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zheng Hu updated FLINK-27543:
-
Summary: Hide column statistics inside the file format writer.  (was: 
Introduce StatsProducer to refactor code in DataFileWriter )

> Hide column statistics inside the file format writer.
> -
>
> Key: FLINK-27543
> URL: https://issues.apache.org/jira/browse/FLINK-27543
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Zheng Hu
>Priority: Major
> Fix For: table-store-0.2.0
>
>
> Lots of `fileStatsExtractor == null` checks look bad.
> I think we can have a `StatsProducer` to unify `StatsExtractor` and 
> `StatsCollector`, to reduce caller complexity.
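
For illustration only, one possible shape of such a unification (the method signatures are guesses, not taken from the actual patch): both the extractor-based path, which reads stats from the written file, and the collector-based path, which accumulates stats while writing, can sit behind a single interface so callers never branch on a null extractor.

```java
import java.io.IOException;
import java.nio.file.Path;

/** Hypothetical sketch of a unified stats producer. */
interface StatsProducer<S> {

    /** Called for every record while the file is being written; a file-based
     *  implementation that extracts stats afterwards may simply ignore this. */
    void collect(Object record);

    /** Produce the final per-file statistics once the file has been closed. */
    S produce(Path writtenFile) throws IOException;
}
```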



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27543) Hide column statistics collector inside the file format writer.

2022-05-09 Thread Zheng Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zheng Hu updated FLINK-27543:
-
Summary: Hide column statistics collector inside the file format writer.  
(was: Hide column statistics inside the file format writer.)

> Hide column statistics collector inside the file format writer.
> ---
>
> Key: FLINK-27543
> URL: https://issues.apache.org/jira/browse/FLINK-27543
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Zheng Hu
>Priority: Major
> Fix For: table-store-0.2.0
>
>
> Lots of `fileStatsExtractor == null` checks look bad.
> I think we can have a `StatsProducer` to unify `StatsExtractor` and 
> `StatsCollector`, to reduce caller complexity.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator for OnlineLogisticRegression

2022-05-09 Thread GitBox


zhipeng93 commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r868872809


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasL2.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared L2 param. */
+public interface HasL2<T> extends WithParams<T> {
+    Param<Double> L_2 =
+            new DoubleParam("l2", "The l2 param.", 0.1, ParamValidators.gt(0.0));

Review Comment:
   After some offline discussion, @weibozhao and I agree that we could reuse the existing `HasReg` and `HasElasticNet` params without introducing `HasL1` and `HasL2` in this implementation.
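
For context, a small standalone sketch of the usual elastic-net decomposition that makes dedicated `HasL1`/`HasL2` params redundant. It assumes `reg` and `elasticNet` follow the common convention (0.0 = pure L2, 1.0 = pure L1); the exact semantics of the Flink ML params should be checked against their documentation.

```java
/** Illustrative only: deriving L1/L2 strengths from reg and elasticNet. */
final class ElasticNetSketch {

    static double l1Strength(double reg, double elasticNet) {
        return reg * elasticNet;          // L1 share of the penalty
    }

    static double l2Strength(double reg, double elasticNet) {
        return reg * (1.0 - elasticNet);  // L2 share of the penalty
    }

    public static void main(String[] args) {
        double reg = 0.1;         // overall regularization strength
        double elasticNet = 0.25; // mixing parameter
        System.out.println("l1=" + l1Strength(reg, elasticNet) + ", l2=" + l2Strength(reg, elasticNet));
    }
}
```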



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

2022-05-09 Thread GitBox


alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868873968


##
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {
+private static final byte MAGIC_BYTE = 13;
+
+    /**
+     * Reads records that were written using the {@link BinaryFileUtil#readRecords} method from the
+     * given InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ByteBuffer key-value pairs
+     * @throws IOException
+     */
+    public static Map<ByteBuffer, ByteBuffer> readRecords(BufferedInputStream bis)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();

Review Comment:
   Where do you see this? The one I found iterates through the content's individual bytes:
   ```
   public int hashCode() {
       int h = 1;
       int p = position();
       for (int i = limit() - 1; i >= p; i--)
           h = 31 * h + (int) get(i);
       return h;
   }
   ```
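
For readers following the thread, a tiny standalone example (not from the PR) showing that `ByteBuffer`'s `equals`/`hashCode` are based on the remaining content, which is what makes it usable as a `HashMap` key for raw bytes:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class ByteBufferKeyDemo {
    public static void main(String[] args) {
        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();
        records.put(ByteBuffer.wrap(new byte[] {1, 2, 3}), ByteBuffer.wrap(new byte[] {9}));

        // A different ByteBuffer instance with the same remaining bytes hits the same entry.
        ByteBuffer sameContent = ByteBuffer.wrap(new byte[] {1, 2, 3});
        System.out.println(records.containsKey(sameContent)); // prints: true
    }
}
```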



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

2022-05-09 Thread GitBox


alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868878917


##
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   I previously wanted to use the SimpleVersionedSerializer and SimpleVersionedSerialization. However, implementing this interface adds the same kind of code we already have in place. Also, SimpleVersionedSerialization only takes single byte arrays as input, so we would have to write another method around it to allow both key and value as input. And finally, it does not provide a simple entry point for reading back all elements, so I decided against it.
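
As a rough illustration of the kind of hand-rolled format being discussed (the layout below is an assumption for the sketch, not necessarily what `BinaryFileUtil` actually writes), a magic byte plus length-prefixed key and value bytes is enough and keeps "read everything back" trivial:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Sketch of a minimal magic-byte, length-prefixed key/value record format. */
class KeyValueRecordFormatSketch {

    private static final byte MAGIC_BYTE = 13;

    static void writeRecord(DataOutputStream out, byte[] key, byte[] value) throws IOException {
        out.writeByte(MAGIC_BYTE); // marks the start of a record
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
    }

    static Map<ByteBuffer, ByteBuffer> readRecords(DataInputStream in) throws IOException {
        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();
        while (in.available() > 0) {
            if (in.readByte() != MAGIC_BYTE) {
                throw new IOException("Corrupt record: magic byte mismatch");
            }
            byte[] key = new byte[in.readInt()];
            in.readFully(key);
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            // Later occurrences of the same key overwrite earlier ones (upsert semantics).
            records.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
        }
        return records;
    }
}
```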



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27557) Create the empty writer for 'ALTER TABLE ... COMPACT'

2022-05-09 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534178#comment-17534178
 ] 

Jingsong Lee commented on FLINK-27557:
--

Can we just use overwrite?
 * overwrite specific manifest entries.
 * overwrite the whole table or some whole partitions (e.g., when rescaling during compaction).

> Create the empty writer for 'ALTER TABLE ... COMPACT'
> -
>
> Key: FLINK-27557
> URL: https://issues.apache.org/jira/browse/FLINK-27557
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: table-store-0.2.0
>
>
> Currently, FileStoreWrite only creates an empty writer for the {{INSERT 
> OVERWRITE}} clause. We should also create the empty writer for manual 
> compaction.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

2022-05-09 Thread GitBox


alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868869814


##
flink-connectors/flink-connector-upsert-test/pom.xml:
##
@@ -0,0 +1,151 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-connector-upsert-test</artifactId>
+	<name>Flink : Connectors : Upsert Test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- ... -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- ... -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- ... -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-architecture-tests-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>

Review Comment:
   Oops, I missed that when copying over some pom details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19667: [FLINK-27501][tests] Migrate SerializerTestBase to JUnit5

2022-05-09 Thread GitBox


snuyanzin commented on code in PR #19667:
URL: https://github.com/apache/flink/pull/19667#discussion_r868869212


##
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java:
##
@@ -48,10 +47,7 @@
 import java.util.Objects;
 import java.util.Random;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** A test for the {@link PojoSerializer}. */
 public class PojoSerializerTest extends 
SerializerTestBase {

Review Comment:
   ```suggestion
   class PojoSerializerTest extends 
SerializerTestBase {
   ```
   ?
   \+ There are 3-4 more tests within this class that are public and could be made package-private (GitHub does not allow commenting on them since they were not touched).
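
For reference, a minimal example of the JUnit 5 convention being suggested (the class below is illustrative and not part of the PR): test classes and test methods can be package-private.

```java
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Package-private test class and method, as preferred for JUnit 5 tests. */
class PackagePrivateStyleTest {

    @Test
    void methodsNoLongerNeedToBePublic() {
        assertThat(1 + 1).isEqualTo(2);
    }
}
```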



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #116: [FLINK-27558] Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread GitBox


JingsongLi commented on code in PR #116:
URL: https://github.com/apache/flink-table-store/pull/116#discussion_r868862749


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java:
##
@@ -39,6 +39,14 @@ public class TableStoreFactoryOptions {
 + "By default, compaction does not adjust 
the bucket number "
 + "of a partition/table.");
 
+public static final ConfigOption COMPACTION_SCANNED_MANIFEST =

Review Comment:
   Add an internal annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #116: [FLINK-27558] Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread GitBox


JingsongLi commented on code in PR #116:
URL: https://github.com/apache/flink-table-store/pull/116#discussion_r868862596


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java:
##
@@ -39,6 +39,14 @@ public class TableStoreFactoryOptions {
 + "By default, compaction does not adjust 
the bucket number "
 + "of a partition/table.");
 
+public static final ConfigOption COMPACTION_SCANNED_MANIFEST =

Review Comment:
   Can you add format information to the description? (How is a manifest entry serialized to a string?)
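
A hedged sketch of what the two comments above ask for; the key name, type, and description text are placeholders rather than the actual option definition:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Illustrative option definition carrying its serialization format in the description. */
class ScannedManifestOptionSketch {

    static final ConfigOption<String> COMPACTION_SCANNED_MANIFEST =
            ConfigOptions.key("compaction.scanned-manifest")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Internal use only. Planned manifest entries injected back for manual "
                                    + "compaction, serialized as <document the exact string encoding here>.");
}
```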



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19667: [FLINK-27501][tests] Migrate SerializerTestBase to JUnit5

2022-05-09 Thread GitBox


snuyanzin commented on code in PR #19667:
URL: https://github.com/apache/flink/pull/19667#discussion_r868862351


##
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializerTest.java:
##
@@ -91,6 +91,6 @@ protected String[][] getTestData() {
 @Test
 public void arrayTypeIsMutable() {

Review Comment:
   ```suggestion
   void arrayTypeIsMutable() {
   ```



##
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializerTest.java:
##
@@ -91,6 +91,6 @@ protected String[][] getTestData() {
 @Test
 public void arrayTypeIsMutable() {

Review Comment:
   ```suggestion
   void arrayTypeIsMutable() {
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on pull request #96: [hotfix] Bucketizer split point search

2022-05-09 Thread GitBox


zhipeng93 commented on PR #96:
URL: https://github.com/apache/flink-ml/pull/96#issuecomment-1121980771

   Thanks for the fix @mumuhhh ! LGTM.
   
   Two minor comments:
   - Could you add a unit test for the case where the number of inputCols is not the same as the number of splits?
   - Could you re-polish the commit message, such as `[hotfix] Fix finding 
buckets error in Bucketizer`? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-27558) Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-27558:


Assignee: Jane Chan

> Introduce a new optional option for TableStoreFactory to represent planned 
> manifest entries
> ---
>
> Key: FLINK-27558
> URL: https://issues.apache.org/jira/browse/FLINK-27558
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> When 
> {code:java}
> TableStoreFactory.onCompactTable
> {code}
> gets called, the planned manifest entries need to be injected back into the 
> enriched options, and we need a new key to represent it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] wangyang0918 commented on pull request #19675: [FLINK-27550] Remove checking yarn queues before submitting job to Yarn

2022-05-09 Thread GitBox


wangyang0918 commented on PR #19675:
URL: https://github.com/apache/flink/pull/19675#issuecomment-1121972802

   Maybe we could check the scheduler type first. If it is capacity scheduler, 
`checkYarnQueues` only takes the leaf queue name.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27498) Add E2E tests to cover Flink 1.15

2022-05-09 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534160#comment-17534160
 ] 

Yang Wang commented on FLINK-27498:
---

We could close this ticket since it is already covered by FLINK-27412.

Now the e2e tests are run against 1.13, 1.14, and 1.15.

> Add E2E tests to cover Flink 1.15
> -
>
> Key: FLINK-27498
> URL: https://issues.apache.org/jira/browse/FLINK-27498
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> We should extend our e2e test coverage to test all supported Flink versions, 
> initially that will be 1.14 and 1.15



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27412) Allow flinkVersion v1_13 in flink-kubernetes-operator

2022-05-09 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang closed FLINK-27412.
-
Resolution: Fixed

Fixed via:

main:

6ce3d96937849c70e00ef5f3837bc08ed31d9952

ab1893c61adddb580116186500297cb954f6f6cc

> Allow flinkVersion v1_13 in flink-kubernetes-operator
> -
>
> Key: FLINK-27412
> URL: https://issues.apache.org/jira/browse/FLINK-27412
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Critical
>  Labels: pull-request-available, starter
> Fix For: kubernetes-operator-1.0.0
>
>
> The core k8s related features:
>  * native k8s integration for session cluster, 1.10
>  * native k8s integration for application cluster, 1.11
>  * Flink K8s HA, 1.12
>  * pod template, 1.13
> So we could set the required minimum version to 1.13. This will allow more 
> users to have a try on flink-kubernetes-operator.
>  
> BTW, we need to update the e2e tests to cover all the supported versions.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] wangyang0918 merged pull request #197: [FLINK-27412] Allow flinkVersion v1_13 in flink-kubernetes-operator

2022-05-09 Thread GitBox


wangyang0918 merged PR #197:
URL: https://github.com/apache/flink-kubernetes-operator/pull/197


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27495) Observer should update last savepoint information directly from cluster too

2022-05-09 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534153#comment-17534153
 ] 

Gyula Fora commented on FLINK-27495:


[~thw] you are completely right. This is specifically intended to cover the case when the job goes into a globally terminal state and the HA data is cleaned up:

 - The job has been cancelled with a savepoint (but maybe the operator failed before recording the final savepoint after the REST call)

 - The job fatally failed for any reason (restart strategy exhausted, user throws a restart-suppressing error, etc.)

 - The job finished

You are right that in other cases this does not bring any value in terms of recording checkpoints, so I will probably change the logic to only execute in terminal states.

> Observer should update last savepoint information directly from cluster too
> ---
>
> Key: FLINK-27495
> URL: https://issues.apache.org/jira/browse/FLINK-27495
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> The observer should fetch the list checkpoints from the observed job and 
> store the last savepoint into the status directly.
> This is especially useful for terminal job states in Flink 1.15 as it allows 
> us to avoid cornercases such as the operator failing after calling 
> cancel-with-savepoint but before updating the status.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27551) Consider implementing our own status update logic

2022-05-09 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534152#comment-17534152
 ] 

Yang Wang commented on FLINK-27551:
---

I agree with you that we could have our own status update logic. How should we deal with the {{updateErrorStatus}}?

> Consider implementing our own status update logic
> -
>
> Key: FLINK-27551
> URL: https://issues.apache.org/jira/browse/FLINK-27551
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> If a custom resource version is applied while in the middle of a reconcile 
> loop (for the same resource but previous version) the status update will 
> throw an error and re-trigger reconciliation.
> In our case this might be problematic as it would mean we would retry 
> operations that are not necessarily retriable and might require manual user 
> intervention.
> Please see: 
> [https://github.com/java-operator-sdk/java-operator-sdk/issues/1198]
> I think we should consider implementing our own status update logic that is 
> independent of the current resource version to make the flow more robust.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] zuston commented on pull request #19675: [FLINK-27550] Remove checking yarn queues before submitting job to Yarn

2022-05-09 Thread GitBox


zuston commented on PR #19675:
URL: https://github.com/apache/flink/pull/19675#issuecomment-1121953341

   > IIRC, the leaf queue name in capacity scheduler should be unique, which 
means you could use `-Dyarn.application.queue=streaming/batch`. However, for 
fair scheduler, you need to specify a full path queue name. All in all, I do 
not think the `checkYarnQueues` is useless.
   
   However, the full queue name can also be used with the capacity scheduler, and the `checkYarnQueues` method is not compatible with that. @wangyang0918
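
To make the incompatibility concrete, here is a rough sketch (not the actual `checkYarnQueues` implementation) of a check that accepts either a leaf queue name or a dot-separated full path ending in that leaf name. It only relies on `YarnClient#getAllQueues` and `QueueInfo#getQueueName`; the '.'-separated path convention is an assumption.

```java
import java.util.List;

import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.client.api.YarnClient;

/** Illustrative queue-name check tolerating both leaf names and full paths. */
class QueueNameCheckSketch {

    static boolean queueExists(YarnClient yarnClient, String configuredQueue) throws Exception {
        // If the user passed a full path such as "root.streaming.batch", compare on the leaf name too.
        int lastDot = configuredQueue.lastIndexOf('.');
        String leafName = lastDot >= 0 ? configuredQueue.substring(lastDot + 1) : configuredQueue;

        List<QueueInfo> queues = yarnClient.getAllQueues();
        for (QueueInfo queue : queues) {
            if (queue.getQueueName().equals(configuredQueue) || queue.getQueueName().equals(leafName)) {
                return true;
            }
        }
        return false;
    }
}
```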


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-09 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534149#comment-17534149
 ] 

Hangxiang Yu commented on FLINK-23143:
--

> I like the simplicity of your solution, but I think the concern I expressed 
> above (about creation of multiple state objects) is valid.
The added methods we implement just mimic what we do on restore (from RocksDB or HashMap).
They check before adding a new state, and that check also works when users access the state.
(We don't implement them via createInternalState.)
So I think it's not a problem?

> Besides that, RegisteredKeyValueStateBackendMetaInfo seems implementation 
> detail of a state backend, so it shouldn't be exposed.
You are right. Maybe we could introduce a new, more universal class to wrap this information, WDYT?

> Regarding the key serializer upgrade, I think it's [not 
> supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/]
>  currently and therefore is out of the scope of this ticket.
I agree. But we haven't implemented the compatibility check logic for the key serializer, as you can see in the RocksDB and heap backends.
What I described above about the key serializer is the difficulty of implementing that compatibility check.
It will still work even if we lack it. Do you think that doesn't matter?
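
To make the proposed solution a bit more concrete, here is a very rough sketch (all names are hypothetical) of wrapping the restored serializer per state name so that it can be swapped for the newly registered serializer on first user access:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeutils.TypeSerializer;

/** Hypothetical registry of per-state serializer wrappers used for lazy migration. */
class SerializerWrapperRegistry {

    static final class Wrapper<T> {
        private TypeSerializer<T> current;

        Wrapper(TypeSerializer<T> restoredSerializer) {
            this.current = restoredSerializer;
        }

        TypeSerializer<T> get() {
            return current;
        }

        /** Called on first user access with the newly registered serializer. */
        void upgrade(TypeSerializer<T> newSerializer) {
            this.current = newSerializer;
        }
    }

    private final Map<String, Wrapper<?>> wrappersByStateName = new HashMap<>();

    <T> Wrapper<T> wrap(String stateName, TypeSerializer<T> restoredSerializer) {
        Wrapper<T> wrapper = new Wrapper<>(restoredSerializer);
        wrappersByStateName.put(stateName, wrapper);
        return wrapper;
    }
}
```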

> Support state migration
> ---
>
> Key: FLINK-23143
> URL: https://issues.apache.org/jira/browse/FLINK-23143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> ChangelogKeyedStateBackend.getOrCreateKeyedState is currently used during 
> recovery; on 1st user access, it doesn't update metadata nor migrate state 
> (as opposed to other backends).
>  
> The proposed solution is to
>  # wrap serializers (and maybe other objects) in getOrCreateKeyedState
>  # store wrapping objects in a new map keyed by state name
>  # pass wrapped objects to delegatedBackend.createInternalState
>  # on 1st user access, lookup wrapper and upgrade its wrapped serializer
> This should be done for both KV/PQ states.
>  
> See also [https://github.com/apache/flink/pull/15420#discussion_r656934791]
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh. 1. `numRecordsOut` will ONLY count the records (technically Committables) sent between sink tasks. 2. Connectors still use `numRecordsOut` teamed up with `numRecordsSend` and `numRecordsSendErrors`. 3. The information about using `numRecordsSend` has already been described in the first part. From now on, it is actually mandatory for connector developers to use them, except `numRecordsOutErrors`.
   
   How about this version:
   
   Connector developers should pay attention to the usage of the metrics `numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building sink connectors. Please refer to the new Kafka Sink for details. Additionally, since `numRecordsOut` now only counts the records sent between sink tasks and `numRecordsOutErrors` was designed for counting the records sent to the external system, we deprecated `numRecordsOutErrors` and recommend using `numRecordsSendErrors` instead.
   
   Even with this description it is still unclear: since `numRecordsOut` is still used, why is `numRecordsOutErrors` deprecated? It should be used to monitor errors that happen while Committables are sent between sink tasks. WDYT?
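
For connector authors reading along, a minimal sketch of the two counters this note distinguishes, registered through the generic `MetricGroup#counter` API; real sink implementations should use the dedicated sink writer metric group, so treat this only as an illustration of the naming:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/** Illustrative counters for records shipped to the external system vs. failed sends. */
class SinkWriterMetricsSketch {

    private final Counter numRecordsSend;
    private final Counter numRecordsSendErrors;

    SinkWriterMetricsSketch(MetricGroup metricGroup) {
        // Names follow the metrics referred to in the release note.
        this.numRecordsSend = metricGroup.counter("numRecordsSend");
        this.numRecordsSendErrors = metricGroup.counter("numRecordsSendErrors");
    }

    void onRecordShippedToExternalSystem() {
        numRecordsSend.inc();
    }

    void onSendFailure() {
        numRecordsSendErrors.inc();
    }
}
```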
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27495) Observer should update last savepoint information directly from cluster too

2022-05-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534131#comment-17534131
 ] 

Thomas Weise commented on FLINK-27495:
--

[~gyfora] is this for the case of a successfully completed job when the HA 
store was already removed? In other cases when the job isn't healthy the REST 
API may also be unavailable. And if we rely on polling the REST API, we may 
miss the latest checkpoint? Isn't reading from the HA store at the time when 
the path is required the only reliable method?

> Observer should update last savepoint information directly from cluster too
> ---
>
> Key: FLINK-27495
> URL: https://issues.apache.org/jira/browse/FLINK-27495
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> The observer should fetch the list checkpoints from the observed job and 
> store the last savepoint into the status directly.
> This is especially useful for terminal job states in Flink 1.15 as it allows 
> us to avoid cornercases such as the operator failing after calling 
> cancel-with-savepoint but before updating the status.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] fredia commented on pull request #19502: [FLINK-27260][Runtime/REST] Expose changelog configurations in Rest API and Web UI

2022-05-09 Thread GitBox


fredia commented on PR #19502:
URL: https://github.com/apache/flink/pull/19502#issuecomment-1121904893

   Sure. I ran `SocketWindowWordCount` to get the Web UI. Here are the screenshots:
   
   - When changelog is enabled, `changelog_storage` and `changelog_periodic_materialization_interval` are displayed in the checkpoint configuration tab:
   ![changelog enabled](https://user-images.githubusercontent.com/18653940/167541762-02f681d6-1926-4d31-8f3d-020a180b46d3.png)
   
   - When changelog is disabled, `changelog_storage` and `changelog_periodic_materialization_interval` are not shown:
   ![image](https://user-images.githubusercontent.com/18653940/167541862-d4bf6522-e3a6-4947-841b-58cf906cd62a.png)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (FLINK-27555) Performance regression in schedulingDownstreamTasks on 02.05.2022

2022-05-09 Thread Guowei Ma (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27555 ]


Guowei Ma deleted comment on FLINK-27555:
---

was (Author: maguowei):
Thanks [~roman] for letting this. I notice that [~zhuzh]  have make some 
changed recently. So would you like to have a look!

> Performance regression in schedulingDownstreamTasks on 02.05.2022
> -
>
> Key: FLINK-27555
> URL: https://issues.apache.org/jira/browse/FLINK-27555
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
> Attachments: Screenshot_2022-05-09_10-33-11.png
>
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=schedulingDownstreamTasks.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27555) Performance regression in schedulingDownstreamTasks on 02.05.2022

2022-05-09 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534120#comment-17534120
 ] 

Guowei Ma commented on FLINK-27555:
---

Thanks [~roman] for reporting this. I noticed that [~zhuzh] has made some changes recently, so would you like to have a look?

> Performance regression in schedulingDownstreamTasks on 02.05.2022
> -
>
> Key: FLINK-27555
> URL: https://issues.apache.org/jira/browse/FLINK-27555
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
> Attachments: Screenshot_2022-05-09_10-33-11.png
>
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=schedulingDownstreamTasks.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27555) Performance regression in schedulingDownstreamTasks on 02.05.2022

2022-05-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534119#comment-17534119
 ] 

Zhu Zhu commented on FLINK-27555:
-

This benchmark result measures how much time it takes to do the scheduling.
Therefore, it is actually an improvement instead of a regression.
The improvement is caused by FLINK-27460, which removed some logic that is no longer needed.

I can see it is the description on the left of the dashboard that is causing the confusion.
But I'm not sure whether there is a way to change the description for certain benchmarks to clearly show that it is "Time cost (less is better)".

> Performance regression in schedulingDownstreamTasks on 02.05.2022
> -
>
> Key: FLINK-27555
> URL: https://issues.apache.org/jira/browse/FLINK-27555
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
> Attachments: Screenshot_2022-05-09_10-33-11.png
>
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=schedulingDownstreamTasks.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator for OnlineLogisticRegression

2022-05-09 Thread GitBox


weibozhao commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r868785218


##
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java:
##
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import 
org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegression;
+import 
org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegressionModel;
+import org.apache.flink.ml.feature.MinMaxScalerTest;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.StageTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+
+/** Tests {@link OnlineLogisticRegression} and {@link 
OnlineLogisticRegressionModel}. */
+public class OnlineLogisticRegressionTest extends AbstractTestBase {
+@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainDenseTable;
+private static final String LABEL_COL = "label";
+private static final String PREDICT_COL = "prediction";
+private static final String FEATURE_COL = "features";
+private static final String MODEL_VERSION_COL = "modelVersion";
+private static final double[] ONE_ARRAY = new double[] {1.0, 1.0, 1.0};
+private static final List TRAIN_DENSE_ROWS =
+Arrays.asList(
+Row.of(Vectors.dense(0.1, 2.), 0.),
+Row.of(Vectors.dense(0.2, 2.), 0.),
+Row.of(Vectors.dense(0.3, 2.), 0.),
+Row.of(Vectors.dense(0.4, 2.), 0.),
+Row.of(Vectors.dense(0.5, 2.), 0.),
+Row.of(Vectors.dense(11., 12.), 1.),
+Row.of(Vectors.dense(12., 11.), 1.),
+Row.of(Vectors.dense(13., 12.), 1.),
+Row.of(Vectors.dense(14., 12.), 1.),
+Row.of(Vectors.dense(15., 12.), 1.));
+
+private static final List PREDICT_DENSE_ROWS =
+Arrays.asList(
+Row.of(Vectors.dense(0.8, 2.7), 0.),
+Row.of(Vectors.dense(0.8, 2.4), 0.),
+Row.of(Vectors.dense(0.7, 2.3), 0.),
+Row.of(Vectors.dense(0.4, 2.7), 0.),
+Row.of(Vectors.dense(0.5, 2.8), 0.),
+Row.of(Vectors.dense(10.2, 12.1), 1.),
+Row.of(Vectors.dense(13.3, 13.1), 1.),
+Row.of(Vectors.dense(13.5, 12.2), 1.),
+Row.of(Vectors.dense(14.9, 12.5), 1.),
+Row.of(Vectors.dense(15.5, 11.2), 1.));
+
+private sta

[GitHub] [flink] wangyang0918 commented on pull request #19675: [FLINK-27550] Remove checking yarn queues before submitting job to Yarn

2022-05-09 Thread GitBox


wangyang0918 commented on PR #19675:
URL: https://github.com/apache/flink/pull/19675#issuecomment-1121849688

   IIRC, the leaf queue name in capacity scheduler should be unique, which 
means you could use `-Dyarn.application.queue=streaming/batch`. However, for 
fair scheduler, you need to specify a full path queue name. All in all, I do 
not think the `checkYarnQueues` is useless.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868777111


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer 
getSplitSerializer() {
 public PendingSplitsCheckpointSerializer 
getEnumeratorCheckpointSerializer() {
 return new PendingSplitsCheckpointSerializer(getSplitSerializer());
 }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();

Review Comment:
   > What's the meaning of suffix ctr ? partition number or partition count ? 
So why not just use partitionNum ?
   
   `ctr` is an abbreviation for "counter"; it stands for the size of the 
`Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>` that is still to be read. 
I put `ctr` as a suffix because this value is mutable, while `partitionNum` 
sounds like an immutable concept to me. Do you think this reads better?
   ```java
   int partitionNum = view.readInt();
   int partitionCtr = partitionNum;
   while(partitionCtr > 0) {
   ...
   partitionCtr--;
   }
   ```
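   
   For comparison, a plain counted `for` loop avoids the mutable-counter naming 
question entirely. This is only a sketch that reuses the `view` and 
`partSerializer` names from the diff above and assumes it sits inside 
`readObject`:
   ```java
   int partitionCount = view.readInt();
   for (int i = 0; i < partitionCount; i++) {
       BinaryRowData partition = partSerializer.deserialize(view);
       // ... deserialize the bucket map for this partition, as in readObject ...
   }
   ```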



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #197: [FLINK-27412] Allow flinkVersion v1_13 in flink-kubernetes-operator

2022-05-09 Thread GitBox


wangyang0918 commented on code in PR #197:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/197#discussion_r868774483


##
e2e-tests/data/sessionjob-cr.yaml:
##
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: session-cluster-1
 spec:
-  image: flink:1.14.3
-  flinkVersion: v1_14
+  image: flink:1.

Review Comment:
   I got it. It is due to the `sed`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] mumuhhh opened a new pull request, #96: [hotfix] Bucketizer split point search

2022-05-09 Thread GitBox


mumuhhh opened a new pull request, #96:
URL: https://github.com/apache/flink-ml/pull/96

   Using variable repair


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #197: [FLINK-27412] Allow flinkVersion v1_13 in flink-kubernetes-operator

2022-05-09 Thread GitBox


wangyang0918 commented on code in PR #197:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/197#discussion_r868772741


##
e2e-tests/data/sessionjob-cr.yaml:
##
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: session-cluster-1
 spec:
-  image: flink:1.14.3
-  flinkVersion: v1_14
+  image: flink:1.

Review Comment:
   Same as above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #197: [FLINK-27412] Allow flinkVersion v1_13 in flink-kubernetes-operator

2022-05-09 Thread GitBox


wangyang0918 commented on code in PR #197:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/197#discussion_r868772648


##
e2e-tests/data/flinkdep-cr.yaml:
##
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: flink-example-statemachine
 spec:
-  image: flink:1.14.3
-  flinkVersion: v1_14
+  image: flink:1.
+  flinkVersion: v1_

Review Comment:
   It is strange that the e2e test does not fail with this typo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868771446


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -122,6 +150,17 @@ public SplitEnumerator restoreEnu
 SplitEnumeratorContext context,
 PendingSplitsCheckpoint checkpoint) {
 FileStoreScan scan = fileStore.newScan();
+Long snapshotId;
+Collection splits;
+if (specifiedSnapshotId != null) {
+Preconditions.checkNotNull(
+specifiedManifestEntries,
+"The manifest entries cannot be null for manual 
compaction.");

Review Comment:
   Let me give a detailed explanation about 
   > `ALTER TABLE ... COMPACT` will pre-scan the latest snapshot during the 
planning phase
   
   `ALTER TABLE ... COMPACT` will first be converted to a `SinkModifyOperation` 
with a `SourceQueryOperation` as its only child (on the Flink side). Then 
`ManagedTableFactory#onCompactTable` is invoked; the implementation (back on the 
table store side) performs a scan, collects the manifest entries and the 
corresponding snapshot id, and serializes them as a string, which is put back 
into the enriched options. So at runtime, when the source is initialized, the 
splits can be generated directly from the options. This PR aims to pave the way 
for skipping runtime scans in this case.
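   
   To make the hand-off concrete, here is a purely illustrative sketch; the 
option key, helper names, and serialization format below are hypothetical and 
are not the table store API:
   ```java
   import java.util.Base64;
   import java.util.HashMap;
   import java.util.Map;
   
   /** Illustrative only: a pre-planned scan result riding along in the enriched options. */
   public class PrePlannedOptionsSketch {
   
       // Hypothetical option key; the real key and payload format live in the table store.
       static final String PLAN_KEY = "compaction.scanned-manifest";
   
       /** Planning phase: attach the serialized plan to the enriched options. */
       static Map<String, String> attachPlan(Map<String, String> options, byte[] serializedPlan) {
           Map<String, String> enriched = new HashMap<>(options);
           enriched.put(PLAN_KEY, Base64.getEncoder().encodeToString(serializedPlan));
           return enriched;
       }
   
       /** Runtime: the source decodes the plan instead of scanning the file store again. */
       static byte[] restorePlan(Map<String, String> options) {
           return Base64.getDecoder().decode(options.get(PLAN_KEY));
       }
   }
   ```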



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27504) State compaction not happening with sliding window and incremental RocksDB backend

2022-05-09 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534105#comment-17534105
 ] 

Yun Tang commented on FLINK-27504:
--

I think you'd better read the RocksDB docs on compaction to understand how 
RocksDB triggers compaction at each level: 
https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

The number of levels is not important in your case. As I said above, this is 
more related to the implementation of RocksDB than to Flink.

For the state size, I think referring to the 
[total-sst-files-size|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-metrics-total-sst-files-size]
 metric is much more accurate.
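
For example, a minimal sketch of enabling that metric when the job is configured 
programmatically (assuming in-code configuration; the option key is the one from 
the docs linked above):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbMetricsSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Expose RocksDB's own SST-file size accounting as a Flink metric.
        config.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... build and execute the job as usual; the metric is then reported per state backend.
    }
}
```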

> State compaction not happening with sliding window and incremental RocksDB 
> backend
> --
>
> Key: FLINK-27504
> URL: https://issues.apache.org/jira/browse/FLINK-27504
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.4
> Environment: Local Flink cluster on Arch Linux.
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: duration_trend_52ca77c.png, duration_trend_67c76bb.png, 
> image-2022-05-06-10-34-35-007.png, size_growth_52ca77c.png, 
> size_growth_67c76bb.png
>
>
> Hello,
> I'm trying to estimate an upper bound for RocksDB's state size in my 
> application. For that purpose, I have created a small job with faster timings 
> whose code you can find on GitHub: 
> [https://github.com/asardaes/flink-rocksdb-ttl-test]. You can see some of the 
> results there, but I summarize here as well:
>  * Approximately 20 events per second, 10 unique keys for partitioning are 
> pre-specified.
>  * Sliding window of 11 seconds with a 1-second slide.
>  * Allowed lateness of 11 seconds.
>  * State TTL configured to 1 minute and compaction after 1000 entries.
>  * Both window-specific and window-global state used.
>  * Checkpoints every 2 seconds.
>  * Parallelism of 4 in stateful tasks.
> The goal is to let the job run and analyze state compaction behavior with 
> RocksDB. I should note that global state is cleaned manually inside the 
> functions, TTL for those is in case some keys are no longer seen in the 
> actual production environment.
> I have been running the job on a local cluster (outside IDE), the 
> configuration YAML is also available in the repository. After running for 
> approximately 1.6 days, state size is currently 2.3 GiB (see attachments). I 
> understand state can retain expired data for a while, but since TTL is 1 
> minute, this seems excessive to me.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-web] JingsongLi commented on a diff in pull request #531: Add Table Store 0.1.0 release

2022-05-09 Thread GitBox


JingsongLi commented on code in PR #531:
URL: https://github.com/apache/flink-web/pull/531#discussion_r868769686


##
_posts/2022-05-01-release-table-store-0.1.0.md:
##
@@ -0,0 +1,110 @@
+---
+layout: post
+title:  "Apache Flink Table Store 0.1.0 Release Announcement"
+subtitle: "Unified streaming and batch store for building dynamic tables on 
Apache Flink."
+date: 2022-05-01T08:00:00.000Z
+categories: news
+authors:
+- Jingsong Lee:
+  name: "Jingsong Lee"
+
+---
+
+The Apache Flink community is pleased to announce the preview release of the
+[Apache Flink Table Store](https://github.com/apache/flink-table-store) 
(0.1.0).
+
+Flink Table Store is a unified streaming and batch store for building dynamic 
tables
+on Apache Flink. It uses a full Log-Structured Merge-Tree (LSM) structure for 
high speed
+and a large amount of data update & query capability.
+
+Please check out the full 
[documentation]({{site.DOCS_BASE_URL}}flink-table-store-docs-release-0.1/) for 
detailed information and user guides.
+
+Note: Flink Table Store is still in beta status and undergoing rapid 
development,
+we do not recommend that you use it directly in a production environment.
+
+## What is Flink Table Store
+
+Open [Flink official website](https://flink.apache.org/), you can see the 
following line:
+`Apache Flink - Stateful Computations over Data Streams.` Flink focuses on 
distributed computing,
+which brings real-time big data computing. Users need to combine Flink with 
some kind of external storage.
+
+The message queue will be used in both source & intermediate stages in 
streaming pipeline, to guarantee the
+latency stay within seconds. There will also be a real-time OLAP system 
receiving processed data in streaming
+fashion and serving user’s ad-hoc queries.
+
+Everything works fine as long as users only care about the aggregated results. 
But when users start to care
+about the intermediate data, they will immediately hit a blocker: Intermediate 
kafka tables are not queryable.
+
+Therefore, users use multiple systems. Writing to a lake store like Apache 
Hudi, Apache Iceberg while writing to Queue,
+the lake store keeps historical data at a lower cost.
+
+There are two main issues with doing this:
+- High understanding bar for users: It’s also not easy for users to understand 
all the SQL connectors,
+  learn the capabilities and restrictions for each of those. Users may also 
want to play around with
+  streaming & batch unification, but don't really know how, given the 
connectors are most of the time different
+  in batch and streaming use cases.
+- Increasing architecture complexity: It’s hard to choose the most suited 
external systems when the requirements
+  include streaming pipelines, offline batch jobs, ad-hoc queries. Multiple 
systems will increase the operation
+  and maintenance complexity. Users at least need to coordinate between the 
queue system and file system of each
+  table, which is error-prone.
+
+The Flink Table Store aims to provide a unified storage abstraction:
+- Table Store provides storage of historical data while providing queue 
abstraction.
+- Table Store provides competitive historical storage with lake storage 
capability, using LSM file structure
+  to store data on DFS, providing real-time updates and queries at a lower 
cost.

Review Comment:
   > I think the object storage is also another kind of DFS...
   
   Yes, I think so too... we can also add some more description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-27559) Some question about flink operator state

2022-05-09 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-27559.

Resolution: Information Provided

> Some question about flink operator state
> 
>
> Key: FLINK-27559
> URL: https://issues.apache.org/jira/browse/FLINK-27559
> Project: Flink
>  Issue Type: New Feature
> Environment: Flink 1.14.4
>Reporter: Underwood
>Priority: Major
>
> I hope to get two answers to Flink's maintenance status:
>  
> 1. Does custompartition support saving status? In my usage scenario, the 
> partition strategy is dynamically adjusted, which depends on the data in 
> datastream. I hope to make different partition strategies according to 
> different data conditions.
>  
> For a simple example, I want the first 100 pieces of data in datastream to be 
> range partitioned and the rest of the data to be hash partitioned. At this 
> time, I may need a count to identify the number of pieces of data that have 
> been processed. However, in custompartition, this is only a local variable, 
> so there seem to be two problems: declaring variables in this way can only be 
> used in single concurrency, and it seems that they cannot be counted across 
> slots; In this way, the count data will be lost during fault recovery.
>  
> Although Flink already has operator state and key value state, 
> custompartition is not an operator, so I don't think it can solve this 
> problem through state. I've considered introducing a zookeeper to save the 
> state, but the introduction of new components will make the system bloated. I 
> don't know whether there is a better way to solve this problem.
>  
> 2. How to make multiple operators share the same state, and even all parallel 
> subtasks of different operators share the same state?
>  
> For a simple example, my stream processing is divided into four stages: 
> source - > mapa - > mapb - > sink. I hope to have a status count to count the 
> total amount of data processed by all operators. For example, if the source 
> receives one piece of data, then count + 1 when mapa is processed and count + 
> 1 when mapb is processed. Finally, after this piece of data is processed, the 
> value of count is 2.
>  
> I don't know if there is such a state saving mechanism in Flink, which can 
> meet my scenario and recover from failure at the same time. At present, we 
> can still think of using zookeeper. I don't know if there is a better way.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27559) Some question about flink operator state

2022-05-09 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534103#comment-17534103
 ] 

Yun Tang commented on FLINK-27559:
--

[~Underwood] Flink's JIRA is not a place to ask questions; please ask them on 
the mailing lists: 
https://flink.apache.org/community.html#mailing-lists

> Some question about flink operator state
> 
>
> Key: FLINK-27559
> URL: https://issues.apache.org/jira/browse/FLINK-27559
> Project: Flink
>  Issue Type: New Feature
> Environment: Flink 1.14.4
>Reporter: Underwood
>Priority: Major
>
> I hope to get two answers to Flink's maintenance status:
>  
> 1. Does custompartition support saving status? In my usage scenario, the 
> partition strategy is dynamically adjusted, which depends on the data in 
> datastream. I hope to make different partition strategies according to 
> different data conditions.
>  
> For a simple example, I want the first 100 pieces of data in datastream to be 
> range partitioned and the rest of the data to be hash partitioned. At this 
> time, I may need a count to identify the number of pieces of data that have 
> been processed. However, in custompartition, this is only a local variable, 
> so there seem to be two problems: declaring variables in this way can only be 
> used in single concurrency, and it seems that they cannot be counted across 
> slots; In this way, the count data will be lost during fault recovery.
>  
> Although Flink already has operator state and key value state, 
> custompartition is not an operator, so I don't think it can solve this 
> problem through state. I've considered introducing a zookeeper to save the 
> state, but the introduction of new components will make the system bloated. I 
> don't know whether there is a better way to solve this problem.
>  
> 2. How to make multiple operators share the same state, and even all parallel 
> subtasks of different operators share the same state?
>  
> For a simple example, my stream processing is divided into four stages: 
> source - > mapa - > mapb - > sink. I hope to have a status count to count the 
> total amount of data processed by all operators. For example, if the source 
> receives one piece of data, then count + 1 when mapa is processed and count + 
> 1 when mapb is processed. Finally, after this piece of data is processed, the 
> value of count is 2.
>  
> I don't know if there is such a state saving mechanism in Flink, which can 
> meet my scenario and recover from failure at the same time. At present, we 
> can still think of using zookeeper. I don't know if there is a better way.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27560) Refactor SimpleStateRequestHandler for PyFlink state

2022-05-09 Thread Juntao Hu (Jira)
Juntao Hu created FLINK-27560:
-

 Summary: Refactor SimpleStateRequestHandler for PyFlink state
 Key: FLINK-27560
 URL: https://issues.apache.org/jira/browse/FLINK-27560
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Juntao Hu
 Fix For: 1.16.0


Currently, SimpleStateRequestHandler.java, which handles Beam state requests 
from the Python side, is coupled with keyed-state logic. It could be refactored 
to reduce code duplication when implementing operator state (list/broadcast 
state can fit into the bag/map logic).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767988


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer 
getSplitSerializer() {
 public PendingSplitsCheckpointSerializer 
getEnumeratorCheckpointSerializer() {
 return new PendingSplitsCheckpointSerializer(getSplitSerializer());
 }
+
+private void writeObject(ObjectOutputStream out) throws IOException {
+out.defaultWriteObject();
+if (specifiedManifestEntries != null) {
+BinaryRowDataSerializer partSerializer =
+new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+DataFileMetaSerializer metaSerializer =
+new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+view.writeInt(specifiedManifestEntries.size());
+for (Map.Entry>> 
partEntry :
+specifiedManifestEntries.entrySet()) {
+partSerializer.serialize(partEntry.getKey(), view);
+Map> bucketEntry = 
partEntry.getValue();
+view.writeInt(bucketEntry.size());
+for (Map.Entry> entry : 
bucketEntry.entrySet()) {
+view.writeInt(entry.getKey());
+view.writeInt(entry.getValue().size());
+for (DataFileMeta meta : entry.getValue()) {
+metaSerializer.serialize(meta, view);
+}
+}
+}
+}
+}
+
+private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+in.defaultReadObject();
+if (in.available() > 0) {
+BinaryRowDataSerializer partSerializer =
+new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+DataFileMetaSerializer metaSerializer =
+new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+DataInputViewStreamWrapper view = new 
DataInputViewStreamWrapper(in);
+specifiedManifestEntries = new HashMap<>();
+int partitionCtr = view.readInt();
+while (partitionCtr > 0) {
+BinaryRowData partition = partSerializer.deserialize(view);
+Map> bucketEntry = new HashMap<>();
+int bucketCtr = view.readInt();
+while (bucketCtr > 0) {
+int bucket = view.readInt();
+int entryCtr = view.readInt();
+if (entryCtr == 0) {
+bucketEntry.put(bucket, Collections.emptyList());
+} else {
+List metas = new ArrayList<>();
+while (entryCtr > 0) {
+metas.add(metaSerializer.deserialize(view));
+entryCtr--;
+}
+bucketEntry.put(bucket, metas);
+}
+bucketCtr--;
+}
+specifiedManifestEntries.put(partition, bucketEntry);
+partitionCtr--;
+}
+}
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (!(o instanceof FileStoreSource)) {
+return false;
+}
+FileStoreSource that = (FileStoreSource) o;
+return valueCountMode == that.valueCountMode
+&& isContinuous == that.isContinuous
+&& discoveryInterval == that.discoveryInterval
+&& latestContinuous == that.latestContinuous
+&& fileStore.equals(that.fileStore)
+&& Arrays.equals(projectedFields, that.projectedFields)

Review Comment:
   Thanks for reporting this. This code snippet was auto-generated by IntelliJ; 
I should have checked it more carefully. My fault, and I'll fix it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myasuka commented on pull request #19502: [FLINK-27260][Runtime/REST] Expose changelog configurations in Rest API and Web UI

2022-05-09 Thread GitBox


Myasuka commented on PR #19502:
URL: https://github.com/apache/flink/pull/19502#issuecomment-1121834299

   @fredia would you please share a picture of how the UI looks with this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767378


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -69,7 +92,10 @@ public FileStoreSource(
 boolean latestContinuous,
 @Nullable int[][] projectedFields,
 @Nullable Predicate partitionPredicate,
-@Nullable Predicate fieldPredicate) {
+@Nullable Predicate fieldPredicate,
+@Nullable Long specifiedSnapshotId,
+@Nullable
+Map>> 
specifiedManifestEntries) {

Review Comment:
   > What's the case that we will set those two fields with non-null values ? I 
see all of them will be set to be `null` in the `buildFileStore`.
   
   For `ALTER TABLE ... COMPACT`, the `buildFileStore` will be modified in the 
following PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767106


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -122,6 +150,17 @@ public SplitEnumerator restoreEnu
 SplitEnumeratorContext context,
 PendingSplitsCheckpoint checkpoint) {
 FileStoreScan scan = fileStore.newScan();
+Long snapshotId;
+Collection splits;
+if (specifiedSnapshotId != null) {
+Preconditions.checkNotNull(
+specifiedManifestEntries,
+"The manifest entries cannot be null for manual 
compaction.");

Review Comment:
   > The error message looks quite strange to me, and I'm just curious: is manual 
compaction the only case in which we set `specifiedSnapshotId` and 
`specifiedManifestEntries`? Will there be other cases where we set the two 
fields?
   
   So far, only manually invoked compaction (triggered by `ALTER TABLE ... 
COMPACT`) specifies the snapshot id and manifest entries. All other cases 
perform a scan at runtime.
   
   The reason is that `ALTER TABLE ... COMPACT` pre-scans the latest snapshot 
during the planning phase, serializes the info as a string, and puts it back 
into the enriched options. Therefore, at runtime, the source does not need to 
perform a scan anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] schumiyi commented on pull request #18914: [FLINK-26259][table-planner]Partial insert and partition insert canno…

2022-05-09 Thread GitBox


schumiyi commented on PR #18914:
URL: https://github.com/apache/flink/pull/18914#issuecomment-1121831473

   I'd appreciate it if you could help take a look. @godfreyhe @wuchong @twalthr 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27559) Some question about flink operator state

2022-05-09 Thread Underwood (Jira)
Underwood created FLINK-27559:
-

 Summary: Some question about flink operator state
 Key: FLINK-27559
 URL: https://issues.apache.org/jira/browse/FLINK-27559
 Project: Flink
  Issue Type: New Feature
 Environment: Flink 1.14.4
Reporter: Underwood


I hope to get two answers to Flink's maintenance status:

 

1. Does custompartition support saving status? In my usage scenario, the 
partition strategy is dynamically adjusted, which depends on the data in 
datastream. I hope to make different partition strategies according to 
different data conditions.

 

For a simple example, I want the first 100 pieces of data in datastream to be 
range partitioned and the rest of the data to be hash partitioned. At this 
time, I may need a count to identify the number of pieces of data that have 
been processed. However, in custompartition, this is only a local variable, so 
there seem to be two problems: declaring variables in this way can only be used 
in single concurrency, and it seems that they cannot be counted across slots; 
In this way, the count data will be lost during fault recovery.

 

Although Flink already has operator state and key value state, custompartition 
is not an operator, so I don't think it can solve this problem through state. 
I've considered introducing a zookeeper to save the state, but the introduction 
of new components will make the system bloated. I don't know whether there is a 
better way to solve this problem.

 

2. How to make multiple operators share the same state, and even all parallel 
subtasks of different operators share the same state?

 

For a simple example, my stream processing is divided into four stages: source 
- > mapa - > mapb - > sink. I hope to have a status count to count the total 
amount of data processed by all operators. For example, if the source receives 
one piece of data, then count + 1 when mapa is processed and count + 1 when 
mapb is processed. Finally, after this piece of data is processed, the value of 
count is 2.

 

I don't know if there is such a state saving mechanism in Flink, which can meet 
my scenario and recover from failure at the same time. At present, we can still 
think of using zookeeper. I don't know if there is a better way.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] luoyuxia commented on a diff in pull request #19656: [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect

2022-05-09 Thread GitBox


luoyuxia commented on code in PR #19656:
URL: https://github.com/apache/flink/pull/19656#discussion_r868761997


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java:
##
@@ -807,6 +807,29 @@ public void testShowPartitions() throws Exception {
 assertTrue(partitions.toString().contains("dt=2020-04-30 
01:02:03/country=china"));
 }
 
+@Test
+public void testStatementVariableSubstitution() {
+// test system variable for substitution
+System.setProperty("k1", "v1");
+List result =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select 
'${system:k1}'").collect());
+assertEquals("[+I[v1]]", result.toString());
+
+// test env variable for substitution
+String classPath = System.getenv("CLASSPATH");
+result =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select 
'${env:CLASSPATH}'").collect());
+assertEquals(String.format("[+I[%s]]", classPath), result.toString());
+
+// test hive conf variable for substitution
+result =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select 
'${hiveconf:common-key}'").collect());

Review Comment:
   Currently, we have no way to dynamically set hiveconf variables in the Flink 
Table API. AFAIK, Hive only supports setting hiveconf variables in the CLI or 
when initializing a Hive session with hiveconf variables. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] openinx commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

2022-05-09 Thread GitBox


openinx commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868754349


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -69,7 +92,10 @@ public FileStoreSource(
 boolean latestContinuous,
 @Nullable int[][] projectedFields,
 @Nullable Predicate partitionPredicate,
-@Nullable Predicate fieldPredicate) {
+@Nullable Predicate fieldPredicate,
+@Nullable Long specifiedSnapshotId,
+@Nullable
+Map>> 
specifiedManifestEntries) {

Review Comment:
   What's the case that we will set those two fields with non-null values ?  I 
see all of them will be set to be `null` in the `buildFileStore`.



##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -122,6 +150,17 @@ public SplitEnumerator restoreEnu
 SplitEnumeratorContext context,
 PendingSplitsCheckpoint checkpoint) {
 FileStoreScan scan = fileStore.newScan();
+Long snapshotId;
+Collection splits;
+if (specifiedSnapshotId != null) {
+Preconditions.checkNotNull(
+specifiedManifestEntries,
+"The manifest entries cannot be null for manual 
compaction.");

Review Comment:
   The error message looks quite strange to me, and I'm just curious: is manual 
compaction the only case in which we set `specifiedSnapshotId` and 
`specifiedManifestEntries`? Will there be other cases where we set the two 
fields? 



##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer 
getSplitSerializer() {
 public PendingSplitsCheckpointSerializer 
getEnumeratorCheckpointSerializer() {
 return new PendingSplitsCheckpointSerializer(getSplitSerializer());
 }
+
+private void writeObject(ObjectOutputStream out) throws IOException {
+out.defaultWriteObject();
+if (specifiedManifestEntries != null) {
+BinaryRowDataSerializer partSerializer =
+new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+DataFileMetaSerializer metaSerializer =
+new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+view.writeInt(specifiedManifestEntries.size());
+for (Map.Entry>> 
partEntry :
+specifiedManifestEntries.entrySet()) {
+partSerializer.serialize(partEntry.getKey(), view);
+Map> bucketEntry = 
partEntry.getValue();
+view.writeInt(bucketEntry.size());
+for (Map.Entry> entry : 
bucketEntry.entrySet()) {
+view.writeInt(entry.getKey());
+view.writeInt(entry.getValue().size());
+for (DataFileMeta meta : entry.getValue()) {
+metaSerializer.serialize(meta, view);
+}
+}
+}
+}
+}
+
+private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+in.defaultReadObject();
+if (in.available() > 0) {
+BinaryRowDataSerializer partSerializer =
+new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+DataFileMetaSerializer metaSerializer =
+new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+DataInputViewStreamWrapper view = new 
DataInputViewStreamWrapper(in);
+specifiedManifestEntries = new HashMap<>();
+int partitionCtr = view.readInt();

Review Comment:
   What's the meaning of suffix `ctr`  ?  partition number or partition count ? 
So why not just use partitionNum ?



##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer 
getSplitSerializer() {
 public PendingSplitsCheckpointSerializer 
getEnumeratorCheckpointSerializer() {
 return new PendingSplitsCheckpointSerializer(getSplitSerializer());
 }
+
+private void writeObject(ObjectOutputStream out) throws IOException {
+out.defaultWriteObject();
+if (specifiedManifestEntries != null) {
+BinaryRowDataSerializer partSerializer =
+new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+DataFileMetaSerializer metaSerializer =
+new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+ 

[GitHub] [flink-ml] yunfengzhou-hub commented on pull request #95: Migrated to flink 1.15.0.

2022-05-09 Thread GitBox


yunfengzhou-hub commented on PR #95:
URL: https://github.com/apache/flink-ml/pull/95#issuecomment-1121813951

   Hi @dotbg , thanks for contributing to Flink ML.
   
   I tried running `mvn clean install` on this PR, and it seems that there are 
still errors during compilation. Could you please fix them and push to this PR 
again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26588) Translate the new CAST documentation to Chinese

2022-05-09 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-26588:
---
Fix Version/s: 1.15.1

> Translate the new CAST documentation to Chinese
> ---
>
> Key: FLINK-26588
> URL: https://issues.apache.org/jira/browse/FLINK-26588
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.15.0
>Reporter: Martijn Visser
>Assignee: Chengkai Yang
>Priority: Major
>  Labels: chinese-translation, pull-request-available, 
> stale-assigned
> Fix For: 1.15.1
>
>
> Since FLINK-26125 is now merged, this content change should also be 
> translated to Chinese. Relevant PR is 
> https://github.com/apache/flink/pull/18813



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26588) Translate the new CAST documentation to Chinese

2022-05-09 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534087#comment-17534087
 ] 

Leonard Xu commented on FLINK-26588:


Fixed in 
 master: a9605906aa0a4c9157717b7221ef38eeb70f5525
 release-1.15: TODO   

> Translate the new CAST documentation to Chinese
> ---
>
> Key: FLINK-26588
> URL: https://issues.apache.org/jira/browse/FLINK-26588
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.15.0
>Reporter: Martijn Visser
>Assignee: Chengkai Yang
>Priority: Major
>  Labels: chinese-translation, pull-request-available, 
> stale-assigned
> Fix For: 1.15.1
>
>
> Since FLINK-26125 is now merged, this content change should also be 
> translated to Chinese. Relevant PR is 
> https://github.com/apache/flink/pull/18813



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] leonardBang commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese

2022-05-09 Thread GitBox


leonardBang commented on PR #19498:
URL: https://github.com/apache/flink/pull/19498#issuecomment-1121808405

   @ChengkaiYang2022 Could you also open a backport PR for branch 
`release-1.15` ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] leonardBang merged pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese

2022-05-09 Thread GitBox


leonardBang merged PR #19498:
URL: https://github.com/apache/flink/pull/19498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zuston commented on pull request #19675: [FLINK-27550] Remove checking yarn queues before submitting job to Yarn

2022-05-09 Thread GitBox


zuston commented on PR #19675:
URL: https://github.com/apache/flink/pull/19675#issuecomment-1121805622

   @wangyang0918 Thanks for your quick reply.
   
   Yes, you are right. When using a two-level queue naming policy, either the leaf 
queue name (`-Dyarn.application.queue=default`) or the full queue name 
(`-Dyarn.application.queue=root.default`) works with the capacity scheduler when 
checking the queues.
   
   But with a three-level naming policy, such as `root.job.streaming / 
root.job.batch`, it becomes invalid. When I remote-debugged one of our internal 
jobs, I found that QueueInfo.getQueuePath only returns the simple queue name.
   
   So let's look at an example with the following YARN queue configuration.
   
   > Yarn Queue:
   > 1. root.job.streaming
   > 2. root.job.batch
   > 
   > When retrieving all queues via YarnClient.getAllQueues, it returns a list of 
QueueInfo, and the list of QueueInfo.getQueuePath values is [root, job, 
streaming, batch]. 
   > 
   > 
   
   So when the Flink job queue is specified as root.job.streaming, the 
`checkYarnQueues` method will not find the corresponding queue among the 
existing YARN queues above and will print a misleading message.
   
   And why does the queue work with a two-level naming policy? I think the code 
below just adds an extra special case to avoid misusing the getQueuePath API:
   ```
   for (QueueInfo queue : queues) {
       if (queue.getQueueName().equals(this.yarnQueue)
               // Special handling: also accept "root." + this.yarnQueue
               || queue.getQueueName().equals("root." + this.yarnQueue)) {
           queueFound = true;
           break;
       }
   }
   ```
   
   Coming back to this issue: to solve the problem above, we would have to use 
the `getQueuePath` API, but it was only introduced in the latest Hadoop version.
   
   So I wonder why the `checkYarnQueues` method was introduced in the first 
place. Is it just to give the user a hint, without exiting directly, when the 
queue does not exist among the cluster's YARN queues? If that is the only 
purpose, I think there is no need for this method; the error message will be 
shown in the Flink YARN application master diagnostics anyway.
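   
   For illustration only, a minimal sketch of how the check could match nested 
queue paths if `QueueInfo#getQueuePath` were usable; since that API only exists 
in recent Hadoop releases, the `getQueuePath()` call below is an assumption of 
this sketch, not something Flink can rely on today:
   ```java
   import org.apache.hadoop.yarn.api.records.QueueInfo;
   import org.apache.hadoop.yarn.client.api.YarnClient;
   import org.apache.hadoop.yarn.exceptions.YarnException;
   
   import java.io.IOException;
   import java.util.List;
   
   public class QueuePathCheckSketch {
   
       /** Returns true if yarnQueue matches either the simple name or the full path of a queue. */
       static boolean queueExists(YarnClient yarnClient, String yarnQueue)
               throws IOException, YarnException {
           List<QueueInfo> queues = yarnClient.getAllQueues();
           for (QueueInfo queue : queues) {
               // Simple leaf name, e.g. "streaming".
               String simpleName = queue.getQueueName();
               // Full path, e.g. "root.job.streaming"; only available in recent Hadoop.
               String fullPath = queue.getQueuePath();
               if (yarnQueue.equals(simpleName) || yarnQueue.equals(fullPath)) {
                   return true;
               }
           }
           return false;
       }
   }
   ```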
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

2022-05-09 Thread GitBox


openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r868744983


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##
@@ -82,18 +95,41 @@ public FileStoreWriteImpl(
 public RecordWriter createWriter(
 BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
 Long latestSnapshotId = pathFactory.latestSnapshotId();
-if (latestSnapshotId == null) {
-return createEmptyWriter(partition, bucket, compactExecutor);
-} else {
-return createMergeTreeWriter(
-partition,
-bucket,
-scan.withSnapshot(latestSnapshotId)
-
.withPartitionFilter(Collections.singletonList(partition))
-.withBucket(bucket).plan().files().stream()
-.map(ManifestEntry::file)
-.collect(Collectors.toList()),
-compactExecutor);
+List existingFileMetas = Lists.newArrayList();
+if (latestSnapshotId != null) {
+// Concat all the DataFileMeta of existing files into 
existingFileMetas.
+scan.withSnapshot(latestSnapshotId)
+
.withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+.plan().files().stream()
+.map(ManifestEntry::file)
+.forEach(existingFileMetas::add);
+}
+
+switch (writeMode) {
+case APPEND_ONLY:
+DataFilePathFactory factory =
+pathFactory.createDataFilePathFactory(partition, 
bucket);
+FileStatsExtractor fileStatsExtractor =
+
fileFormat.createStatsExtractor(writeSchema).orElse(null);
+
+return new AppendOnlyWriter(
+fileFormat,
+options.targetFileSize,
+writeSchema,
+existingFileMetas,
+factory,
+fileStatsExtractor);
+
+case CHANGE_LOG:
+if (latestSnapshotId == null) {
+return createEmptyWriter(partition, bucket, 
compactExecutor);
+} else {
+return createMergeTreeWriter(
+partition, bucket, existingFileMetas, 
compactExecutor);
+}

Review Comment:
   Make sense for me !



##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##
@@ -34,30 +36,38 @@
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 /** Default implementation of {@link FileStoreWrite}. */
 public class FileStoreWriteImpl implements FileStoreWrite {
 
 private final DataFileReader.Factory dataFileReaderFactory;
 private final DataFileWriter.Factory dataFileWriterFactory;
+private final WriteMode writeMode;
+private final FileFormat fileFormat;
+private final RowType writeSchema;
 private final Supplier> keyComparatorSupplier;
 private final MergeFunction mergeFunction;
 private final FileStorePathFactory pathFactory;
 private final FileStoreScan scan;
 private final MergeTreeOptions options;
 
 public FileStoreWriteImpl(
+WriteMode writeMode,
+RowType writeSchema,
 RowType keyType,
 RowType valueType,

Review Comment:
   The `writeSchema` can be removed now. In the previous version, I was thinking 
that `valueType` would be some customized table schema (instead of the original 
table schema), such as `int _VALUE_COUNT` or some other type. But in fact, for 
an append-only table the `valueType` will always be the `tableSchema`, so we 
don't need an extra `writeSchema` any more. Thanks for the comment.



##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##
@@ -0,0 +1,176 @@
+/*
+ * License

[GitHub] [flink-table-store] openinx commented on a diff in pull request #115: [FLINK-27546] Add append only writer which implements the RecordWriter interface

2022-05-09 Thread GitBox


openinx commented on code in PR #115:
URL: https://github.com/apache/flink-table-store/pull/115#discussion_r868723547


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/**
+ * A {@link RecordWriter} implementation that only accepts records which are 
always insert
+ * operations and don't have any unique keys or sort keys.
+ */
+public class AppendOnlyWriter implements RecordWriter {
+private final BulkWriter.Factory writerFactory;
+private final RowType writeSchema;
+private final long targetFileSize;
+private final DataFilePathFactory pathFactory;
+private final FileStatsExtractor fileStatsExtractor;
+
+private final AtomicLong nextSeqNum;
+
+private RowRollingWriter writer;
+
+public AppendOnlyWriter(
+FileFormat fileFormat,
+long targetFileSize,
+RowType writeSchema,
+long maxWroteSeqNumber,
+DataFilePathFactory pathFactory) {
+
+this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+this.writeSchema = writeSchema;
+this.targetFileSize = targetFileSize;
+
+this.pathFactory = pathFactory;
+this.fileStatsExtractor = 
fileFormat.createStatsExtractor(writeSchema).orElse(null);
+
+this.nextSeqNum = new AtomicLong(maxWroteSeqNumber + 1);
+
+this.writer = createRollingRowWriter();
+}
+
+@Override
+public void write(ValueKind valueKind, RowData key, RowData value) throws 
Exception {
+Preconditions.checkArgument(
+valueKind == ValueKind.ADD,
+"Append-only writer cannot accept ValueKind: " + valueKind);
+
+writer.write(value);
+}
+
+@Override
+public Increment prepareCommit() throws Exception {
+List newFiles = Lists.newArrayList();
+
+if (writer != null) {
+writer.close();
+
+// Reopen the writer to accept further records.
+newFiles.addAll(writer.result());
+writer = createRollingRowWriter();
+}
+
+return new Increment(Lists.newArrayList(newFiles));
+}
+
+@Override
+public void sync() throws Exception {
+// Do nothing here, as this writer don't introduce any async 
compaction thread currently.
+}
+
+@Override
+public List close() throws Exception {
+sync();
+
+List result = Lists.newArrayList();
+if (writer != null) {
+// Abort this writer to clear uncommitted files.
+writer.abort();

Review Comment:
   Filed a separate issue for this: 
https://issues.apache.org/jira/browse/FLINK-27553



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] openinx commented on a diff in pull request #115: [FLINK-27546] Add append only writer which implements the RecordWriter interface

2022-05-09 Thread GitBox


openinx commented on code in PR #115:
URL: https://github.com/apache/flink-table-store/pull/115#discussion_r868723542


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/**
+ * A {@link RecordWriter} implementation that only accepts records which are 
always insert
+ * operations and don't have any unique keys or sort keys.
+ */
+public class AppendOnlyWriter implements RecordWriter {
+private final BulkWriter.Factory writerFactory;
+private final RowType writeSchema;
+private final long targetFileSize;
+private final DataFilePathFactory pathFactory;
+private final FileStatsExtractor fileStatsExtractor;
+
+private final AtomicLong nextSeqNum;
+
+private RowRollingWriter writer;
+
+public AppendOnlyWriter(
+FileFormat fileFormat,
+long targetFileSize,
+RowType writeSchema,
+long maxWroteSeqNumber,
+DataFilePathFactory pathFactory) {
+
+this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+this.writeSchema = writeSchema;
+this.targetFileSize = targetFileSize;
+
+this.pathFactory = pathFactory;
+this.fileStatsExtractor = 
fileFormat.createStatsExtractor(writeSchema).orElse(null);
+
+this.nextSeqNum = new AtomicLong(maxWroteSeqNumber + 1);
+
+this.writer = createRollingRowWriter();
+}
+
+@Override
+public void write(ValueKind valueKind, RowData key, RowData value) throws 
Exception {
+Preconditions.checkArgument(
+valueKind == ValueKind.ADD,
+"Append-only writer cannot accept ValueKind: " + valueKind);
+
+writer.write(value);
+}
+
+@Override
+public Increment prepareCommit() throws Exception {
+List newFiles = Lists.newArrayList();
+
+if (writer != null) {
+writer.close();
+
+// Reopen the writer to accept further records.
+newFiles.addAll(writer.result());
+writer = createRollingRowWriter();
+}
+
+return new Increment(Lists.newArrayList(newFiles));
+}
+
+@Override
+public void sync() throws Exception {
+// Do nothing here, as this writer doesn't introduce any async 
compaction thread currently.
+}
+
+@Override
+public List close() throws Exception {
+sync();
+
+List result = Lists.newArrayList();
+if (writer != null) {
+// Abort this writer to clear uncommitted files.
+writer.abort();

Review Comment:
   Filed a separate issue for this: 
https://issues.apache.org/jira/browse/FLINK-27553



##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF license

[jira] [Assigned] (FLINK-27297) Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method in PyFlink

2022-05-09 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-27297:
---

Assignee: LuNng Wang

> Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) 
> method in PyFlink
> ---
>
> Key: FLINK-27297
> URL: https://issues.apache.org/jira/browse/FLINK-27297
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: LuNng Wang
>Priority: Major
> Fix For: 1.16.0
>
>
> The StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method has 
> been available on the Java side since release-1.12; we need to add this method in 
> Python too.
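
For reference, a minimal sketch of the existing Java-side API that the Python
binding would mirror (the configuration key below is only an example value):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvFromConfigExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Any valid Flink option can be passed; "rest.port" is just an example.
        conf.setString("rest.port", "8082");

        // Java API available since release-1.12; the PyFlink counterpart would mirror it.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        System.out.println("Default parallelism: " + env.getParallelism());
    }
}
{code}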



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27297) Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method in PyFlink

2022-05-09 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534071#comment-17534071
 ] 

Dian Fu commented on FLINK-27297:
-

[~ana4] Thanks for taking this. Have assigned it to you~

> Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) 
> method in PyFlink
> ---
>
> Key: FLINK-27297
> URL: https://issues.apache.org/jira/browse/FLINK-27297
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: LuNng Wang
>Priority: Major
> Fix For: 1.16.0
>
>
> The StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method has 
> been available on the Java side since release-1.12; we need to add this method in 
> Python too.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27297) Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method in PyFlink

2022-05-09 Thread LuNng Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534070#comment-17534070
 ] 

LuNng Wang commented on FLINK-27297:


[~dianfu] [~hxbks2ks] I would like to try this.

> Add the StreamExecutionEnvironment#getExecutionEnvironment(Configuration) 
> method in PyFlink
> ---
>
> Key: FLINK-27297
> URL: https://issues.apache.org/jira/browse/FLINK-27297
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
> Fix For: 1.16.0
>
>
> The StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method has 
> been available on the Java side since release-1.12; we need to add this method in 
> Python too.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] hililiwei commented on pull request #17798: [hotfix] Fix comment typos in AsyncIOExample

2022-05-09 Thread GitBox


hililiwei commented on PR #17798:
URL: https://github.com/apache/flink/pull/17798#issuecomment-1121765967

   > @hililiwei You haven't updated the commit message, you have updated the PR 
title
   
   updated. thx. 😄 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27558) Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27558:
---
Labels: pull-request-available  (was: )

> Introduce a new optional option for TableStoreFactory to represent planned 
> manifest entries
> ---
>
> Key: FLINK-27558
> URL: https://issues.apache.org/jira/browse/FLINK-27558
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> When 
> {code:java}
> TableStoreFactory.onCompactTable
> {code}
> gets called, the planned manifest entries need to be injected back into the 
> enriched options, and we need a new key to represent it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] LadyForest opened a new pull request, #116: [FLINK-27558] Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread GitBox


LadyForest opened a new pull request, #116:
URL: https://github.com/apache/flink-table-store/pull/116

   When `TableStoreFactory.onCompactTable`gets called, the planned manifest 
entries need to be injected back into the enriched options, and we need a new 
key to represent it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] weibozhao commented on a diff in pull request #86: [FLINK-27294] Add Transformer for BinaryClassificationEvaluator

2022-05-09 Thread GitBox


weibozhao commented on code in PR #86:
URL: https://github.com/apache/flink-ml/pull/86#discussion_r868690036


##
flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/binaryeval/BinaryClassificationEvaluatorTest.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation.binaryeval;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.StageTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link BinaryClassificationEvaluator}. */
+public class BinaryClassificationEvaluatorTest {

Review Comment:
   OK, I will add these test cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ChengkaiYang2022 commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese

2022-05-09 Thread GitBox


ChengkaiYang2022 commented on PR #19498:
URL: https://github.com/apache/flink/pull/19498#issuecomment-1121710303

   CI test passed: 
[link](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35507&view=results)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] haizhou-zhao commented on a diff in pull request #19645: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema

2022-05-09 Thread GitBox


haizhou-zhao commented on code in PR #19645:
URL: https://github.com/apache/flink/pull/19645#discussion_r868618419


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java:
##
@@ -104,10 +105,15 @@ public boolean canEqual(Object obj) {
 }
 
 private void writeObject(ObjectOutputStream oos) throws IOException {
-oos.writeUTF(schema.toString());
+byte[] schemaStrInBytes = 
schema.toString(false).getBytes(StandardCharsets.UTF_8);

Review Comment:
   @tweise @stevenzwu I think the avro serializer snapshot (which is used for 
de/ser of operator state if the state includes Avro records) already takes care of 
de/ser of large Avro schemas: Code 
[link](https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java#L155).
   
   Since the serialization of the avro serializer and typeinfo is only used when 
constructing edges and vertices of the stream graph, maybe we don't need to take 
care of backward compatibility here.
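   
   For context, a self-contained sketch of the general technique involved here (simplified names, not the exact PR code): `java.io.DataOutputStream#writeUTF` is limited to 65535 encoded bytes, so a large schema string has to be written as an explicitly length-prefixed UTF-8 byte array instead.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.nio.charset.StandardCharsets;
   
   public final class LargeStringSerializationSketch {
   
       // writeUTF is limited to 65535 encoded bytes, so write an explicit
       // length-prefixed UTF-8 byte array instead.
       static void writeLargeString(ObjectOutputStream oos, String value) throws IOException {
           byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
           oos.writeInt(bytes.length);
           oos.write(bytes);
       }
   
       static String readLargeString(ObjectInputStream ois) throws IOException {
           byte[] bytes = new byte[ois.readInt()];
           ois.readFully(bytes);
           return new String(bytes, StandardCharsets.UTF_8);
       }
   
       public static void main(String[] args) throws IOException {
           StringBuilder sb = new StringBuilder();
           for (int i = 0; i < 100_000; i++) {
               sb.append('x'); // well beyond the writeUTF limit
           }
           ByteArrayOutputStream buffer = new ByteArrayOutputStream();
           try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
               writeLargeString(oos, sb.toString());
           }
           try (ObjectInputStream ois =
                   new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
               System.out.println(readLargeString(ois).length());
           }
       }
   }
   ```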



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-23633) HybridSource: Support dynamic stop position in FileSource

2022-05-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534050#comment-17534050
 ] 

Thomas Weise commented on FLINK-23633:
--

Hi [~binh], of course. Perhaps you can start with a short description of how you 
would like to go about it? Do you want me to assign this ticket to you?

> HybridSource: Support dynamic stop position in FileSource
> -
>
> Key: FLINK-23633
> URL: https://issues.apache.org/jira/browse/FLINK-23633
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Thomas Weise
>Priority: Major
>
> As of FLINK-22670 FileSource can be used with HybridSource with fixed end 
> position. To support the scenario where the switch position isn't known ahead 
> of time, FileSource needs to have a hook to decide when it is time to stop 
> with continuous polling and then expose the end position through the 
> enumerator.
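
A purely hypothetical sketch, only to illustrate the shape such a hook could take (none of these names exist in Flink; the actual design is open):
{code:java}
import java.io.Serializable;

/**
 * Hypothetical interface for illustration only: lets a continuously polling
 * file source decide when to stop and report where it stopped, so that a
 * HybridSource can switch to the next source at that position.
 */
public interface DynamicStopDecider extends Serializable {

    /** Called after each discovery cycle with the latest observed file timestamp. */
    boolean shouldStopPolling(long latestFileTimestampMillis);

    /** End position to expose through the enumerator once polling has stopped. */
    long switchTimestampMillis();
}
{code}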



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-23633) HybridSource: Support dynamic stop position in FileSource

2022-05-09 Thread Xinbin Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534048#comment-17534048
 ] 

Xinbin Huang commented on FLINK-23633:
--

Hi [~thw] , may I try to implement this?

> HybridSource: Support dynamic stop position in FileSource
> -
>
> Key: FLINK-23633
> URL: https://issues.apache.org/jira/browse/FLINK-23633
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Thomas Weise
>Priority: Major
>
> As of FLINK-22670 FileSource can be used with HybridSource with fixed end 
> position. To support the scenario where the switch position isn't known ahead 
> of time, FileSource needs to have a hook to decide when it is time to stop 
> with continuous polling and then expose the end position through the 
> enumerator.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-23143) Support state migration

2022-05-09 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534026#comment-17534026
 ] 

Roman Khachatryan commented on FLINK-23143:
---

Thanks a lot for sharing the prototype.
I like the simplicity of your solution, but I think the concern I expressed 
above (about the creation of multiple state objects) is valid.
Besides that, RegisteredKeyValueStateBackendMetaInfo seems to be an implementation 
detail of a state backend, so it shouldn't be exposed.

Regarding the key serializer upgrade, I think it's [not 
supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/]
 currently and therefore is out of the scope of this ticket.
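
To make the wrapping idea from the ticket description below more concrete, here is a rough illustration only (not the actual changelog backend code): one mutable holder per state name, whose wrapped serializer can be upgraded lazily on first user access.
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Illustration only: a mutable holder, kept in a map keyed by state name,
 * so the serializer passed to the delegated backend can be swapped
 * (migrated) on first user access after recovery.
 */
final class SerializerHolder<T> {

    private TypeSerializer<T> current;

    SerializerHolder(TypeSerializer<T> restored) {
        this.current = restored;
    }

    TypeSerializer<T> current() {
        return current;
    }

    /** Called on first user access if the user-supplied serializer differs. */
    void upgrade(TypeSerializer<T> upgraded) {
        this.current = upgraded;
    }
}
{code}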

> Support state migration
> ---
>
> Key: FLINK-23143
> URL: https://issues.apache.org/jira/browse/FLINK-23143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> ChangelogKeyedStateBackend.getOrCreateKeyedState is currently used during 
> recovery; on 1st user access, it doesn't update metadata nor migrate state 
> (as opposed to other backends).
>  
> The proposed solution is to
>  # wrap serializers (and maybe other objects) in getOrCreateKeyedState
>  # store wrapping objects in a new map keyed by state name
>  # pass wrapped objects to delegatedBackend.createInternalState
>  # on 1st user access, lookup wrapper and upgrade its wrapped serializer
> This should be done for both KV/PQ states.
>  
> See also [https://github.com/apache/flink/pull/15420#discussion_r656934791]
>  
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] dotbg opened a new pull request, #95: Migrated to flink 1.15.0.

2022-05-09 Thread GitBox


dotbg opened a new pull request, #95:
URL: https://github.com/apache/flink-ml/pull/95

   * Flink 1.15.0
   * Scala binary version used in test artifacts only.  Hence artifact names 
were changed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #19538: [FLINK-27323][flink-table-api-java][tests] Migrate tests to JUnit5

2022-05-09 Thread GitBox


snuyanzin commented on PR #19538:
URL: https://github.com/apache/flink/pull/19538#issuecomment-1121530098

   merge conflicts resolved and rebased


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19684: Add support to pass Datadog API Key as environment variable

2022-05-09 Thread GitBox


flinkbot commented on PR #19684:
URL: https://github.com/apache/flink/pull/19684#issuecomment-1121463902

   
   ## CI report:
   
   * 90ea579c38f9a5f3de805c8dd1b74ba7fc2d6a26 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pinxiu opened a new pull request, #19684: Add support to pass Datadog API Key as environment variable

2022-05-09 Thread GitBox


pinxiu opened a new pull request, #19684:
URL: https://github.com/apache/flink/pull/19684

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   Add support to pass Datadog API Key as environment variable to avoid 
explicitly adding API key in plain text config file.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #17564: [FLINK-24640][table] CEIL, FLOOR built-in functions for Timestamp should res…

2022-05-09 Thread GitBox


snuyanzin commented on PR #17564:
URL: https://github.com/apache/flink/pull/17564#issuecomment-1121413777

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27492) Flink table scala example does not including the scala-api jars

2022-05-09 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533908#comment-17533908
 ] 

Yun Gao commented on FLINK-27492:
-

For another thing, perhaps we could also include the two Table Scala API 
artifacts in the binary distribution package, for example in the opt/ directory. 
Currently the documentation asks users to download the missing artifacts and put 
them into lib/, but it seems we do not present the download links for 
these artifacts.

> Flink table scala example does not including the scala-api jars
> ---
>
> Key: FLINK-27492
> URL: https://issues.apache.org/jira/browse/FLINK-27492
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Assignee: Timo Walther
>Priority: Critical
> Fix For: 1.16.0, 1.15.1
>
>
> Currently it seems the flink-scala-api and flink-table-api-scala-bridge jars are not 
> included in the binary package[1]. However, the scala table 
> examples currently do not seem to include the scala-api classes in the generated jar. If we 
> start a standalone cluster from the binary distribution package and then 
> submit a table / sql job in scala, it fails because the 
> StreamTableEnvironment class is not found.
>  
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#anatomy-of-table-dependencies



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut` teamed up with 
`numRecordsSend` and `numRecordsSendErrors`. 3. the information of using 
`numRecordsSend` has been described in the first part and there is no use case 
for "changing the connector"(all connectors that used `numRecordsOut` in a 
wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
sink connectors. Please refer to the new Kafka Sink for details. Additionally 
since `numRecordsOut` now only counts the records sent between sink tasks and 
`numRecordsOutErrors` was designed for counting the records sent to the 
external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
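   
   As a concrete illustration of the intended split (a minimal sketch, assuming the `SinkWriterMetricGroup` accessors introduced by FLINK-26126; the wiring in a real connector such as the Kafka sink may differ):
   
   ```java
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
   
   /** Sketch only: how a sink writer could report records handed to the external system. */
   class SendMetricsReporter {
   
       private final Counter numRecordsSend;
       private final Counter numRecordsSendErrors;
   
       SendMetricsReporter(SinkWriterMetricGroup metricGroup) {
           this.numRecordsSend = metricGroup.getNumRecordsSendCounter();
           this.numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
       }
   
       void recordDelivered() {
           numRecordsSend.inc(); // record reached the external system
       }
   
       void deliveryFailed() {
           numRecordsSendErrors.inc(); // preferred over the deprecated numRecordsOutErrors
       }
   }
   ```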
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27506) update playgrounds for Flink 1.14

2022-05-09 Thread Shubham Bansal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533873#comment-17533873
 ] 

Shubham Bansal commented on FLINK-27506:


Thanks, David. I will take a look at the previous commit and return back 
shortly with the list of changes.

> update playgrounds for Flink 1.14
> -
>
> Key: FLINK-27506
> URL: https://issues.apache.org/jira/browse/FLINK-27506
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation / Training / Exercises
>Affects Versions: 1.14.4
>Reporter: David Anderson
>Priority: Major
>  Labels: starter
>
> All of the flink-playgrounds need to be updated for 1.14.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut` teamed up with 
`numRecordsSend` and `numRecordsSendErrors`. 3. the information of using 
`numRecordsSend` has been described in the first part and there is no use case 
for "changing the connector"(all connectors that used `numRecordsOut` in a 
wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
sink connectors. Please refer to the new Kafka Sink for details. Additionally 
since `numRecordsOut` now only counts the records sent between sink tasks and 
`numRecordsOutErrors` was designed for counting the records sent to the 
external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] HuangZhenQiu commented on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager

2022-05-09 Thread GitBox


HuangZhenQiu commented on PR #14678:
URL: https://github.com/apache/flink/pull/14678#issuecomment-1121273392

   @zentol 
   Would you please help to review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut`. 3. the information 
of using `numRecordsSend` has been described in the first part and there is no 
use case for "changing the connector"(all connectors that used `numRecordsOut` 
in a wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
sink connectors. Please refer to the new Kafka Sink for details. Additionally 
since `numRecordsOut` now only counts the records sent between sink tasks and 
`numRecordsOutErrors` was designed for counting the records sent to the 
external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut`. 3. the information 
of using `numRecordsSend` has been described in the first part and there is no 
use case for "changing the connector"(all connectors that used `numRecordsOut` 
in a wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
sink connectors. Please refer to the new Kafka Sink for more information. 
Additionally since `numRecordsOut` now only counts the records sent between 
sink tasks and `numRecordsOutErrors` was designed for counting the records sent 
to the external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut`. 3. the information 
of using `numRecordsSend` has been described in the first part and there is no 
use case for "changing the connector"(all connectors that used `numRecordsOut` 
in a wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
your sink connectors. Please refer to the new Kafka Sink for more information. 
Additionally since `numRecordsOut` now only counts the records sent between 
sink tasks and `numRecordsOutErrors` was designed for counting the records sent 
to the external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. Connectors still use `numRecordsOut`. 3. the information 
of using `numRecordsSend` has been described in the first part and there is no 
use case for "changing the connector"(all connectors that used `numRecordsOut` 
in a wrong way have been changed). From now on, it is actually mandatory for 
connector developers to use them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
your sink connectors. Please refer to the new Kafka Sink for more information. 
Additionally since `numRecordsOut` now only counts the records sent between 
sink tasks and `numRecordsOutErrors` was designed for counting the records sent 
to the external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors

2022-05-09 Thread GitBox


JingGe commented on code in PR #19664:
URL: https://github.com/apache/flink/pull/19664#discussion_r868138403


##
docs/content/release-notes/flink-1.15.md:
##
@@ -406,6 +406,18 @@ updating the client dependency to a version >= 7.14.0 is 
required due to interna
 The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been 
removed.
 If not done already, users need to upgrade to the newer stack (indicated by 
`connector=jdbc` in DDL).
 
+
+ Extensible unified Sink uses new metric to capture outgoing records
+
+# [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126)
+
+New metrics `numRecordsSend` and `numRecordsSendErrors` has been introduced 
for users to monitor the number of 
+records sent to the external system. The `numRecordsOut` should be used to 
monitor the number of records 
+transferred between sink tasks.
+
+Connector developers should Consider using the new metric 
`numRecordsSendErrors` to monitor errors, 
+since `numRecordsOut` has a different and more general meaning, and 
`numRecordsOutErrors` is therefore deprecated.

Review Comment:
   This is confusing for users and connector developers, tbh.  1. 
`numRecordsOut` will ONLY count the records (technically Committables) sent 
between sink tasks. 2. the information about the deprecated metric 
`numRecordsOutErrors` is missing. 3. the information of using `numRecordsSend` 
has been described in the first part and there is no use case for "changing the 
connector"(all connectors that used `numRecordsOut` in a wrong way have been 
changed). From now on, it is actually mandatory for connector developers to use 
them except `numRecordsOutErrors`. 
   
   How about this version:
   
   Connector developers should pay attention to the usage of these metrics 
`numRecordsOut`, `numRecordsSend` and `numRecordsSendErrors` while building 
your sink connectors. Please refer to the new Kafka Sink for more information. 
Additionally since `numRecordsOut` now only counts the records sent between 
sink tasks and `numRecordsOutErrors` was designed for counting the records sent 
to the external system, we deprecated `numRecordsOutErrors` and recommend using 
`numRecordsSendErrors` instead.
   
   Even with this description, it is still unclear why `numRecordsOutErrors` is 
deprecated while `numRecordsOut` is still used. It should be used to 
monitor errors that happen while Committables are sent between sink tasks. WDYT?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27552) Prometheus metrics disappear after starting a job

2022-05-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533852#comment-17533852
 ] 

João Boto commented on FLINK-27552:
---

I was testing with a job that used a KafkaSink (sorry).

Disabling the KafkaSink metrics works.

This can be closed.

> Prometheus metrics disappear after starting a job
> -
>
> Key: FLINK-27552
> URL: https://issues.apache.org/jira/browse/FLINK-27552
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.15.0
>Reporter: João Boto
>Priority: Major
>
> I have a Standalone cluster (with jobmanager and taskmanager on same machine) 
> on 1.14.4 and I'm testing the migration to 1.15.0
> But I keep losing the taskmanager metrics when I start a job on the 1.15 
> cluster
> I use the same configuration as in the previous cluster
> {{  }}
> {code:java}
> metrics.reporters: prom 
> metrics.reporter.prom.factory.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporterFactory 
> metrics.reporter.prom.port: 9250-9251{code}
> {{ }}
> If the cluster is running without jobs I can see the metrics on port 9250 for 
> jobmanager and on port 9251 for taskmanager
> If I start a job, the metrics from taskmanager disappear and if I stop the 
> job the metrics come live again
> What am I missing?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-09 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r868113102


##
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java:
##
@@ -723,11 +721,11 @@ public void testCassandraBatchPojoFormat() throws 
Exception {
 
 final List pojos = 
writePojosWithOutputFormat(annotatedPojoClass);
 ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-Assert.assertEquals(20, rs.all().size());
+assertThat(rs.all()).hasSize(20);
 
 final List result = 
readPojosWithInputFormat(annotatedPojoClass);
-Assert.assertEquals(20, result.size());
-assertThat(result, samePropertyValuesAs(pojos));
+assertThat(result).hasSize(20);
+assertThat(result).satisfies(matching(samePropertyValuesAs(pojos)));

Review Comment:
   Can we find a native, non-hamcrest, assertj equivalent?
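   
   One possible assertj-native shape (sketch; assumes the two pojo lists are expected to match on field values, ignoring element order):
   
   ```java
   // Requires: import static org.assertj.core.api.Assertions.assertThat;
   assertThat(result)
           .usingRecursiveComparison()
           .ignoringCollectionOrder()
           .isEqualTo(pojos);
   ```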



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-26112) Port getRestEndpoint method to the specific service type subclass

2022-05-09 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang closed FLINK-26112.
-
Resolution: Fixed

Fixed via:
master:
f24dc6da25246fdaf278f4b2ce88892ec79d1b8b
ab3eb40d920fa609f49164a0bbb5fcbb3004a808
ff7d61954cad97e3e52df0c006f130de39a6a5c2


> Port getRestEndpoint method to the specific service type subclass
> -
>
> Key: FLINK-26112
> URL: https://issues.apache.org/jira/browse/FLINK-26112
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
> Fix For: 1.16.0
>
>
> In the [FLINK-20830|https://issues.apache.org/jira/browse/FLINK-20830], we 
> introduce serval subclass to deal with the service build and query, This 
> ticket is meant to move the related code to the proper class 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27558) Introduce a new optional option for TableStoreFactory to represent planned manifest entries

2022-05-09 Thread Jane Chan (Jira)
Jane Chan created FLINK-27558:
-

 Summary: Introduce a new optional option for TableStoreFactory to 
represent planned manifest entries
 Key: FLINK-27558
 URL: https://issues.apache.org/jira/browse/FLINK-27558
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0


When 
{code:java}
TableStoreFactory.onCompactTable
{code}
gets called, the planned manifest entries need to be injected back into the 
enriched options, and we need a new key to represent it.
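
A minimal sketch of what such a key could look like (the option name and type are placeholders, not the final design):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class CompactionOptions {

    /**
     * Placeholder for illustration: carries the serialized planned manifest
     * entries that are injected back into the enriched table options.
     */
    public static final ConfigOption<String> COMPACTION_SCANNED_MANIFEST =
            ConfigOptions.key("compaction.scanned-manifest")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Serialized manifest entries planned for compaction.");
}
{code}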



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-26112) Port getRestEndpoint method to the specific service type subclass

2022-05-09 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-26112:
--
Fix Version/s: 1.16.0

> Port getRestEndpoint method to the specific service type subclass
> -
>
> Key: FLINK-26112
> URL: https://issues.apache.org/jira/browse/FLINK-26112
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
> Fix For: 1.16.0
>
>
> In the [FLINK-20830|https://issues.apache.org/jira/browse/FLINK-20830], we 
> introduce serval subclass to deal with the service build and query, This 
> ticket is meant to move the related code to the proper class 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] wangyang0918 merged pull request #18762: [FlINK-26112][k8s]Port getRestEndpoint method to the specific service type subclass

2022-05-09 Thread GitBox


wangyang0918 merged PR #18762:
URL: https://github.com/apache/flink/pull/18762


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-09 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r868090797


##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java:
##
@@ -169,45 +168,47 @@ private void assertSourceMetrics(
 boolean hasTimestamps) {
List<OperatorMetricGroup> groups =
 reporter.findOperatorMetricGroups(jobId, 
"MetricTestingSource");
-assertThat(groups, hasSize(parallelism));
+assertThat(groups).hasSize(parallelism);
 
 int subtaskWithMetrics = 0;
 for (OperatorMetricGroup group : groups) {
Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
 // there are only 2 splits assigned; so two groups will not update 
metrics
 if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() 
== 0) {
 // assert that optional metrics are not initialized when no 
split assigned
-assertThat(
-metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
-
isGauge(equalTo(InternalSourceReaderMetricGroup.UNDEFINED)));
-assertThat(metrics.get(MetricNames.WATERMARK_LAG), 
nullValue());
+
assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
+.satisfies(
+matching(
+isGauge(
+equalTo(
+
InternalSourceReaderMetricGroup
+.UNDEFINED;
+assertThat(metrics.get(MetricNames.WATERMARK_LAG)).isNull();
 continue;
 }
 subtaskWithMetrics++;
 // I/O metrics
-assertThat(
-group.getIOMetricGroup().getNumRecordsInCounter(),
-isCounter(equalTo(processedRecordsPerSubtask)));
-assertThat(
-group.getIOMetricGroup().getNumBytesInCounter(),
-isCounter(
-equalTo(
-processedRecordsPerSubtask
-* 
MockRecordEmitter.RECORD_SIZE_IN_BYTES)));
+assertThat(group.getIOMetricGroup().getNumRecordsInCounter())
+
.satisfies(matching(isCounter(equalTo(processedRecordsPerSubtask;
+assertThat(group.getIOMetricGroup().getNumBytesInCounter())
+.satisfies(
+matching(
+isCounter(
+equalTo(
+processedRecordsPerSubtask
+* MockRecordEmitter
+
.RECORD_SIZE_IN_BYTES;
 // MockRecordEmitter is just incrementing errors every even record
-assertThat(
-metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
-isCounter(equalTo(processedRecordsPerSubtask / 2)));
+assertThat(metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS))
+
.satisfies(matching(isCounter(equalTo(processedRecordsPerSubtask / 2;

Review Comment:
   It seems that those matchers are used extensively and the readability really 
suffers. The `isGauge`, `isCounter` etc.  helper methods have pretty simple 
logic. I think we should move away from hamcrest as much as possible. This 
`.satisfies(matching(...)` really diminishes the purpose of introducing 
matchers for the fluent API.
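   
   For example, the counter checks above could be written without hamcrest roughly like this (sketch, reusing the accessors already present in the test):
   
   ```java
   // Requires: import static org.assertj.core.api.Assertions.assertThat;
   assertThat(group.getIOMetricGroup().getNumRecordsInCounter().getCount())
           .isEqualTo(processedRecordsPerSubtask);
   assertThat(group.getIOMetricGroup().getNumBytesInCounter().getCount())
           .isEqualTo(processedRecordsPerSubtask * MockRecordEmitter.RECORD_SIZE_IN_BYTES);
   ```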



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-09 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r868080704


##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java:
##
@@ -29,9 +29,8 @@
 
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;

Review Comment:
   Usage could be substituted by assertThatThrownBy.



##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java:
##
@@ -158,7 +154,7 @@ public void 
testHandleSplitRequestAfterSwitchAndReaderReset() {
 enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), 
SUBTASK0);
 try {
 enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
-Assert.fail("expected exception");
+fail("expected exception");

Review Comment:
   How about assertThatThrownBy?
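   
   A rough shape of that suggestion (sketch; the concrete expected exception type is not visible in this excerpt and should be narrowed in the real test):
   
   ```java
   // Requires: import static org.assertj.core.api.Assertions.assertThatThrownBy;
   assertThatThrownBy(() -> enumerator.handleSplitRequest(SUBTASK0, "fakehostname"))
           .isInstanceOf(Exception.class);
   ```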



##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java:
##
@@ -169,45 +168,47 @@ private void assertSourceMetrics(
 boolean hasTimestamps) {
List<OperatorMetricGroup> groups =
 reporter.findOperatorMetricGroups(jobId, 
"MetricTestingSource");
-assertThat(groups, hasSize(parallelism));
+assertThat(groups).hasSize(parallelism);
 
 int subtaskWithMetrics = 0;
 for (OperatorMetricGroup group : groups) {
Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
 // there are only 2 splits assigned; so two groups will not update 
metrics
 if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() 
== 0) {
 // assert that optional metrics are not initialized when no 
split assigned
-assertThat(
-metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
-
isGauge(equalTo(InternalSourceReaderMetricGroup.UNDEFINED)));
-assertThat(metrics.get(MetricNames.WATERMARK_LAG), 
nullValue());
+
assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
+.satisfies(
+matching(
+isGauge(
+equalTo(
+
InternalSourceReaderMetricGroup
+.UNDEFINED;
+assertThat(metrics.get(MetricNames.WATERMARK_LAG)).isNull();
 continue;
 }
 subtaskWithMetrics++;
 // I/O metrics
-assertThat(
-group.getIOMetricGroup().getNumRecordsInCounter(),
-isCounter(equalTo(processedRecordsPerSubtask)));
-assertThat(
-group.getIOMetricGroup().getNumBytesInCounter(),
-isCounter(
-equalTo(
-processedRecordsPerSubtask
-* 
MockRecordEmitter.RECORD_SIZE_IN_BYTES)));
+assertThat(group.getIOMetricGroup().getNumRecordsInCounter())
+
.satisfies(matching(isCounter(equalTo(processedRecordsPerSubtask;
+assertThat(group.getIOMetricGroup().getNumBytesInCounter())
+.satisfies(
+matching(
+isCounter(
+equalTo(
+processedRecordsPerSubtask
+* MockRecordEmitter
+
.RECORD_SIZE_IN_BYTES;
 // MockRecordEmitter is just incrementing errors every even record
-assertThat(
-metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
-isCounter(equalTo(processedRecordsPerSubtask / 2)));
+assertThat(metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS))
+
.satisfies(matching(isCounter(equalTo(processedRecordsPerSubtask / 2;

Review Comment:
   It seems that those matchers are used extensively and the readability really 
suffers. Those `isGauge`, `isCounter` helper methods have pretty simple 
logic. I think we should move away from hamcrest as much as possible. This 
`.satisfies(matching(...)` really diminishes the purpose of introducing 
matchers for the fluent API.



##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/f

[GitHub] [flink-web] liuzhuang2017 commented on pull request #528: [hotfix][docs] Delete the redundant content of "downloads.zh.md" file.

2022-05-09 Thread GitBox


liuzhuang2017 commented on PR #528:
URL: https://github.com/apache/flink-web/pull/528#issuecomment-1121172421

   @MartijnVisser , Thank you very much for your review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


