[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765507159



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/DenseMatrixSerializer.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.linalg.typeinfo;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.ml.linalg.DenseMatrix;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Specialized serializer for {@link DenseMatrix}. */
+public final class DenseMatrixSerializer extends TypeSerializerSingleton<DenseMatrix> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double[] EMPTY = new double[0];
+
+    private static final DenseMatrixSerializer INSTANCE = new DenseMatrixSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public DenseMatrix createInstance() {
+        return new DenseMatrix(0, 0, EMPTY);
+    }
+
+    @Override
+    public DenseMatrix copy(DenseMatrix from) {
+        return new DenseMatrix(
+                from.numRows, from.numCols, Arrays.copyOf(from.values, from.values.length));
+    }
+
+    @Override
+    public DenseMatrix copy(DenseMatrix from, DenseMatrix reuse) {
+        if (from.values.length == reuse.values.length) {

Review comment:
   I think if the sizes are not equal, we just need to create a new DenseMatrix 
and return it.
   If we created a new DenseMatrix and assigned it to `reuse`, we would have to 
change `reuse`; then, when the next copy call comes and the reuse size is right, 
we would do the wrong thing. 
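A minimal sketch of the behavior being discussed, using a hypothetical stand-in `Matrix` class rather than the actual Flink ML `DenseMatrix`: copy into `reuse` only when the backing arrays have the same length, and otherwise return a freshly allocated copy while leaving `reuse` untouched.

```java
import java.util.Arrays;

// Hypothetical stand-in for DenseMatrix, for illustration only.
final class Matrix {
    final int numRows;
    final int numCols;
    final double[] values;

    Matrix(int numRows, int numCols, double[] values) {
        this.numRows = numRows;
        this.numCols = numCols;
        this.values = values;
    }

    static Matrix copy(Matrix from) {
        return new Matrix(from.numRows, from.numCols,
                Arrays.copyOf(from.values, from.values.length));
    }

    // Copy into `reuse` only when the array lengths match; otherwise
    // allocate a new Matrix and leave `reuse` unmodified, so a later
    // call with a correctly sized reuse object still behaves correctly.
    static Matrix copy(Matrix from, Matrix reuse) {
        if (from.values.length == reuse.values.length) {
            System.arraycopy(from.values, 0, reuse.values, 0, from.values.length);
            return reuse;
        }
        return copy(from);
    }
}
```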





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] lindong28 commented on a change in pull request #37: [FLINK-24955] Add Estimator and Transformer for One Hot Encoder

2021-12-08 Thread GitBox


lindong28 commented on a change in pull request #37:
URL: https://github.com/apache/flink-ml/pull/37#discussion_r765496493



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java
##
@@ -95,4 +95,9 @@ public boolean validate(T value) {
 }
 };
 }
+
+    // Check if the parameter value array is not an empty array.
+    public static <T> ParamValidator<T[]> nonEmptyArray() {

Review comment:
   Could you test this validator in `StageTest::testValidators(...)`?
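For illustration, a self-contained sketch of how such a validator could be exercised; the `ParamValidator` interface and the `nonEmptyArray` body below are simplified stand-ins, not the actual Flink ML classes.

```java
// Simplified stand-ins for the Flink ML types, for illustration only.
interface ParamValidator<T> {
    boolean validate(T value);
}

final class Validators {
    // A non-empty-array validator along the lines of the one under review:
    // rejects null and zero-length arrays, accepts everything else.
    static <T> ParamValidator<T[]> nonEmptyArray() {
        return value -> value != null && value.length > 0;
    }
}
```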

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasHandleInvalid.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared handleInvalid param. */
+public interface HasHandleInvalid<T> extends WithParams<T> {
+    Param<String> HANDLE_INVALID =
+            new StringParam(
+                    "handleInvalid",
+                    "Strategy to handle invalid entries.",
+                    "ERROR",

Review comment:
   Could we use lower-case letters (i.e. `error`) as the value here for 
consistency with `HasDistanceMeasure`?

##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        checkSizeAndIndicesRange();

Review comment:
   Instead of making three passes over the vector (i.e. 
`checkSizeAndIndicesRange`, `isIndicesSorted`, and `checkDuplicatedIndices`), 
could we make just one pass to improve efficiency?
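A sketch of the single-pass check being suggested (a hypothetical helper, not the PR's code): if the indices are required to be sorted, then the range, ordering, and duplicate checks can all be folded into one loop by comparing each index against its predecessor.

```java
final class SparseIndexCheck {
    // One pass over the indices: verifies every index is in [0, n) and
    // strictly increasing, which simultaneously rules out unsorted and
    // duplicated indices.
    static void validate(int n, int[] indices) {
        int prev = -1;
        for (int idx : indices) {
            if (idx < 0 || idx >= n) {
                throw new IllegalArgumentException("Index out of range: " + idx);
            }
            if (idx <= prev) {
                throw new IllegalArgumentException(
                        "Indices must be sorted and free of duplicates.");
            }
            prev = idx;
        }
    }
}
```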

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoder.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 

[jira] [Commented] (FLINK-4266) Cassandra SplitOver Statebackend

2021-12-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456215#comment-17456215
 ] 

Yun Tang commented on FLINK-4266:
-

[~foxss] What's the progress of this ticket? Can we close it now?

> Cassandra SplitOver Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Assignee: Chen Qin
>Priority: Not a Priority
>  Labels: stale-assigned
>
> The current FileSystem statebackend limits the whole state size to disk space. 
> We need to deal with scaled-out checkpoint states beyond local disk space, such 
> as a long-running task that holds window contents for a long period of time. 
> Pipelines need to split out states to durable remote storage, even replicated 
> to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to get evicted state 
> k/v. We also introduce a checkpoint-time commit and an eviction threshold that 
> reduce "hot states" hitting the remote db on every update between adjacent 
> checkpoints, by tracking update logs, merging them, and doing batch updates 
> only at checkpoint time. Lastly, we are looking for an eviction policy that can 
> identify "hot keys" in k/v state and lazily load the "cold keys" from remote 
> storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have to 
> keep state forever until a command is run manually to clean per-job state data. 
> Some of these features might be related to the incremental checkpointing 
> feature; we hope to align with the effort there as well.
> Comments are welcome; I will try to put up a draft design doc after gathering 
> some feedback.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765502926



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/DenseMatrixSerializer.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.linalg.typeinfo;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.ml.linalg.DenseMatrix;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Specialized serializer for {@link DenseMatrix}. */
+public final class DenseMatrixSerializer extends TypeSerializerSingleton<DenseMatrix> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double[] EMPTY = new double[0];
+
+    private static final DenseMatrixSerializer INSTANCE = new DenseMatrixSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public DenseMatrix createInstance() {
+        return new DenseMatrix(0, 0, EMPTY);
+    }
+
+    @Override
+    public DenseMatrix copy(DenseMatrix from) {
+        return new DenseMatrix(
+                from.numRows, from.numCols, Arrays.copyOf(from.values, from.values.length));
+    }
+
+    @Override
+    public DenseMatrix copy(DenseMatrix from, DenseMatrix reuse) {
+        if (from.values.length == reuse.values.length) {
+            System.arraycopy(from.values, 0, reuse.values, 0, from.values.length);
+            return reuse;
+        }
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(DenseMatrix matrix, DataOutputView target) throws IOException {
+        if (matrix == null) {
+            throw new IllegalArgumentException("The matrix must not be null.");
+        }
+        final int len = matrix.values.length;
+        target.writeInt(matrix.numRows);
+        target.writeInt(matrix.numCols);
+        for (int i = 0; i < len; i++) {
+            target.writeDouble(matrix.values[i]);
+        }
+    }
+
+    @Override
+    public DenseMatrix deserialize(DataInputView source) throws IOException {
+        int m = source.readInt();
+        int n = source.readInt();
+        double[] values = new double[m * n];
+        deserializeDoubleArray(values, source, m * n);
+        return new DenseMatrix(m, n, values);
+    }
+
+    private static void deserializeDoubleArray(double[] dst, DataInputView source, int len)
+            throws IOException {
+        for (int i = 0; i < len; i++) {
+            dst[i] = source.readDouble();
+        }
+    }
+
+    @Override
+    public DenseMatrix deserialize(DenseMatrix reuse, DataInputView source) throws IOException {
+        int m = source.readInt();
+        int n = source.readInt();
+        double[] values = reuse.values;

Review comment:
   I think if the sizes are not equal, we need to allocate a new double[] for 
deserialization.
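A sketch of that suggestion, using `java.io.DataInput` as a stand-in for Flink's `DataInputView` and a hypothetical helper `readValues`: reuse the caller's buffer only when its length matches `m * n`, and otherwise allocate a fresh array.

```java
import java.io.DataInput;
import java.io.IOException;

final class MatrixIO {
    // Reuse `reuse` only if it is exactly m * n doubles long; otherwise
    // allocate a new array, so deserialization never writes into a
    // mis-sized buffer.
    static double[] readValues(DataInput in, int m, int n, double[] reuse) throws IOException {
        int len = m * n;
        double[] dst = (reuse != null && reuse.length == len) ? reuse : new double[len];
        for (int i = 0; i < len; i++) {
            dst[i] = in.readDouble();
        }
        return dst;
    }
}
```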




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-24489) The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the number of the elements in the cache

2021-12-08 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456213#comment-17456213
 ] 

Arvid Heise commented on FLINK-24489:
-

Merged into master as 4d142b3a417c57e4ed8af4c886fae68d927da067. I'd say that 
this is an improvement (and not just a simple bug fix), so we shouldn't 
backport. WDYT?

> The size of entryCache & eventsBufferCache in the SharedBuffer should be 
> defined with a threshold to limit the number of the elements in the cache
> --
>
> Key: FLINK-24489
> URL: https://issues.apache.org/jira/browse/FLINK-24489
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / CEP
>Affects Versions: 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0
>Reporter: Yuepeng Pan
>Priority: Major
>  Labels: pull-request-available
> Attachments: details of the big map object.png, incoming-reference-to 
> the-big-map-obj.png, 截屏2021-10-13 20.28.17.png
>
>
> source code : 
> [here|https://github.com/apache/flink/blob/c3cb886ee73b5fee23b2bccff0f5e4d45a30b3a1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java#L79]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] AHeise merged pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the number of the elem

2021-12-08 Thread GitBox


AHeise merged pull request #17692:
URL: https://github.com/apache/flink/pull/17692


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765500552



##
File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/BLAS.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.util.Preconditions;
+
+/** A utility class that provides BLAS routines over matrices and vectors. */
+public class BLAS {
+
+    private static final dev.ludovic.netlib.BLAS NATIVE_BLAS =
+            dev.ludovic.netlib.BLAS.getInstance();
+
+    /**
+     * \sum_i |x_i| .
+     *
+     * @param x x
+     * @return \sum_i |x_i|
+     */
+    public static double asum(DenseVector x) {
+        return NATIVE_BLAS.dasum(x.size(), x.values, 0, 1);
+    }
+
+    /**
+     * y += a * x .
+     *
+     * @param a a
+     * @param x x
+     * @param y y
+     */
+    public static void axpy(double a, DenseVector x, DenseVector y) {
+        Preconditions.checkArgument(x.size() == y.size(), "Array dimension mismatched.");
+        NATIVE_BLAS.daxpy(x.size(), a, x.values, 1, y.values, 1);
+    }
+
+    /**
+     * x \cdot y .
+     *
+     * @param x x
+     * @param y y
+     * @return x \cdot y
+     */
+    public static double dot(DenseVector x, DenseVector y) {
+        Preconditions.checkArgument(x.size() == y.size(), "Array dimension mismatched.");
+        return NATIVE_BLAS.ddot(x.size(), x.values, 1, y.values, 1);
+    }
+
+    /**
+     * \sqrt(\sum_i x_i * x_i) .
+     *
+     * @param x x
+     * @return \sqrt(\sum_i x_i * x_i)
+     */
+    public static double norm2(DenseVector x) {
+        return NATIVE_BLAS.dnrm2(x.size(), x.values, 1);
+    }
+
+    /**
+     * x = x * a .
+     *
+     * @param a a
+     * @param x x
+     */
+    public static void scal(double a, DenseVector x) {
+        NATIVE_BLAS.dscal(x.size(), a, x.values, 1);
+    }
+
+    /** y := alpha * A * x + beta * y . */

Review comment:
   OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-3947) Provide low level access to RocksDB state backend

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-3947.
---
Resolution: Information Provided

MapState and KeyedStateBackend#getKeys have been implemented. Closing this 
ticket due to lack of activity.

> Provide low level access to RocksDB state backend
> -
>
> Key: FLINK-3947
> URL: https://issues.apache.org/jira/browse/FLINK-3947
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The current state API is limiting and some implementations are not as 
> efficient as they could be, particularly when working with large states. For 
> instance, a ListState is append only.  You cannot remove values from the 
> list.  And the RocksDBListState get() implementation reads all list values 
> from RocksDB instead of returning an Iterable that only reads values as 
> needed.
> Furthermore, RocksDB is an ordered KV store, yet there is no ordered map 
> state API with an ability to iterate over the stored values in order.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] AHeise commented on a change in pull request #18009: [FLINK-25126][kafka] Reset internal transaction state of FlinkKafkaIn…

2021-12-08 Thread GitBox


AHeise commented on a change in pull request #18009:
URL: https://github.com/apache/flink/pull/18009#discussion_r765499014



##
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
##
@@ -59,49 +63,83 @@
 
 private static final String TRANSACTION_PREFIX = "test-transaction-";
 
-Properties getProperties() {
-Properties properties = new Properties();
-properties.put(
-CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-KAFKA_CONTAINER.getBootstrapServers());
-properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
-properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-return properties;
-}
-
 @Test
 void testInitTransactionId() {
+final String topic = "test-init-transactions";
try (FlinkKafkaInternalProducer<String, String> reuse =
 new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
 int numTransactions = 20;
 for (int i = 1; i <= numTransactions; i++) {
 reuse.initTransactionId(TRANSACTION_PREFIX + i);
 reuse.beginTransaction();
-reuse.send(new ProducerRecord<>(TEST_TOPIC, "test-value-" + 
i));
+reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
 if (i % 2 == 0) {
 reuse.commitTransaction();
 } else {
 reuse.flush();
 reuse.abortTransaction();
 }
 assertNumTransactions(i);
-assertThat(readRecords(TEST_TOPIC).count(), equalTo(i / 2));
+assertThat(readRecords(topic).count()).isEqualTo(i / 2);
+}
+}
+}
+
+@ParameterizedTest
+@MethodSource("provideTransactionsFinalizer")
+void testResetInnerTransactionIfFinalizingTransactionFailed(
+Consumer<FlinkKafkaInternalProducer<String, String>> transactionFinalizer) {
+final String topic = "reset-producer-internal-state";
+try (FlinkKafkaInternalProducer<String, String> fenced =
+new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+fenced.initTransactions();
+fenced.beginTransaction();
+fenced.send(new ProducerRecord<>(topic, "test-value"));
+// Start a second producer that fences the first one
+try (FlinkKafkaInternalProducer<String, String> producer =
+new FlinkKafkaInternalProducer<>(getProperties(), 
"dummy")) {
+producer.initTransactions();
+producer.beginTransaction();
+producer.send(new ProducerRecord<>(topic, "test-value"));
+producer.commitTransaction();
 }
+assertThatThrownBy(() -> transactionFinalizer.accept(fenced))
+.isInstanceOf(ProducerFencedException.class)
+.hasMessageContaining(
+"There is a newer producer with the same 
transactionalId which fences the current one.");

Review comment:
   `Maybe remove this check`?
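   For readers following the fencing discussion: Kafka fences a stale producer by bumping the epoch bound to its transactional id, so a later commit attempt from the old producer fails with `ProducerFencedException`. A toy, broker-free model of that rule (all names below are hypothetical, not Kafka API) might look like:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of transactional-id fencing: only the latest epoch may commit. */
public class FencingSketch {

    static final Map<String, Integer> latestEpoch = new HashMap<>();

    /** Each initTransactions()-style registration bumps the epoch, fencing older holders. */
    static int register(String transactionalId) {
        int epoch = latestEpoch.getOrDefault(transactionalId, 0) + 1;
        latestEpoch.put(transactionalId, epoch);
        return epoch;
    }

    /** A producer whose epoch is stale is fenced and must not commit. */
    static boolean commit(String transactionalId, int epoch) {
        return latestEpoch.get(transactionalId) == epoch;
    }

    public static void main(String[] args) {
        int first = register("dummy");   // first producer
        int second = register("dummy");  // second producer fences the first
        System.out.println(commit("dummy", first));  // false: fenced
        System.out.println(commit("dummy", second)); // true
    }
}
```

   The test above exercises exactly this situation: the second producer with the same id `"dummy"` invalidates the first one's transaction.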








[jira] [Closed] (FLINK-4413) Improve savepoint restore error messages

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4413.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Improve savepoint restore error messages
> 
>
> Key: FLINK-4413
> URL: https://issues.apache.org/jira/browse/FLINK-4413
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / State Backends
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Priority: Not a Priority
>
> Currently when savepoint restore fails due to some problems with parallelism 
> or the assigned uids the error messages contain only the job vertex id of the 
> problematic task.
> This makes these kind of problems very difficult to debug for more complex 
> topologies.
> I propose to add the user assigned task names to these error messages to make 
> this much easier for users.





[jira] [Closed] (FLINK-4493) Unify the snapshot output format for keyed-state backends

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4493.
---
Resolution: Information Provided

Already implemented in FLINK-20976.

> Unify the snapshot output format for keyed-state backends
> -
>
> Key: FLINK-4493
> URL: https://issues.apache.org/jira/browse/FLINK-4493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Not a Priority
>
> We could unify the output format for keyed-state backends implementations, 
> e.g. based on RocksDB and Heap, to write a single, common output format.
> For example, this would allow us to restore a state that was previously kept 
> in RocksDB on a heap-located backend and vice versa.





[GitHub] [flink] AHeise merged pull request #18047: [FLINK-25091][BP-1.12] Change ORC compression attribute reference error in FileSink docs

2021-12-08 Thread GitBox


AHeise merged pull request #18047:
URL: https://github.com/apache/flink/pull/18047


   






[GitHub] [flink] AHeise merged pull request #18046: [FLINK-25091][BP-1.13] Change ORC compression attribute reference error in FileSink docs

2021-12-08 Thread GitBox


AHeise merged pull request #18046:
URL: https://github.com/apache/flink/pull/18046


   






[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] AHeise merged pull request #18045: [FLINK-25091][BP-1.14] Change ORC compression attribute reference error in FileSink docs

2021-12-08 Thread GitBox


AHeise merged pull request #18045:
URL: https://github.com/apache/flink/pull/18045


   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] xuyangzhong commented on a change in pull request #18017: [FLINK-25171] Validation of duplicate fields in derived tables

2021-12-08 Thread GitBox


xuyangzhong commented on a change in pull request #18017:
URL: https://github.com/apache/flink/pull/18017#discussion_r765486095



##
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
##
@@ -40,10 +40,9 @@ class UnionTest extends TableTestBase {
  |CREATE TABLE t1 (
  |  id int,
  |  ts bigint,
- |  name string,
+ |  name varchar(32),

Review comment:
   If you want to delete one of them, I think it's better to keep the one 
that has the same type as the other tables.

##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
##
@@ -494,7 +494,13 @@ private void collectPhysicalFieldsTypes(List 
derivedColumns) {
 boolean nullable = type.getNullable() == null ? true : 
type.getNullable();
 RelDataType relType = type.deriveType(sqlValidator, 
nullable);
 // add field name and field type to physical field list
-physicalFieldNamesToTypes.put(name, relType);
+RelDataType oldType = physicalFieldNamesToTypes.put(name, 
relType);
+if (oldType != null) {
+throw new ValidationException(
+String.format(
+"A column named '%s' already exists in 
the derived table.",

Review comment:
   The exception message is confusing: users may wonder where the derived 
table is, since they may only use "create table ..." rather than "create table 
... like ...", and the former can trigger this bug as well. IMO, you can delete 
the word 'derived'.
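   The validation in the hunk above relies on `Map#put` returning the previous mapping, so a non-null return value signals a duplicate column name. A standalone sketch of that pattern, with hypothetical names and a plain `IllegalArgumentException` standing in for Flink's `ValidationException`:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of duplicate-column detection via Map#put's return value. */
public class DuplicateColumnCheck {

    private final Map<String, String> nameToType = new HashMap<>();

    /** Registers a column; throws if the name was already declared. */
    public void addColumn(String name, String type) {
        String old = nameToType.put(name, type);
        if (old != null) {
            // Map#put returns the previous value, so non-null means duplicate.
            throw new IllegalArgumentException(
                    String.format("A column named '%s' already exists in the table.", name));
        }
    }

    public static void main(String[] args) {
        DuplicateColumnCheck schema = new DuplicateColumnCheck();
        schema.addColumn("id", "INT");
        try {
            schema.addColumn("id", "BIGINT"); // duplicate: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Checking the return value of `put` avoids a separate `containsKey` lookup, which is presumably why the patch is written this way.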








[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Closed] (FLINK-4916) BufferSpiller should distribute files across temp directories

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4916.
---
Resolution: Information Provided

{{BufferSpiller}} has been dropped.

> BufferSpiller should distribute files across temp directories
> -
>
> Key: FLINK-4916
> URL: https://issues.apache.org/jira/browse/FLINK-4916
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, the {{BufferSpiller}} puts files into one temp directory.
> It should be a simple extension to allow it to rotate files across temp 
> directories.





[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Assigned] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-5151:
---

Assignee: Hangxiang Yu

> Add discussion about object mutations to heap-based state backend docs.
> ---
>
> Key: FLINK-5151
> URL: https://issues.apache.org/jira/browse/FLINK-5151
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Affects Versions: 1.1.4, 1.2.0
>Reporter: Fabian Hueske
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0
>
>
> Flink's heap state backends store data as objects on the heap. Any object 
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in case of sliding 
> windows, multiple window hold references to the same object. Hence, all 
> windows are affected if such an object is modified, e.g., by a 
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might return 
> invalid results.
> We should add this information to the state backend documentation and also 
> point out that the RocksDB backend is not affected by this because all data 
> is serialized.





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765495150



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/DenseMatrix.java
##
@@ -0,0 +1,71 @@
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixTypeInfoFactory;
+
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+
+/**
+ * Column-major dense matrix. The entry values are stored in a single array of 
doubles with columns
+ * listed in sequence.
+ */
+@TypeInfo(DenseMatrixTypeInfoFactory.class)
+public class DenseMatrix implements Matrix {
+
+/** Row dimension. */
+public final int numRows;
+
+/** Column dimension. */
+public final int numCols;
+
+/**
+ * Array for internal storage of elements.
+ *
+ * The matrix data is stored in column major format internally.
+ */
+public final double[] values;
+
+/**
+ * Constructs an m-by-n matrix of zeros.
+ *
+ * @param numRows Number of rows.
+ * @param numCols Number of columns.
+ */
+public DenseMatrix(int numRows, int numCols) {
+this(numRows, numCols, new double[numRows * numCols]);
+}
+
+/**
+ * Constructs a matrix from a 1-D array. The data in the array should 
organize in column major.
+ *
+ * @param numRows Number of rows.
+ * @param numCols Number of cols.
+ * @param values One-dimensional array of doubles.
+ */
+public DenseMatrix(int numRows, int numCols, double[] values) {
+Preconditions.checkArgument(values.length == numRows * numCols);
+this.numRows = numRows;
+this.numCols = numCols;
+this.values = values;
+}
+
+@Override
+public int numRows() {
+return numRows;
+}
+
+@Override
+public int numCols() {
+return numCols;
+}
+
+@Override
+public double get(int i, int j) {
+return values[numRows * j + i];

Review comment:
   OK
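   The `get(i, j)` accessor above works because of the column-major layout documented on `values`: column j occupies the contiguous slice starting at `numRows * j`. A quick standalone illustration (class and method names here are hypothetical):

```java
/** Illustrates column-major indexing: element (i, j) sits at values[numRows * j + i]. */
public class ColumnMajorDemo {

    static double get(int numRows, double[] values, int i, int j) {
        return values[numRows * j + i];
    }

    public static void main(String[] args) {
        // A = [[1, 2, 3],
        //      [4, 5, 6]]  listed column by column.
        double[] values = {1, 4, 2, 5, 3, 6};
        System.out.println(get(2, values, 1, 2)); // row 1, col 2 -> 6.0
    }
}
```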








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765494989



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+private static final String LABEL_COL = "test_label";
+private static final String PRED_COL = "test_prediction";
+private static final String VEC_COL = "test_features";
+private static final List<Row> trainArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(1, Vectors.dense(2.0, 3.0)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(2, Vectors.dense(200.1, 300.1)),
+Row.of(2, Vectors.dense(200.2, 300.2)),
+Row.of(2, Vectors.dense(200.3, 300.3)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.6, 300.6)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(3, Vectors.dense(2.8, 3.2)),
+Row.of(4, Vectors.dense(300., 3.2)),
+Row.of(1, Vectors.dense(2.2, 3.2)),
+Row.of(5, Vectors.dense(2.4, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(1, Vectors.dense(2.1, 3.1))));
+
+private static final List<Row> testArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, 
Vectors.dense(300, 42))));
+private Table testData;
+
+@Before
+public void before() {
+Configuration config = new Configuration();
+
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+env.setParallelism(4);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+tEnv = StreamTableEnvironment.create(env);
+
+Schema schema =
+Schema.newBuilder()
+.column("f0", DataTypes.INT())

Review comment:
   After discussing with @zhipeng93, I decided to use double as the label 
type.








[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-12-08 Thread GitBox


zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r765474898



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+public final DenseVector coefficient;
+
+public LogisticRegressionModelData(DenseVector coefficient) {
+this.coefficient = coefficient;
+}
+
+/**
+ * Converts the table model to a data stream.
+ *
+ * @param modelData The table model data.
+ * @return The data stream model data.
+ */
+public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
modelData).getTableEnvironment();
+return tEnv.toDataStream(modelData).map(x -> 
(LogisticRegressionModelData) x.getField(0));
+}
+
+/** Data encoder for {@link LogisticRegressionModel}. */
+public static class ModelDataEncoder implements 
Encoder {
+
+@Override
+public void encode(LogisticRegressionModelData modelData, OutputStream 
outputStream)
+throws IOException {
+DenseVectorSerializer serializer = new DenseVectorSerializer();
+serializer.serialize(
+modelData.coefficient, new 
DataOutputViewStreamWrapper(outputStream));
+}
+}
+
+/** Data decoder for {@link LogisticRegressionModel}. */
+public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {

Review comment:
   `KmeansModelData` is already updated. For NaiveBayes, how about we do 
the update in the NaiveBayes PR, given that it is not merged yet?
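   The `ModelDataEncoder`/`ModelDataDecoder` pair under discussion boils down to streaming the coefficient vector through a serializer and back. A self-contained sketch of that round trip, using plain `java.io` and a hypothetical length-prefixed format standing in for `DenseVectorSerializer`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Toy round trip for model data: length-prefixed doubles, mirroring an encoder/decoder pair. */
public class ModelDataCodecSketch {

    static byte[] encode(double[] coefficient) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(coefficient.length); // length prefix
            for (double v : coefficient) {
                out.writeDouble(v);           // payload
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    static double[] decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            double[] values = new double[in.readInt()];
            for (int i = 0; i < values.length; i++) {
                values[i] = in.readDouble();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        double[] back = decode(encode(new double[] {0.5, -1.25}));
        System.out.println(back[0] + " " + back[1]); // 0.5 -1.25
    }
}
```

   The real classes wire the same idea into Flink's `Encoder` and `SimpleStreamFormat` interfaces so the model table can be written to and restored from files.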
   








[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] zuston edited a comment on pull request #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful

2021-12-08 Thread GitBox


zuston edited a comment on pull request #5982:
URL: https://github.com/apache/flink/pull/5982#issuecomment-987845370


   Do you mind if I take over, @sihuazhou? I met the same problem and want to 
solve it.
   
   And any ideas on it @StephanEwen @wangyang0918 @KarmaGYZ @XComp @zentol 






[jira] [Updated] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-5151:

Fix Version/s: 1.15.0

> Add discussion about object mutations to heap-based state backend docs.
> ---
>
> Key: FLINK-5151
> URL: https://issues.apache.org/jira/browse/FLINK-5151
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Affects Versions: 1.1.4, 1.2.0
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0
>
>
> Flink's heap state backends store data as objects on the heap. Any object 
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in the case of sliding 
> windows, multiple windows hold references to the same object. Hence, all 
> windows are affected if such an object is modified, e.g., by a 
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might return 
> invalid results.
> We should add this information to the state backend documentation and also 
> point out that the RocksDB backend is not affected by this because all data 
> is serialized.
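The shared-reference hazard described above can be reproduced without any Flink dependencies. The sketch below is illustrative only: the `Event` class and the two window lists are hypothetical stand-ins for objects kept by a heap-based state backend, where no serialization copy separates the windows.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedStateMutation {
    // Hypothetical element type; stands in for a user record in heap state.
    static class Event {
        long count;
        Event(long count) { this.count = count; }
    }

    public static void main(String[] args) {
        Event shared = new Event(1);

        // Two overlapping sliding windows holding a reference to the SAME
        // object, as a heap-based backend would store them.
        List<Event> windowA = new ArrayList<>();
        List<Event> windowB = new ArrayList<>();
        windowA.add(shared);
        windowB.add(shared);

        // A function mutates the element while processing window A ...
        windowA.get(0).count += 41;

        // ... and window B silently observes the change.
        System.out.println(windowB.get(0).count); // prints 42
    }
}
```

With a RocksDB-style backend, each window would deserialize its own copy, so the mutation through `windowA` would not leak into `windowB`.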



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] zuston edited a comment on pull request #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful

2021-12-08 Thread GitBox


zuston edited a comment on pull request #5982:
URL: https://github.com/apache/flink/pull/5982#issuecomment-987845370


   Do you mind if I take over, @sihuazhou? I met the same problem and want to 
solve it.
   
   And any ideas on it @StephanEwen @wangyang0918 @KarmaGYZ @XComp 






[GitHub] [flink] zuston edited a comment on pull request #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful

2021-12-08 Thread GitBox


zuston edited a comment on pull request #5982:
URL: https://github.com/apache/flink/pull/5982#issuecomment-987845370


   Do you mind if I take over, @sihuazhou? I met the same problem and want to 
solve it.
   
   And any ideas on it @StephanEwen @wangyang0918 @KarmaGYZ 






[GitHub] [flink] flinkbot edited a comment on pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18060:
URL: https://github.com/apache/flink/pull/18060#issuecomment-989583760


   
   ## CI report:
   
   * ce46fd7f0b44fec9587ab8a8aef30d89a617a1a5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27851)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] Myasuka commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode

2021-12-08 Thread GitBox


Myasuka commented on a change in pull request #18024:
URL: https://github.com/apache/flink/pull/18024#discussion_r765490259



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
##
@@ -46,4 +46,14 @@
 .withDescription(
 "Allow to skip savepoint state that cannot be restored. "
 + "Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
+/**
+ * Describes the mode how Flink should restore from the given savepoint or retained checkpoint.
+ */
+public static final ConfigOption<RestoreMode> RESTORE_MODE =
+key("execution.savepoint-restore-mode")
+.enumType(RestoreMode.class)
+.defaultValue(RestoreMode.NO_CLAIM)
+.withDescription(
+"Describes the mode how Flink should restore from the given"
++ " savepoint or retained checkpoint.");

Review comment:
   Got it. This sounds more reasonable.








[GitHub] [flink] flinkbot commented on pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-08 Thread GitBox


flinkbot commented on pull request #18060:
URL: https://github.com/apache/flink/pull/18060#issuecomment-989583760










[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-24728) Batch SQL file sink forgets to close the output stream

2021-12-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456191#comment-17456191
 ] 

Jingsong Lee commented on FLINK-24728:
--

[~TsReaper] Can you cherry-pick for 1.14 and 1.13? Two commits in one PR is OK.

> Batch SQL file sink forgets to close the output stream
> --
>
> Key: FLINK-24728
> URL: https://issues.apache.org/jira/browse/FLINK-24728
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.4, 1.14.0, 1.12.5, 1.13.3
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> I tried to write a large avro file into HDFS and discovered that the displayed 
> file size in HDFS is extremely small, but copying that file to local yields 
> the correct size. If we create another Flink job that reads that avro file from 
> HDFS, the job will finish without outputting any records because Flink uses 
> the very small file size it gets from HDFS.
> This is because the output format created in 
> {{FileSystemTableSink#createBulkWriterOutputFormat}} only finishes the 
> {{BulkWriter}}. According to the javadoc of {{BulkWriter#finish}}, bulk 
> writers should not close the output stream and should leave that to the 
> framework.
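The contract described above can be sketched in plain Java. `SimpleBulkWriter` below is a hypothetical stand-in for the real `BulkWriter` interface, and the caller plays the role of the framework that owns the stream; the point is that `finish()` flushes but does not close.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class BulkWriterContract {
    // Hypothetical writer: flushes pending data in finish(), never closes.
    static class SimpleBulkWriter {
        private final OutputStream out;
        SimpleBulkWriter(OutputStream out) { this.out = out; }

        void addElement(byte[] record) throws IOException {
            out.write(record);
        }

        // Flush buffered data, but do NOT close the stream; per the
        // BulkWriter#finish contract, closing is the framework's job.
        void finish() throws IOException {
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        SimpleBulkWriter writer = new SimpleBulkWriter(stream);
        writer.addElement("row-1".getBytes());
        writer.finish();
        // The framework (here: the caller) closes the stream; skipping this
        // step is the bug the issue describes.
        stream.close();
        System.out.println(stream.size()); // prints 5
    }
}
```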





[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode

2021-12-08 Thread GitBox


dawidwys commented on a change in pull request #18024:
URL: https://github.com/apache/flink/pull/18024#discussion_r765488499



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
##
@@ -46,4 +46,14 @@
 .withDescription(
 "Allow to skip savepoint state that cannot be restored. "
 + "Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
+/**
+ * Describes the mode how Flink should restore from the given savepoint or retained checkpoint.
+ */
+public static final ConfigOption<RestoreMode> RESTORE_MODE =
+key("execution.savepoint-restore-mode")
+.enumType(RestoreMode.class)
+.defaultValue(RestoreMode.NO_CLAIM)
+.withDescription(
+"Describes the mode how Flink should restore from the given"
++ " savepoint or retained checkpoint.");

Review comment:
   Side note: Personally, I am not 100% convinced we should have the 
`LEGACY` mode in the final version (once we add a proper `NO_CLAIM` mode). We 
might remove it in the end; however, I want to add it for now to keep the master 
branch releasable at all times.








[GitHub] [flink] flinkbot edited a comment on pull request #18042: [WIP][FLINK-25076][table-planner] Improve operator name for sql job

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18042:
URL: https://github.com/apache/flink/pull/18042#issuecomment-987776252


   
   ## CI report:
   
   * 2f62c6d3a186c872eb5ce44ef45ee43c06207a9b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27845)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18017: [FLINK-25171] Validation of duplicate fields in derived tables

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18017:
URL: https://github.com/apache/flink/pull/18017#issuecomment-986500405


   
   ## CI report:
   
   * 196abdbb40a117ea60ad76d7a606d732ed62579c Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27847)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] JingsongLi merged pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files

2021-12-08 Thread GitBox


JingsongLi merged pull request #17655:
URL: https://github.com/apache/flink/pull/17655


   






[GitHub] [flink] lincoln-lil opened a new pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-08 Thread GitBox


lincoln-lil opened a new pull request #18060:
URL: https://github.com/apache/flink/pull/18060


   This is a cherry-pick of FLINK-20370, part 1.
   
   ## Brief change log
   Update the logic of `FlinkChangelogModeInferenceProgram` and 
`StreamPhysicalSink`
   
   ## Verifying this change
   Streaming SQL's `RankTest`, `AggregateTest`, and `JoinTest`
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
@Public(Evolving): (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   






[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode

2021-12-08 Thread GitBox


dawidwys commented on a change in pull request #18024:
URL: https://github.com/apache/flink/pull/18024#discussion_r765487578



##
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
##
@@ -130,6 +131,46 @@ public void testRun() throws Exception {
 }
 }
 
+@Test
+public void testClaimRestoreModeParsing() throws Exception {
+// test configure savepoint with claim mode
+String[] parameters = {
+"-s", "expectedSavepointPath", "-n", "-r", "claim", getTestJarPath()

Review comment:
   Using `-s expectedSavepointPath -n --legacy` is not necessarily 
descriptive. It is not straightforward to tell what `legacy` refers to.
   
   This has also been pointed out by @rkhachatryan. Do you have any comments on 
that?








[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17919:
URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835


   
   ## CI report:
   
   * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675)
 
   * 605c0ab25a4caf70e1c4154632992b1106a5da47 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode

2021-12-08 Thread GitBox


dawidwys commented on a change in pull request #18024:
URL: https://github.com/apache/flink/pull/18024#discussion_r765486398



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
##
@@ -46,4 +46,14 @@
 .withDescription(
 "Allow to skip savepoint state that cannot be restored. "
 + "Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
+/**
+ * Describes the mode how Flink should restore from the given savepoint or retained checkpoint.
+ */
+public static final ConfigOption<RestoreMode> RESTORE_MODE =
+key("execution.savepoint-restore-mode")
+.enumType(RestoreMode.class)
+.defaultValue(RestoreMode.NO_CLAIM)
+.withDescription(
+"Describes the mode how Flink should restore from the given"
++ " savepoint or retained checkpoint.");

Review comment:
   Yes, you are right, `LEGACY` is the current behaviour. The other mode, 
`NO_CLAIM`, is not part of this PR; it will come in the next PR.
   
   > With the name of LEGACY, users would still think the new 1st checkpoint is 
an incremental one.
   
   And they would be right. The `LEGACY` mode is the current behaviour, with no 
changes at all.









[jira] [Commented] (FLINK-25218) Performance issues with lookup join accessing external dimension tables

2021-12-08 Thread Ada Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456188#comment-17456188
 ] 

Ada Wong commented on FLINK-25218:
--

We could use a Guava cache to cache rows with the same primary key.

>  Performance issues with lookup join accessing external dimension tables
> 
>
> Key: FLINK-25218
> URL: https://issues.apache.org/jira/browse/FLINK-25218
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.15.0
>Reporter: ZhuoYu Chen
>Priority: Major
>
> Current lookup join: for each input record, it accesses the external dimension 
> table to fetch the result and outputs a record.
> Implementing a lookup join that batches and delays lookups could improve 
> performance.
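A minimal sketch of such a per-key cache follows. A bounded LRU built on `LinkedHashMap` stands in for Guava's `CacheBuilder` (which additionally offers size bounds and time-based expiry); all names here are illustrative, not Flink or Guava APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookupCache {
    private final int maxSize;
    private final Map<String, String> cache;
    int externalLookups = 0; // counts simulated external table accesses

    LookupCache(int maxSize) {
        this.maxSize = maxSize;
        // Access-ordered LinkedHashMap evicting the least recently used
        // entry once the cache grows past maxSize.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > LookupCache.this.maxSize;
            }
        };
    }

    String lookup(String key) {
        String cached = cache.get(key);
        if (cached == null) {
            externalLookups++;
            cached = "row-for-" + key; // stand-in for the external query
            cache.put(key, cached);
        }
        return cached;
    }

    public static void main(String[] args) {
        LookupCache c = new LookupCache(100);
        c.lookup("42");
        c.lookup("42"); // second access for the same key hits the cache
        System.out.println(c.externalLookups); // prints 1
    }
}
```

Repeated lookups for the same primary key then skip the external round trip entirely, which is where the performance win for hot keys comes from.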





[GitHub] [flink-ml] gaoyunhaii commented on pull request #20: [FLINK-23959][FLIP-175] Compose Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator

2021-12-08 Thread GitBox


gaoyunhaii commented on pull request #20:
URL: https://github.com/apache/flink-ml/pull/20#issuecomment-989576488


   @lindong28 Hello~ could you rebase the PR to the latest master~?






[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-24817) Support Naive Bayes algorithm in Flink ML

2021-12-08 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456183#comment-17456183
 ] 

Yun Gao commented on FLINK-24817:
-

Merged on master via e4f7eedb958e29817525f54ee6944d819d0de47c

> Support Naive Bayes algorithm in Flink ML
> -
>
> Key: FLINK-24817
> URL: https://issues.apache.org/jira/browse/FLINK-24817
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.1.0
>
>
> This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm 
> will use the latest Flink ML API proposed in FLIP 173~176. 
>  
> Github PR link: https://github.com/apache/flink-ml/pull/21





[jira] [Closed] (FLINK-24817) Support Naive Bayes algorithm in Flink ML

2021-12-08 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-24817.
---
Resolution: Fixed

> Support Naive Bayes algorithm in Flink ML
> -
>
> Key: FLINK-24817
> URL: https://issues.apache.org/jira/browse/FLINK-24817
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.1.0
>
>
> This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm 
> will use the latest Flink ML API proposed in FLIP 173~176. 
>  
> Github PR link: https://github.com/apache/flink-ml/pull/21





[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   * f21e8c71b1582b77beaf4a0393a51d101b9b80ab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.keyed-shuffle' option to auto keyby on sink's pk if parallelism are not

2021-12-08 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r765481583



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -280,26 +297,29 @@ private int deriveSinkParallelism(
  * messages.
  */
private Transformation<RowData> applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || 
changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+final ExecutionConfigOptions.SinkKeyedShuffle sinkShuffleByPk =
+
config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && sinkParallelism != 
inputParallelism;
+break;
+case FORCE:
+// single parallelism has no problem
+sinkKeyBy = sinkParallelism != 1 || inputParallelism != 1;

Review comment:
   I was torn about this condition because I originally thought we 
shouldn't specialize the single parallelism case. But considering that there 
are a lot of single parallelism jobs in stream computing, it is worth 
optimizing to remove keyBy for single parallelism.
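For readers following the review thread, the NONE/AUTO/FORCE decision discussed above can be reduced to a small standalone predicate. The sketch below is illustrative only — the class, enum, and method names are hypothetical; the real logic lives in `CommonExecSink#applyKeyBy` in the PR:

```java
// Sketch of the sink keyed-shuffle decision from the review thread above.
// Names are hypothetical; this is not the actual Flink implementation.
public class SinkShuffleSketch {

    public enum SinkKeyedShuffle { NONE, AUTO, FORCE }

    public static boolean shouldKeyBy(
            SinkKeyedShuffle mode,
            boolean inputInsertOnly,
            int inputParallelism,
            int sinkParallelism) {
        switch (mode) {
            case NONE:
                return false;
            case AUTO:
                // Shuffle only when parallelism changes on an insert-only input.
                return inputInsertOnly && sinkParallelism != inputParallelism;
            case FORCE:
                // Skip the shuffle when both sides run with parallelism 1,
                // the single-parallelism optimization discussed above.
                return sinkParallelism != 1 || inputParallelism != 1;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // FORCE but everything is single-parallelism: keyBy is dropped.
        assert !shouldKeyBy(SinkKeyedShuffle.FORCE, true, 1, 1);
        // AUTO with matching parallelism: no shuffle needed.
        assert !shouldKeyBy(SinkKeyedShuffle.AUTO, true, 4, 4);
        // AUTO with a parallelism change on insert-only input: shuffle.
        assert shouldKeyBy(SinkKeyedShuffle.AUTO, true, 2, 4);
        System.out.println("ok");
    }
}
```

The FORCE branch is exactly the trade-off debated in the comment: treating `1 -> 1` as a no-op saves a shuffle for the many single-parallelism streaming jobs.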








[jira] [Updated] (FLINK-24817) Support Naive Bayes algorithm in Flink ML

2021-12-08 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-24817:

Fix Version/s: 0.1.0

> Support Naive Bayes algorithm in Flink ML
> -
>
> Key: FLINK-24817
> URL: https://issues.apache.org/jira/browse/FLINK-24817
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.1.0
>
>
> This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm
> will use the latest Flink ML API proposed in FLIP-173~176.
>  
> Github PR link: https://github.com/apache/flink-ml/pull/21





[jira] [Assigned] (FLINK-24817) Support Naive Bayes algorithm in Flink ML

2021-12-08 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-24817:
---

Assignee: Yunfeng Zhou

> Support Naive Bayes algorithm in Flink ML
> -
>
> Key: FLINK-24817
> URL: https://issues.apache.org/jira/browse/FLINK-24817
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm
> will use the latest Flink ML API proposed in FLIP-173~176.
>  
> Github PR link: https://github.com/apache/flink-ml/pull/21





[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-12-08 Thread GitBox


zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r765480782



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+public final DenseVector coefficient;
+
+public LogisticRegressionModelData(DenseVector coefficient) {
+this.coefficient = coefficient;
+}
+
+/**
+ * Converts the table model to a data stream.
+ *
+ * @param modelData The table model data.
+ * @return The data stream model data.
+ */
+public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
modelData).getTableEnvironment();
+return tEnv.toDataStream(modelData).map(x -> 
(LogisticRegressionModelData) x.getField(0));
+}
+
+/** Data encoder for {@link LogisticRegressionModel}. */
+public static class ModelDataEncoder implements Encoder<LogisticRegressionModelData> {
+
+@Override
+public void encode(LogisticRegressionModelData modelData, OutputStream 
outputStream)
+throws IOException {
+DenseVectorSerializer serializer = new DenseVectorSerializer();
+serializer.serialize(
+modelData.coefficient, new 
DataOutputViewStreamWrapper(outputStream));
+}
+}
+
+/** Data decoder for {@link LogisticRegressionModel}. */
+public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {
+
+@Override
+public Reader<LogisticRegressionModelData> createReader(
+Configuration configuration, FSDataInputStream inputStream) {
+return new Reader<LogisticRegressionModelData>() {
+
+@Override
+public LogisticRegressionModelData read() throws IOException {
+DenseVectorSerializer serializer = new 
DenseVectorSerializer();

Review comment:
   Thanks for pointing this out. 
   I found that serializers in Flink often make the class member `INSTANCE`
public; how about we do the same for `DenseVectorSerializer` so that we can
avoid creating new objects here?
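The pattern suggested above — exposing a shared `INSTANCE` rather than allocating a serializer on every `read()`/`encode()` call — can be sketched in isolation. The class below is hypothetical (the real `DenseVectorSerializer` has a different API); it only illustrates why sharing is safe for a stateless serializer:

```java
// Sketch of the singleton-INSTANCE pattern proposed in the review above.
// VectorSerializerSketch is a hypothetical stand-in, not Flink's
// DenseVectorSerializer.
public final class VectorSerializerSketch {

    // A stateless serializer holds no mutable fields, so one shared
    // instance can serve every call site without allocation per record.
    public static final VectorSerializerSketch INSTANCE = new VectorSerializerSketch();

    private VectorSerializerSketch() {}

    /** Serializes a dense vector as [length:int][values:double...]. */
    public byte[] serialize(double[] vector) {
        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(4 + 8 * vector.length);
        buf.putInt(vector.length);
        for (double v : vector) {
            buf.putDouble(v);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        // Both call sites reuse the same object — no per-call allocation.
        assert VectorSerializerSketch.INSTANCE == VectorSerializerSketch.INSTANCE;
        byte[] bytes = VectorSerializerSketch.INSTANCE.serialize(new double[] {1.0, 2.0});
        assert bytes.length == 20; // 4-byte length + 2 * 8-byte doubles
        System.out.println(bytes.length); // prints 20
    }
}
```

Note this is only safe because the instance is immutable; a serializer with internal buffers would still need one instance per thread.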








[GitHub] [flink-ml] gaoyunhaii closed pull request #32: [FLINK-24817] Add Estimator and Transformer for Naive Bayes

2021-12-08 Thread GitBox


gaoyunhaii closed pull request #32:
URL: https://github.com/apache/flink-ml/pull/32


   






[GitHub] [flink] flinkbot edited a comment on pull request #18059: [FLINK-25224][filesystem] Bump Hadoop version to 2.8.4

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18059:
URL: https://github.com/apache/flink/pull/18059#issuecomment-989489342


   
   ## CI report:
   
   * 45140747eb4fa27a00662d7d9be106532ea4111f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27844)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Closed] (FLINK-5373) Extend Unit Tests for StateAssignmentOperation

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5373.
---
Resolution: Information Provided

{{StateAssignmentOperationTest}} has been added.

> Extend Unit Tests for StateAssignmentOperation
> --
>
> Key: FLINK-5373
> URL: https://issues.apache.org/jira/browse/FLINK-5373
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with 
> null pointers that is fixed by this commit: 
> https://github.com/apache/flink/commit/74df7631316e78af39a5416e12c1adc8a46d87fe
> We should extend unit tests to catch this case in the future.





[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314


   
   ## CI report:
   
   * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Closed] (FLINK-5374) Extend Unit Tests for RegisteredBackendStateMetaInfo

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5374.
---
Resolution: Information Provided

{{RegisteredBackendStateMetaInfo}} has been dropped.

> Extend Unit Tests for RegisteredBackendStateMetaInfo
> 
>
> Key: FLINK-5374
> URL: https://issues.apache.org/jira/browse/FLINK-5374
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with 
> the compatibility check of the meta info: 
> https://github.com/apache/flink/commit/d1eaa1ee41728e6d788f1e914cb0568a874a6f32
> We should extend unit tests to catch this case in the future.





[jira] [Closed] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5436.
---
Resolution: Information Provided

The API of {{CheckpointedRestoring}} has been dropped.

> UDF state without CheckpointedRestoring can result in restarting loop
> -
>
> Key: FLINK-5436
> URL: https://issues.apache.org/jira/browse/FLINK-5436
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When restoring a job with Checkpointed state and not implementing the new 
> CheckpointedRestoring interface, the job will be restarted over and over 
> again (given the respective restarting strategy).
> Since this is not recoverable, we should immediately fail the job.





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765478425



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+private static final String LABEL_COL = "test_label";
+private static final String PRED_COL = "test_prediction";
+private static final String VEC_COL = "test_features";
+private static final List<Row> trainArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(1, Vectors.dense(2.0, 3.0)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(2, Vectors.dense(200.1, 300.1)),
+Row.of(2, Vectors.dense(200.2, 300.2)),
+Row.of(2, Vectors.dense(200.3, 300.3)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.6, 300.6)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(3, Vectors.dense(2.8, 3.2)),
+Row.of(4, Vectors.dense(300., 3.2)),
+Row.of(1, Vectors.dense(2.2, 3.2)),
+Row.of(5, Vectors.dense(2.4, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(1, Vectors.dense(2.1, 3.1))));
+
+private static final List<Row> testArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42))));
+private Table testData;
+
+@Before
+public void before() {
+Configuration config = new Configuration();
+
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+env.setParallelism(4);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+tEnv = StreamTableEnvironment.create(env);
+
+Schema schema =
+Schema.newBuilder()
+.column("f0", DataTypes.INT())
+.column("f1", DataTypes.of(DenseVector.class))
+.build();
+
+DataStream<Row> dataStream = env.fromCollection(trainArray);
+trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + "," 
+ VEC_COL);

Review comment:
   OK








[jira] [Closed] (FLINK-5437) Make CheckpointedRestoring error message more detailed

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5437.
---
Resolution: Information Provided

The API of {{CheckpointedRestoring}} has been dropped.

> Make CheckpointedRestoring error message more detailed
> --
>
> Key: FLINK-5437
> URL: https://issues.apache.org/jira/browse/FLINK-5437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When restoring Checkpointed state without implementing CheckpointedRestoring, 
> the job fails with the following Exception:
> {code}
> java.lang.Exception: Found UDF state but operator is not instance of 
> CheckpointedRestoring
> {code}
> I think we should make this error message more detailed.





[jira] [Closed] (FLINK-5439) Adjust max parallelism when migrating

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5439.
---
Resolution: Information Provided

Closing this ticket due to lack of activity; v1 savepoints are by now too old.

> Adjust max parallelism when migrating
> -
>
> Key: FLINK-5439
> URL: https://issues.apache.org/jira/browse/FLINK-5439
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When migrating from v1 savepoints which don't have the notion of a max 
> parallelism, the job needs to explicitly set the max parallelism to the 
> parallelism of the savepoint.
> [~stefanrichte...@gmail.com] If this not trivially implemented, let's close 
> this as won't fix.





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765476475



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+private static final String LABEL_COL = "test_label";

Review comment:
   OK








[jira] [Closed] (FLINK-5440) Misleading error message when migrating and scaling down from savepoint

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5440.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Misleading error message when migrating and scaling down from savepoint
> ---
>
> Key: FLINK-5440
> URL: https://issues.apache.org/jira/browse/FLINK-5440
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When resuming from an 1.1 savepoint with 1.2 and reducing the parallelism 
> (and correctly setting the max parallelism), the error message says something 
> about a missing operator which is misleading. Restoring from the same 
> savepoint with the savepoint parallelism works as expected.
> Instead it should state that this kind of operation is not possible. 





[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression

2021-12-08 Thread GitBox


zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r765474898



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+public final DenseVector coefficient;
+
+public LogisticRegressionModelData(DenseVector coefficient) {
+this.coefficient = coefficient;
+}
+
+/**
+ * Converts the table model to a data stream.
+ *
+ * @param modelData The table model data.
+ * @return The data stream model data.
+ */
+public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
modelData).getTableEnvironment();
+return tEnv.toDataStream(modelData).map(x -> 
(LogisticRegressionModelData) x.getField(0));
+}
+
+/** Data encoder for {@link LogisticRegressionModel}. */
+public static class ModelDataEncoder implements Encoder<LogisticRegressionModelData> {
+
+@Override
+public void encode(LogisticRegressionModelData modelData, OutputStream 
outputStream)
+throws IOException {
+DenseVectorSerializer serializer = new DenseVectorSerializer();
+serializer.serialize(
+modelData.coefficient, new 
DataOutputViewStreamWrapper(outputStream));
+}
+}
+
+/** Data decoder for {@link LogisticRegressionModel}. */
+public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {

Review comment:
   `KmeansModelData` is already updated. For NaiveBayes, how about we do
the update in the NaiveBayes PR, given that it is not merged yet?
   








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765474398



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+private static final String LABEL_COL = "test_label";
+private static final String PRED_COL = "test_prediction";
+private static final String VEC_COL = "test_features";
+private static final List<Row> trainArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(1, Vectors.dense(2.0, 3.0)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(2, Vectors.dense(200.1, 300.1)),
+Row.of(2, Vectors.dense(200.2, 300.2)),
+Row.of(2, Vectors.dense(200.3, 300.3)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.6, 300.6)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(3, Vectors.dense(2.8, 3.2)),
+Row.of(4, Vectors.dense(300., 3.2)),
+Row.of(1, Vectors.dense(2.2, 3.2)),
+Row.of(5, Vectors.dense(2.4, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(1, Vectors.dense(2.1, 3.1))));
+
+private static final List<Row> testArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42))));
+private Table testData;
+
+@Before
+public void before() {
+Configuration config = new Configuration();
+
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+env.setParallelism(4);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+tEnv = StreamTableEnvironment.create(env);
+
+Schema schema =
+Schema.newBuilder()
+.column("f0", DataTypes.INT())
+.column("f1", DataTypes.of(DenseVector.class))
+.build();
+
+DataStream<Row> dataStream = env.fromCollection(trainArray);
+trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + "," 
+ VEC_COL);
+
+DataStream<Row> predDataStream = env.fromCollection(testArray);
+testData = tEnv.fromDataStream(predDataStream, schema).as(LABEL_COL + 
"," + VEC_COL);
+}
+
+// Executes the graph and returns a list which has true label and predict 
label.
+private static List> executeAndCollect(Table 
output) throws Exception {

Review comment:
   OK





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765472967



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+private final List<Tuple3<DenseMatrix, DenseVector, int[]>> dictData;

Review comment:
   OK
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765472869



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+private final List<Tuple3<DenseMatrix, DenseVector, int[]>> dictData;
+private final Comparator<Tuple2<Double, Object>> comparator;

Review comment:
   This element has been moved to PredictOperator.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-5707) Find better keys for backend configuration parameters for state backends

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5707.
---
Resolution: Information Provided

Closing this ticket as already done in current Flink.

> Find better keys for backend configuration parameters for state backends
> 
>
> Key: FLINK-5707
> URL: https://issues.apache.org/jira/browse/FLINK-5707
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Priority: Not a Priority
>
> Currently, some config keys for the backends are confusing or even misleading 
> and could be renamed. For example
> `state.backend.fs.checkpointdir` -> `state.backend.checkpoints.dir`
> `state.backend.rocksdb.checkpointdir` -> `state.backend.rocksdb.workdir`
> `state.checkpoints.dir`
> This would reflect their purposes much better.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17892: [FLINK-25038][testutils] Refactor FlinkContainer to split JM and TMs to individual containers and support HA

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17892:
URL: https://github.com/apache/flink/pull/17892#issuecomment-977710431


   
   ## CI report:
   
   * 09e84a1124313bd95488fb85a6bcd5f35f60349b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27841)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765466570



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/** Knn model fitted by estimator. */
+public class KnnModel implements Model<KnnModel>, KnnModelParams<KnnModel> {
+protected Map<Param<?>, Object> params = new HashMap<>();
+private Table[] modelData;
+
+/** Constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * Sets model data for knn prediction.
+ *
+ * @param modelData Knn model data.
+ * @return Knn model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * Gets model data.
+ *
+ * @return Table array including model data tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * Predicts label with knn model.
+ *
+ * @param inputs List of tables.
+ * @return Prediction result.
+ */
+@Override
+@SuppressWarnings("unchecked")
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+DataStream<Row> input = tEnv.toDataStream(inputs[0]);
+DataStream<Row> model = tEnv.toDataStream(modelData[0]);
+final String broadcastKey = "broadcastModelKey";
+String resultCols = getPredictionCol();
+DataType resultTypes = DataTypes.INT();
+ResolvedSchema outputSchema =
+TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes);
+
+DataStream<Row> output =
+BroadcastUtils.withBroadcastStream(
+Collections.singletonList(input),
+Collections.singletonMap(broadcastKey, model),
+inputList -> {
+

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765466093



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+private final List<Tuple3<DenseMatrix, DenseVector, int[]>> dictData;
+private final Comparator<Tuple2<Double, Object>> comparator;
+
+/**
+ * Constructor.
+ *
+ * @param list Row list.
+ */
+public KnnModelData(List<Row> list) {
+this.dictData = new ArrayList<>(list.size());
+for (Row row : list) {
+this.dictData.add(
+Tuple3.of(
+(DenseMatrix) row.getField(0),
+(DenseVector) row.getField(1),
+(int[]) row.getField(2)));
+}
+comparator = Comparator.comparingDouble(o -> -o.f0);
+}
+
+/**
+ * Gets comparator.
+ *
+ * @return Comparator.
+ */
+public Comparator<Tuple2<Double, Object>> getQueueComparator() {
+return comparator;
+}
+
+/**
+ * Gets dictionary data size.
+ *
+ * @return Dictionary data size.
+ */
+public Integer getLength() {
+return dictData.size();
+}
+
+public List<Tuple3<DenseMatrix, DenseVector, int[]>> getDictData() {

Review comment:
   OK
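An aside on the `Comparator.comparingDouble(o -> -o.f0)` pattern quoted above: negating the sort key makes a `PriorityQueue` surface the *largest* key first. A minimal self-contained sketch of the same idea, using `double[]` in place of Flink's `Tuple2` (the names here are illustrative, not from the PR):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class NegatedKeyDemo {
    public static void main(String[] args) {
        // comparingDouble on the negated key orders the queue so that the
        // LARGEST key sits at the head, mirroring the quoted KnnModelData code
        PriorityQueue<double[]> queue =
                new PriorityQueue<>(Comparator.comparingDouble((double[] o) -> -o[0]));
        queue.add(new double[] {1.5});
        queue.add(new double[] {3.0});
        queue.add(new double[] {2.2});
        System.out.println(queue.peek()[0]); // 3.0
    }
}
```

This max-first ordering is presumably why the model negates the key: a queue bounded to k elements can evict its head (the farthest neighbor so far) and thereby retain the k nearest.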




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17977: [FLINK-24661][core][table] ConfigOption add isSecret method to judge sensitive options

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17977:
URL: https://github.com/apache/flink/pull/17977#issuecomment-983482975


   
   ## CI report:
   
   * 942783b56abf07e9d9650b493ab8dc7b040a13d9 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27583)
 
   * 55b743a6f146d58b2388653ec7bf23d5fb37e160 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27848)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765465455



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+private final List<Tuple3<DenseMatrix, DenseVector, int[]>> dictData;
+private final Comparator<Tuple2<Double, Object>> comparator;
+
+/**
+ * Constructor.
+ *
+ * @param list Row list.
+ */
+public KnnModelData(List<Row> list) {
+this.dictData = new ArrayList<>(list.size());
+for (Row row : list) {
+this.dictData.add(
+Tuple3.of(
+(DenseMatrix) row.getField(0),
+(DenseVector) row.getField(1),
+(int[]) row.getField(2)));
+}
+comparator = Comparator.comparingDouble(o -> -o.f0);
+}
+
+/**
+ * Gets comparator.
+ *
+ * @return Comparator.
+ */
+public Comparator<Tuple2<Double, Object>> getQueueComparator() {
+return comparator;
+}
+
+/**
+ * Gets dictionary data size.
+ *
+ * @return Dictionary data size.
+ */
+public Integer getLength() {
+return dictData.size();
+}
+
+public List<Tuple3<DenseMatrix, DenseVector, int[]>> getDictData() {
+return dictData;
+}
+
+/** Encoder for the Knn model data. */
+public static class ModelDataEncoder implements Encoder<Row> {
+@Override
+public void encode(Row modelData, OutputStream outputStream) throws IOException {
+DataOutputView dataOutputView = new DataOutputViewStreamWrapper(outputStream);
+
+DenseMatrixSerializer matrixSerializer = new DenseMatrixSerializer();
+matrixSerializer.serialize((DenseMatrix) modelData.getField(0), dataOutputView);
+
+DenseVectorSerializer vectorSerializer = new DenseVectorSerializer();
+vectorSerializer.serialize((DenseVector) modelData.getField(1), dataOutputView);
+
+int[] label = (int[]) Objects.requireNonNull(modelData.getField(2));
+for (int value : label) {
+dataOutputView.writeInt(value);
+}
+}
+}
+
+/** Decoder for the Knn model data. */
+public static class ModelDataStreamFormat extends SimpleStreamFormat<Row> {
+
+@Override
+public Reader<Row> createReader(Configuration config, FSDataInputStream stream) {
+return new Reader<Row>() {
+
+@Override
+public Row read() throws IOException {
+try {
+DataInputView source = new DataInputViewStreamWrapper(stream);

Review comment:
   OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[jira] [Created] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false

2021-12-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-25227:
---

 Summary: Comparing the equality of the same (boxed) numeric values 
returns false
 Key: FLINK-25227
 URL: https://issues.apache.org/jira/browse/FLINK-25227
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Caizhi Weng
 Fix For: 1.15.0, 1.14.1, 1.13.4


Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.

{code:scala}
@Test
def myTest(): Unit = {
  val data = Seq(
Row.of(
  java.lang.Integer.valueOf(1000),
  java.lang.Integer.valueOf(2000),
  java.lang.Integer.valueOf(1000),
  java.lang.Integer.valueOf(2000))
  )

  tEnv.executeSql(
s"""
   |create table T (
   |  a int,
   |  b int,
   |  c int,
   |  d int
   |) with (
   |  'connector' = 'values',
   |  'bounded' = 'true',
   |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
   |)
   |""".stripMargin)

  tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
}
{code}

The result is false, which is obviously incorrect.
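The root cause is reference equality on boxed values. A standalone sketch of the pitfall, with values deliberately chosen outside the `Integer` cache range of -128..127 (this demo class is illustrative, not part of Flink):

```java
public class BoxedCompareDemo {
    public static void main(String[] args) {
        Integer a = Integer.valueOf(2000);
        Integer b = Integer.valueOf(2000);
        // == on boxed Integers compares object references; values outside the
        // [-128, 127] cache box to distinct objects, so this prints false
        System.out.println(a == b);      // false
        // equals compares the wrapped int values
        System.out.println(a.equals(b)); // true
    }
}
```

The generated code below ends up holding the two `greatest(...)` results as boxed `java.lang.Integer` objects, so a reference comparison between them yields false even though the wrapped values are equal.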

This is caused by the generated java code:
{code:java}
public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator
implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

private final Object[] references;
org.apache.flink.table.data.BoxedWrapperRowData out =
new org.apache.flink.table.data.BoxedWrapperRowData(1);
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

public StreamExecCalc$8(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService)
throws Exception {
this.references = references;

this.setup(task, config, output);
if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) 
this)
.setProcessingTimeService(processingTimeService);
}
}

@Override
public void open() throws Exception {
super.open();
}

@Override
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element)
throws Exception {
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) element.getValue();

int field$0;
boolean isNull$0;
int field$1;
boolean isNull$1;
int field$3;
boolean isNull$3;
int field$4;
boolean isNull$4;
boolean isNull$6;
boolean result$7;

isNull$3 = in1.isNullAt(2);
field$3 = -1;
if (!isNull$3) {
field$3 = in1.getInt(2);
}
isNull$0 = in1.isNullAt(0);
field$0 = -1;
if (!isNull$0) {
field$0 = in1.getInt(0);
}
isNull$1 = in1.isNullAt(1);
field$1 = -1;
if (!isNull$1) {
field$1 = in1.getInt(1);
}
isNull$4 = in1.isNullAt(3);
field$4 = -1;
if (!isNull$4) {
field$4 = in1.getInt(3);
}

out.setRowKind(in1.getRowKind());

java.lang.Integer result$2 = field$0;
boolean nullTerm$2 = false;

if (!nullTerm$2) {
java.lang.Integer cur$2 = field$0;
if (isNull$0) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
result$2 = cur$2;
}
}
}

if (!nullTerm$2) {
java.lang.Integer cur$2 = field$1;
if (isNull$1) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
result$2 = cur$2;
}
}
}

if (nullTerm$2) {
result$2 = null;
}

java.lang.Integer result$5 = field$3;
boolean nullTerm$5 = false;

if (!nullTerm$5) {
java.lang.Integer cur$5 = field$3;
if (isNull$3) {
nullTerm$5 = true;
} else {
int compareResult = result$5.compareTo(cur$5);
if 

[GitHub] [flink] flinkbot edited a comment on pull request #17977: [FLINK-24661][core][table] ConfigOption add isSecret method to judge sensitive options

2021-12-08 Thread GitBox


flinkbot edited a comment on pull request #17977:
URL: https://github.com/apache/flink/pull/17977#issuecomment-983482975


   
   ## CI report:
   
   * 942783b56abf07e9d9650b493ab8dc7b040a13d9 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27583)
 
   * 55b743a6f146d58b2388653ec7bf23d5fb37e160 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765459948



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+private static final String LABEL_COL = "test_label";
+private static final String PRED_COL = "test_prediction";
+private static final String VEC_COL = "test_features";
+private static final List<Row> trainArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(1, Vectors.dense(2.0, 3.0)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(2, Vectors.dense(200.1, 300.1)),
+Row.of(2, Vectors.dense(200.2, 300.2)),
+Row.of(2, Vectors.dense(200.3, 300.3)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.4, 300.4)),
+Row.of(2, Vectors.dense(200.6, 300.6)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.1, 3.1)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(1, Vectors.dense(2.3, 3.2)),
+Row.of(3, Vectors.dense(2.8, 3.2)),
+Row.of(4, Vectors.dense(300., 3.2)),
+Row.of(1, Vectors.dense(2.2, 3.2)),
+Row.of(5, Vectors.dense(2.4, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(5, Vectors.dense(2.5, 3.2)),
+Row.of(1, Vectors.dense(2.1, 3.1))));
+
+private static final List<Row> testArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42))));
+private Table testData;
+
+@Before
+public void before() {
+Configuration config = new Configuration();
+config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+env.setParallelism(4);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+tEnv = StreamTableEnvironment.create(env);
+
+Schema schema =
+Schema.newBuilder()
+.column("f0", DataTypes.INT())
+.column("f1", DataTypes.of(DenseVector.class))
+.build();
+
+DataStream<Row> dataStream = env.fromCollection(trainArray);
+trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + "," + VEC_COL);
+
+DataStream<Row> predDataStream = env.fromCollection(testArray);
+testData = tEnv.fromDataStream(predDataStream, schema).as(LABEL_COL + "," + VEC_COL);
+}
+
+// Executes the graph and returns a list which has true label and predict 
label.
+private static List> executeAndCollect(Table 
output) throws Exception {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
output).getTableEnvironment();
+
+DataStream> 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765459574



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/** Knn model fitted by estimator. */
+public class KnnModel implements Model<KnnModel>, KnnModelParams<KnnModel> {
+protected Map<Param<?>, Object> params = new HashMap<>();
+private Table[] modelData;
+
+/** Constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * Sets model data for knn prediction.
+ *
+ * @param modelData Knn model data.
+ * @return Knn model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * Gets model data.
+ *
+ * @return Table array including model data tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * Predicts label with knn model.
+ *
+ * @param inputs List of tables.
+ * @return Prediction result.
+ */
+@Override
+@SuppressWarnings("unchecked")
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+DataStream<Row> input = tEnv.toDataStream(inputs[0]);
+DataStream<Row> model = tEnv.toDataStream(modelData[0]);
+final String broadcastKey = "broadcastModelKey";
+String resultCols = getPredictionCol();
+DataType resultTypes = DataTypes.INT();
+ResolvedSchema outputSchema =
+TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes);
+
+DataStream<Row> output =
+BroadcastUtils.withBroadcastStream(
+Collections.singletonList(input),
+Collections.singletonMap(broadcastKey, model),
+inputList -> {
+
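The quoted diff is truncated inside the broadcast lambda, which is where the per-record prediction would happen. As a self-contained illustration of the brute-force search such a map function typically performs — keep the k closest training vectors in a `PriorityQueue`, then majority-vote their labels — here is a plain-Java sketch; the class and method names below are illustrative, not taken from the PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Illustrative brute-force k-nearest-neighbor vote; names are hypothetical. */
public class KnnSketch {

    /** Squared Euclidean distance between two dense vectors of equal length. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Predicts a label by majority vote among the k nearest training points. */
    static int predict(List<double[]> trainVecs, List<Integer> trainLabels, double[] query, int k) {
        // Max-heap on distance so the farthest of the current k candidates sits on top.
        PriorityQueue<double[]> heap =
                new PriorityQueue<>((x, y) -> Double.compare(y[0], x[0]));
        for (int i = 0; i < trainVecs.size(); i++) {
            double dist = squaredDistance(trainVecs.get(i), query);
            if (heap.size() < k) {
                heap.add(new double[] {dist, trainLabels.get(i)});
            } else if (dist < heap.peek()[0]) {
                heap.poll();
                heap.add(new double[] {dist, trainLabels.get(i)});
            }
        }
        // Majority vote over the labels of the k retained neighbors.
        Map<Integer, Integer> votes = new HashMap<>();
        for (double[] entry : heap) {
            votes.merge((int) entry[1], 1, Integer::sum);
        }
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .get()
                .getKey();
    }

    public static void main(String[] args) {
        List<double[]> train = new ArrayList<>();
        List<Integer> labels = new ArrayList<>();
        train.add(new double[] {2.0, 3.0});     labels.add(1);
        train.add(new double[] {2.1, 3.1});     labels.add(1);
        train.add(new double[] {200.1, 300.1}); labels.add(2);
        train.add(new double[] {200.2, 300.2}); labels.add(2);
        // Query near the first cluster; two of its three nearest neighbors carry label 1.
        System.out.println(predict(train, labels, new double[] {2.05, 3.05}, 3)); // prints 1
    }
}
```

In the actual operator the training side arrives via the broadcast stream, so each parallel instance holds the full model and only the test rows are partitioned.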

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765458856



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,378 @@

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor

2021-12-08 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765458336



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,378 @@