[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056: URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314 ## CI report: * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846) * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY
flinkbot edited a comment on pull request #17919: URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835 ## CI report: * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675) * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765507159 ## File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/DenseMatrixSerializer.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.linalg.typeinfo; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.ml.linalg.DenseMatrix; + +import java.io.IOException; +import java.util.Arrays; + +/** Specialized serializer for {@link DenseMatrix}. 
*/ +public final class DenseMatrixSerializer extends TypeSerializerSingleton { + +private static final long serialVersionUID = 1L; + +private static final double[] EMPTY = new double[0]; + +private static final DenseMatrixSerializer INSTANCE = new DenseMatrixSerializer(); + +@Override +public boolean isImmutableType() { +return false; +} + +@Override +public DenseMatrix createInstance() { +return new DenseMatrix(0, 0, EMPTY); +} + +@Override +public DenseMatrix copy(DenseMatrix from) { +return new DenseMatrix( +from.numRows, from.numCols, Arrays.copyOf(from.values, from.values.length)); +} + +@Override +public DenseMatrix copy(DenseMatrix from, DenseMatrix reuse) { +if (from.values.length == reuse.values.length) { Review comment: I think if the sizes are not equal, we just need to create a new DenseMatrix and return it. Otherwise we would have to mutate `reuse`; then, when the next copy call comes and the reuse size is right, we would do the wrong thing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
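The tradeoff discussed in this review thread can be illustrated with a small, self-contained sketch (not the actual Flink ML class — a hypothetical stand-in keeping only the fields used in the diff): copy into `reuse` when the backing arrays have the same length, otherwise return a fresh matrix and leave `reuse` untouched, so a later call with a correctly sized `reuse` still behaves correctly.

```java
import java.util.Arrays;

public class CopyReuseSketch {
    // Hypothetical stand-in for org.apache.flink.ml.linalg.DenseMatrix,
    // reduced to the fields referenced in the diff above.
    static class DenseMatrix {
        final int numRows;
        final int numCols;
        final double[] values;

        DenseMatrix(int numRows, int numCols, double[] values) {
            this.numRows = numRows;
            this.numCols = numCols;
            this.values = values;
        }
    }

    // Deep copy that always allocates a new backing array.
    static DenseMatrix copy(DenseMatrix from) {
        return new DenseMatrix(
                from.numRows, from.numCols, Arrays.copyOf(from.values, from.values.length));
    }

    // Reuse the target only when the array lengths match; on a mismatch,
    // fall back to a fresh allocation instead of mutating `reuse`.
    static DenseMatrix copy(DenseMatrix from, DenseMatrix reuse) {
        if (from.values.length == reuse.values.length) {
            System.arraycopy(from.values, 0, reuse.values, 0, from.values.length);
            return reuse;
        }
        return copy(from);
    }
}
```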
[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056: URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314 ## CI report: * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846) * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849) * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY
flinkbot edited a comment on pull request #17919: URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835 ## CI report: * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675) * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850) * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on a change in pull request #37: [FLINK-24955] Add Estimator and Transformer for One Hot Encoder
lindong28 commented on a change in pull request #37: URL: https://github.com/apache/flink-ml/pull/37#discussion_r765496493 ## File path: flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java ## @@ -95,4 +95,9 @@ public boolean validate(T value) { } }; } + +// Check if the parameter value array is not empty array. +public static ParamValidator nonEmptyArray() { Review comment: Could you test this validator in `StageTest::testValidators(...)`? ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasHandleInvalid.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** Interface for the shared handleInvalid param. */ +public interface HasHandleInvalid extends WithParams { +Param HANDLE_INVALID = +new StringParam( +"handleInvalid", +"Strategy to handle invalid entries.", +"ERROR", Review comment: Could we use lower-case letters (i.e. `error`) as the value here for consistency with `HasDistanceMeasure`? 
## File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.linalg; + +import org.apache.flink.api.common.typeinfo.TypeInfo; +import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.Objects; + +/** A sparse vector of double values. */ +@TypeInfo(SparseVectorTypeInfoFactory.class) +public class SparseVector implements Vector { +public final int n; +public final int[] indices; +public final double[] values; + +public SparseVector(int n, int[] indices, double[] values) { +this.n = n; +this.indices = indices; +this.values = values; +checkSizeAndIndicesRange(); Review comment: Instead of making three passes over the vector (i.e. `checkSizeAndIndicesRange`, `isIndicesSorted` and `checkDuplicatedIndices`). Could we make just one pass to improve efficiency? ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoder.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package
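The single-pass validation suggested for `SparseVector` above could look roughly like the following sketch (not the actual Flink ML implementation; it assumes indices must be strictly increasing, so the range check, sortedness check, and duplicate check all fall out of one loop):

```java
public class SparseVectorCheckSketch {
    // One pass over indices: verifies the arrays have equal length, every
    // index lies in [0, n), and the indices are strictly increasing --
    // strict increase implies both sortedness and no duplicates.
    static void validate(int n, int[] indices, double[] values) {
        if (indices.length != values.length) {
            throw new IllegalArgumentException("indices and values sizes differ.");
        }
        int prev = -1;
        for (int index : indices) {
            if (index < 0 || index >= n) {
                throw new IllegalArgumentException("Index out of range: " + index);
            }
            if (index <= prev) {
                // covers both "unsorted" and "duplicate" in a single check
                throw new IllegalArgumentException("Indices not strictly increasing.");
            }
            prev = index;
        }
    }
}
```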
[jira] [Commented] (FLINK-4266) Cassandra SplitOver Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456215#comment-17456215 ] Yun Tang commented on FLINK-4266: - [~foxss] What's the progress of this ticket, can we close it now? > Cassandra SplitOver Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: Runtime / State Backends >Affects Versions: 1.3.0 >Reporter: Chen Qin >Assignee: Chen Qin >Priority: Not a Priority > Labels: stale-assigned > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > need to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverages checkpoint id as mono incremental logic > timestamp and performs range query to get evicted state k/v. We also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merging them, doing batch updates only on checkpoint; > lastly, we are looking for an eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage (e.g. Cassandra). > For now, we don't have a good story regarding data retirement. We might have > to keep state forever until a command is run manually to clean per-job state > data. Some of these features might relate to the incremental checkpointing feature; we > hope to align with the effort there also. > Welcome comments, I will try to put up a draft design doc after gathering some > feedback. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765502926 ## File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/DenseMatrixSerializer.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.linalg.typeinfo; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.ml.linalg.DenseMatrix; + +import java.io.IOException; +import java.util.Arrays; + +/** Specialized serializer for {@link DenseMatrix}. 
*/ +public final class DenseMatrixSerializer extends TypeSerializerSingleton { + +private static final long serialVersionUID = 1L; + +private static final double[] EMPTY = new double[0]; + +private static final DenseMatrixSerializer INSTANCE = new DenseMatrixSerializer(); + +@Override +public boolean isImmutableType() { +return false; +} + +@Override +public DenseMatrix createInstance() { +return new DenseMatrix(0, 0, EMPTY); +} + +@Override +public DenseMatrix copy(DenseMatrix from) { +return new DenseMatrix( +from.numRows, from.numCols, Arrays.copyOf(from.values, from.values.length)); +} + +@Override +public DenseMatrix copy(DenseMatrix from, DenseMatrix reuse) { +if (from.values.length == reuse.values.length) { +System.arraycopy(from.values, 0, reuse.values, 0, from.values.length); +return reuse; +} +return copy(from); +} + +@Override +public int getLength() { +return -1; +} + +@Override +public void serialize(DenseMatrix matrix, DataOutputView target) throws IOException { +if (matrix == null) { +throw new IllegalArgumentException("The matrix must not be null."); +} +final int len = matrix.values.length; +target.writeInt(matrix.numRows); +target.writeInt(matrix.numCols); +for (int i = 0; i < len; i++) { +target.writeDouble(matrix.values[i]); +} +} + +@Override +public DenseMatrix deserialize(DataInputView source) throws IOException { +int m = source.readInt(); +int n = source.readInt(); +double[] values = new double[m * n]; +deserializeDoubleArray(values, source, m * n); +return new DenseMatrix(m, n, values); +} + +private static void deserializeDoubleArray(double[] dst, DataInputView source, int len) +throws IOException { +for (int i = 0; i < len; i++) { +dst[i] = source.readDouble(); +} +} + +@Override +public DenseMatrix deserialize(DenseMatrix reuse, DataInputView source) throws IOException { +int m = source.readInt(); +int n = source.readInt(); +double[] values = reuse.values; Review comment: I think if the sizes are not equal, we need to allocate a new double[] for deserialization. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
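A minimal sketch of the reuse-aware deserialization being discussed — reallocating the backing array only when its length does not match the serialized `m * n` — using plain `java.io.DataInput` in place of Flink's `DataInputView` (which extends it) to stay self-contained:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DeserializeReuseSketch {
    // Reuse the given array only when its length matches m * n; otherwise
    // allocate a fresh one -- the point made in the review comment.
    static double[] deserializeValues(double[] reuseValues, DataInput source)
            throws IOException {
        int m = source.readInt();
        int n = source.readInt();
        double[] values =
                (reuseValues != null && reuseValues.length == m * n)
                        ? reuseValues
                        : new double[m * n];
        for (int i = 0; i < m * n; i++) {
            values[i] = source.readDouble();
        }
        return values;
    }
}
```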
[jira] [Commented] (FLINK-24489) The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the number of the elements in the cache
[ https://issues.apache.org/jira/browse/FLINK-24489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456213#comment-17456213 ] Arvid Heise commented on FLINK-24489: - Merged into master as 4d142b3a417c57e4ed8af4c886fae68d927da067. I'd say that this is an improvement (and not just a simple bug fix), so we shouldn't backport. WDYT? > The size of entryCache & eventsBufferCache in the SharedBuffer should be > defined with a threshold to limit the number of the elements in the cache > -- > > Key: FLINK-24489 > URL: https://issues.apache.org/jira/browse/FLINK-24489 > Project: Flink > Issue Type: Improvement > Components: Library / CEP >Affects Versions: 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0 >Reporter: Yuepeng Pan >Priority: Major > Labels: pull-request-available > Attachments: details of the big map object.png, incoming-reference-to > the-big-map-obj.png, 截屏2021-10-13 20.28.17.png > > > source code : > [here|https://github.com/apache/flink/blob/c3cb886ee73b5fee23b2bccff0f5e4d45a30b3a1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java#L79] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] AHeise merged pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the number of the elem
AHeise merged pull request #17692: URL: https://github.com/apache/flink/pull/17692 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765500552 ## File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/BLAS.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.linalg; + +import org.apache.flink.util.Preconditions; + +/** A utility class that provides BLAS routines over matrices and vectors. */ +public class BLAS { + +private static final dev.ludovic.netlib.BLAS NATIVE_BLAS = +dev.ludovic.netlib.BLAS.getInstance(); + +/** + * \sum_i |x_i| . + * + * @param x x + * @return \sum_i |x_i| + */ +public static double asum(DenseVector x) { +return NATIVE_BLAS.dasum(x.size(), x.values, 0, 1); +} + +/** + * y += a * x . + * + * @param a a + * @param x x + * @param y y + */ +public static void axpy(double a, DenseVector x, DenseVector y) { +Preconditions.checkArgument(x.size() == y.size(), "Array dimension mismatched."); +NATIVE_BLAS.daxpy(x.size(), a, x.values, 1, y.values, 1); +} + +/** + * x \cdot y . 
+ * + * @param x x + * @param y y + * @return x \cdot y + */ +public static double dot(DenseVector x, DenseVector y) { +Preconditions.checkArgument(x.size() == y.size(), "Array dimension mismatched."); +return NATIVE_BLAS.ddot(x.size(), x.values, 1, y.values, 1); +} + +/** + * \sqrt(\sum_i x_i * x_i) . + * + * @param x x + * @return \sqrt(\sum_i x_i * x_i) + */ +public static double norm2(DenseVector x) { +return NATIVE_BLAS.dnrm2(x.size(), x.values, 1); +} + +/** + * x = x * a . + * + * @param a a + * @param x x + */ +public static void scal(double a, DenseVector x) { +NATIVE_BLAS.dscal(x.size(), a, x.values, 1); +} + +/** y := alpha * A * x + beta * y . */ Review comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
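For reference, the BLAS routines wrapped above (axpy, dot, norm2) reduce to simple loops; this plain-Java sketch — without the netlib dependency, and not the actual Flink ML code — shows their semantics:

```java
public class BlasSketch {
    // axpy: y += a * x (element-wise)
    static void axpy(double a, double[] x, double[] y) {
        if (x.length != y.length) {
            throw new IllegalArgumentException("Array dimension mismatched.");
        }
        for (int i = 0; i < x.length; i++) {
            y[i] += a * x[i];
        }
    }

    // dot: \sum_i x_i * y_i
    static double dot(double[] x, double[] y) {
        if (x.length != y.length) {
            throw new IllegalArgumentException("Array dimension mismatched.");
        }
        double sum = 0.0;
        for (int i = 0; i < x.length; i++) {
            sum += x[i] * y[i];
        }
        return sum;
    }

    // norm2: \sqrt(\sum_i x_i * x_i)
    static double norm2(double[] x) {
        return Math.sqrt(dot(x, x));
    }
}
```

In production the netlib calls are preferred because they can dispatch to an optimized native BLAS; the loops above only pin down what the operations compute.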
[jira] [Closed] (FLINK-3947) Provide low level access to RocksDB state backend
[ https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-3947. --- Resolution: Information Provided MapState and keyedStatebackend#getKeys have been implemented. Closing this ticket due to lack of activity. > Provide low level access to RocksDB state backend > - > > Key: FLINK-3947 > URL: https://issues.apache.org/jira/browse/FLINK-3947 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.0.3 >Reporter: Elias Levy >Priority: Minor > Labels: auto-deprioritized-major > > The current state API is limiting and some implementations are not as > efficient as they could be, particularly when working with large states. For > instance, a ListState is append only. You cannot remove values from the > list. And the RocksDBListState get() implementation reads all list values > from RocksDB instead of returning an Iterable that only reads values as > needed. > Furthermore, RocksDB is an ordered KV store, yet there is no ordered map > state API with an ability to iterate over the stored values in order. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] AHeise commented on a change in pull request #18009: [FLINK-25126][kafka] Reset internal transaction state of FlinkKafkaIn…
AHeise commented on a change in pull request #18009:
URL: https://github.com/apache/flink/pull/18009#discussion_r765499014

## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java

@@ -59,49 +63,83 @@
     private static final String TRANSACTION_PREFIX = "test-transaction-";

-    Properties getProperties() {
-        Properties properties = new Properties();
-        properties.put(
-                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                KAFKA_CONTAINER.getBootstrapServers());
-        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return properties;
-    }
-
     @Test
     void testInitTransactionId() {
+        final String topic = "test-init-transactions";
         try (FlinkKafkaInternalProducer<String, String> reuse =
                 new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
             int numTransactions = 20;
             for (int i = 1; i <= numTransactions; i++) {
                 reuse.initTransactionId(TRANSACTION_PREFIX + i);
                 reuse.beginTransaction();
-                reuse.send(new ProducerRecord<>(TEST_TOPIC, "test-value-" + i));
+                reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
                 if (i % 2 == 0) {
                     reuse.commitTransaction();
                 } else {
                     reuse.flush();
                     reuse.abortTransaction();
                 }
                 assertNumTransactions(i);
-                assertThat(readRecords(TEST_TOPIC).count(), equalTo(i / 2));
+                assertThat(readRecords(topic).count()).isEqualTo(i / 2);
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideTransactionsFinalizer")
+    void testResetInnerTransactionIfFinalizingTransactionFailed(
+            Consumer<FlinkKafkaInternalProducer<String, String>> transactionFinalizer) {
+        final String topic = "reset-producer-internal-state";
+        try (FlinkKafkaInternalProducer<String, String> fenced =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            fenced.initTransactions();
+            fenced.beginTransaction();
+            fenced.send(new ProducerRecord<>(topic, "test-value"));
+            // Start a second producer that fences the first one
+            try (FlinkKafkaInternalProducer<String, String> producer =
+                    new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+                producer.initTransactions();
+                producer.beginTransaction();
+                producer.send(new ProducerRecord<>(topic, "test-value"));
+                producer.commitTransaction();
             }
+            assertThatThrownBy(() -> transactionFinalizer.accept(fenced))
+                    .isInstanceOf(ProducerFencedException.class)
+                    .hasMessageContaining(
+                            "There is a newer producer with the same transactionalId which fences the current one.");

Review comment: `Maybe remove this check`?
[jira] [Closed] (FLINK-4413) Improve savepoint restore error messages
[ https://issues.apache.org/jira/browse/FLINK-4413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-4413.
---------------------------
    Resolution: Information Provided

Closing this ticket due to lack of activity.

> Improve savepoint restore error messages
> ----------------------------------------
>
>                 Key: FLINK-4413
>                 URL: https://issues.apache.org/jira/browse/FLINK-4413
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream, Runtime / State Backends
>    Affects Versions: 1.1.0
>            Reporter: Gyula Fora
>            Priority: Not a Priority
>
> Currently, when a savepoint restore fails due to problems with parallelism
> or the assigned uids, the error messages contain only the job vertex id of
> the problematic task.
> This makes this kind of problem very difficult to debug for more complex
> topologies.
> I propose adding the user-assigned task names to these error messages to
> make this much easier for users.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Closed] (FLINK-4493) Unify the snapshot output format for keyed-state backends
[ https://issues.apache.org/jira/browse/FLINK-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-4493.
---------------------------
    Resolution: Information Provided

Already implemented in FLINK-20976.

> Unify the snapshot output format for keyed-state backends
> ---------------------------------------------------------
>
>                 Key: FLINK-4493
>                 URL: https://issues.apache.org/jira/browse/FLINK-4493
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Stefan Richter
>            Priority: Not a Priority
>
> We could unify the output format for keyed-state backend implementations,
> e.g. based on RocksDB and Heap, to write a single, common output format.
> For example, this would allow us to restore state that was previously kept
> in RocksDB on a heap-based backend and vice versa.
[GitHub] [flink] AHeise merged pull request #18047: [FLINK-25091][BP-1.12] Change ORC compression attribute reference error in FileSink docs
AHeise merged pull request #18047:
URL: https://github.com/apache/flink/pull/18047
[GitHub] [flink] AHeise merged pull request #18046: [FLINK-25091][BP-1.13] Change ORC compression attribute reference error in FileSink docs
AHeise merged pull request #18046:
URL: https://github.com/apache/flink/pull/18046
[GitHub] [flink] AHeise merged pull request #18045: [FLINK-25091][BP-1.14] Change ORC compression attribute reference error in FileSink docs
AHeise merged pull request #18045:
URL: https://github.com/apache/flink/pull/18045
[GitHub] [flink] xuyangzhong commented on a change in pull request #18017: [FLINK-25171] Validation of duplicate fields in derived tables
xuyangzhong commented on a change in pull request #18017:
URL: https://github.com/apache/flink/pull/18017#discussion_r765486095

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala

@@ -40,10 +40,9 @@ class UnionTest extends TableTestBase {
       |CREATE TABLE t1 (
       |  id int,
       |  ts bigint,
-      |  name string,
+      |  name varchar(32),

Review comment: If you want to delete one of them, I think it's better to keep the one that has the same type as the other tables.

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java

@@ -494,7 +494,13 @@ private void collectPhysicalFieldsTypes(List derivedColumns) {
             boolean nullable = type.getNullable() == null ? true : type.getNullable();
             RelDataType relType = type.deriveType(sqlValidator, nullable);
             // add field name and field type to physical field list
-            physicalFieldNamesToTypes.put(name, relType);
+            RelDataType oldType = physicalFieldNamesToTypes.put(name, relType);
+            if (oldType != null) {
+                throw new ValidationException(
+                        String.format(
+                                "A column named '%s' already exists in the derived table.",

Review comment: The exception message is confusing: it is unclear what the "derived table" is, because users may only use "create table ..." instead of "create table ... like ...", and the previous form can also trigger this bug. IMO, you can delete the word 'derived'.
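The validation discussed in this review relies on `Map.put` returning the previously mapped value, which turns duplicate-column detection into a one-line check. A minimal, self-contained sketch of that pattern (class and method names here are illustrative, not Flink's actual planner code):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateColumnCheck {

    /** Collects column name -> type, failing fast if the same name appears twice. */
    static Map<String, String> collectFieldTypes(List<String[]> columns) {
        Map<String, String> namesToTypes = new LinkedHashMap<>();
        for (String[] column : columns) {
            String name = column[0];
            String type = column[1];
            // put() returns the previously mapped value, or null if the name is new
            String oldType = namesToTypes.put(name, type);
            if (oldType != null) {
                throw new IllegalArgumentException(
                        String.format("A column named '%s' already exists in the table.", name));
            }
        }
        return namesToTypes;
    }

    public static void main(String[] args) {
        // Distinct names pass validation
        collectFieldTypes(List.of(new String[] {"id", "INT"}, new String[] {"ts", "BIGINT"}));
        // A repeated name is rejected
        try {
            collectFieldTypes(List.of(new String[] {"id", "INT"}, new String[] {"id", "STRING"}));
            throw new AssertionError("expected a duplicate-column failure");
        } catch (IllegalArgumentException expected) {
            // duplicate detected as intended
        }
    }
}
```

A `LinkedHashMap` is used so the declared column order is preserved while still getting the constant-time duplicate check.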
[jira] [Closed] (FLINK-4916) BufferSpiller should distribute files across temp directories
[ https://issues.apache.org/jira/browse/FLINK-4916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-4916.
---------------------------
    Resolution: Information Provided

{{BufferSpiller}} has been dropped.

> BufferSpiller should distribute files across temp directories
> -------------------------------------------------------------
>
>                 Key: FLINK-4916
>                 URL: https://issues.apache.org/jira/browse/FLINK-4916
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Stephan Ewen
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> Currently, the {{BufferSpiller}} puts files into one temp directory.
> It should be a simple extension to allow it to rotate files across temp
> directories.
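Distributing spill files across several temp directories, as the ticket proposed, is typically a round-robin pick over the configured directories. A minimal standalone sketch of that idea (the class name, directory list, and counter are illustrative, not the actual `BufferSpiller` code):

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

public class TempDirRotation {

    private final File[] tempDirs;
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    TempDirRotation(File... tempDirs) {
        this.tempDirs = tempDirs;
    }

    /** Picks directories in round-robin order so spill files spread across disks. */
    File nextSpillDir() {
        int i = Math.floorMod(nextIndex.getAndIncrement(), tempDirs.length);
        return tempDirs[i];
    }

    public static void main(String[] args) {
        TempDirRotation rotation =
                new TempDirRotation(new File("/tmp/a"), new File("/tmp/b"), new File("/tmp/c"));
        // Successive calls cycle through a, b, c, then wrap around to a
        if (!rotation.nextSpillDir().getPath().endsWith("a")) throw new AssertionError();
        if (!rotation.nextSpillDir().getPath().endsWith("b")) throw new AssertionError();
        if (!rotation.nextSpillDir().getPath().endsWith("c")) throw new AssertionError();
        if (!rotation.nextSpillDir().getPath().endsWith("a")) throw new AssertionError();
    }
}
```

`Math.floorMod` keeps the index non-negative even after the `AtomicInteger` overflows, which a plain `%` would not.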
[jira] [Assigned] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.
[ https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reassigned FLINK-5151:
-------------------------------
    Assignee: Hangxiang Yu

> Add discussion about object mutations to heap-based state backend docs.
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5151
>                 URL: https://issues.apache.org/jira/browse/FLINK-5151
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Runtime / State Backends
>    Affects Versions: 1.1.4, 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Hangxiang Yu
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.15.0
>
> Flink's heap state backends store data as objects on the heap. Any object
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in the case of sliding
> windows, multiple windows hold references to the same object. Hence, all
> windows are affected if such an object is modified, e.g., by a
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might
> return invalid results.
> We should add this information to the state backend documentation and also
> point out that the RocksDB backend is not affected by this because all data
> is serialized.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765495150

## File path: flink-ml-core/src/main/java/org/apache/flink/ml/linalg/DenseMatrix.java

@@ -0,0 +1,71 @@
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixTypeInfoFactory;
+
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+
+/**
+ * Column-major dense matrix. The entry values are stored in a single array of doubles with columns
+ * listed in sequence.
+ */
+@TypeInfo(DenseMatrixTypeInfoFactory.class)
+public class DenseMatrix implements Matrix {
+
+    /** Row dimension. */
+    public final int numRows;
+
+    /** Column dimension. */
+    public final int numCols;
+
+    /**
+     * Array for internal storage of elements.
+     *
+     * The matrix data is stored in column major format internally.
+     */
+    public final double[] values;
+
+    /**
+     * Constructs an m-by-n matrix of zeros.
+     *
+     * @param numRows Number of rows.
+     * @param numCols Number of columns.
+     */
+    public DenseMatrix(int numRows, int numCols) {
+        this(numRows, numCols, new double[numRows * numCols]);
+    }
+
+    /**
+     * Constructs a matrix from a 1-D array. The data in the array should organize in column major.
+     *
+     * @param numRows Number of rows.
+     * @param numCols Number of cols.
+     * @param values One-dimensional array of doubles.
+     */
+    public DenseMatrix(int numRows, int numCols, double[] values) {
+        Preconditions.checkArgument(values.length == numRows * numCols);
+        this.numRows = numRows;
+        this.numCols = numCols;
+        this.values = values;
+    }
+
+    @Override
+    public int numRows() {
+        return numRows;
+    }
+
+    @Override
+    public int numCols() {
+        return numCols;
+    }
+
+    @Override
+    public double get(int i, int j) {
+        return values[numRows * j + i];

Review comment: OK
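The `get` implementation under review indexes a column-major array with `values[numRows * j + i]`. That layout can be exercised without any flink-ml dependency; a standalone sketch (this `ColumnMajor` class is illustrative, not the flink-ml `DenseMatrix`):

```java
public class ColumnMajor {

    /** Index of entry (i, j) in a column-major array with numRows rows. */
    static int index(int numRows, int i, int j) {
        return numRows * j + i;
    }

    public static void main(String[] args) {
        // A 2x3 matrix stored column by column:
        // | 1 3 5 |
        // | 2 4 6 |
        double[] values = {1, 2, 3, 4, 5, 6};
        int numRows = 2;
        // Entry (row 0, column 1) is the third stored value
        if (values[index(numRows, 0, 1)] != 3.0) throw new AssertionError();
        // Entry (row 1, column 2) is the last stored value
        if (values[index(numRows, 1, 2)] != 6.0) throw new AssertionError();
    }
}
```

Column-major order matches BLAS/LAPACK conventions, which is why linear-algebra libraries commonly choose it over Java's natural row-major nesting.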
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765494989

## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java

@@ -0,0 +1,273 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests Knn and KnnModel. */
+public class KnnTest {
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainData;
+    private static final String LABEL_COL = "test_label";
+    private static final String PRED_COL = "test_prediction";
+    private static final String VEC_COL = "test_features";
+    private static final List<Row> trainArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(1, Vectors.dense(2.0, 3.0)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(2, Vectors.dense(200.1, 300.1)),
+                            Row.of(2, Vectors.dense(200.2, 300.2)),
+                            Row.of(2, Vectors.dense(200.3, 300.3)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.6, 300.6)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(3, Vectors.dense(2.8, 3.2)),
+                            Row.of(4, Vectors.dense(300., 3.2)),
+                            Row.of(1, Vectors.dense(2.2, 3.2)),
+                            Row.of(5, Vectors.dense(2.4, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(1, Vectors.dense(2.1, 3.1))));
+
+    private static final List<Row> testArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(5, Vectors.dense(4.0, 4.1)),
+                            Row.of(2, Vectors.dense(300, 42))));
+    private Table testData;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())

Review comment: After discussing with @zhipeng93, I decided to use double as the label type.
[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression
zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r765474898

## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+    public final DenseVector coefficient;
+
+    public LogisticRegressionModelData(DenseVector coefficient) {
+        this.coefficient = coefficient;
+    }
+
+    /**
+     * Converts the table model to a data stream.
+     *
+     * @param modelData The table model data.
+     * @return The data stream model data.
+     */
+    public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) modelData).getTableEnvironment();
+        return tEnv.toDataStream(modelData).map(x -> (LogisticRegressionModelData) x.getField(0));
+    }
+
+    /** Data encoder for {@link LogisticRegressionModel}. */
+    public static class ModelDataEncoder implements Encoder<LogisticRegressionModelData> {
+
+        @Override
+        public void encode(LogisticRegressionModelData modelData, OutputStream outputStream)
+                throws IOException {
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            serializer.serialize(
+                    modelData.coefficient, new DataOutputViewStreamWrapper(outputStream));
+        }
+    }
+
+    /** Data decoder for {@link LogisticRegressionModel}. */
+    public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {

Review comment: `KmeansModelData` is already updated. For naivebayes, how about we do the update in the NaiveBayes PR given that it is not merged yet?
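The encoder/decoder pair above streams the model's coefficient vector through Flink's `DenseVectorSerializer`. The underlying round trip can be sketched with plain JDK streams; this stand-in serializes a `double[]` with a simple length-prefixed format and is not Flink's actual wire format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class ModelDataRoundTrip {

    /** Encodes a coefficient vector as a length-prefixed sequence of doubles. */
    static byte[] encode(double[] coefficient) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(coefficient.length);
            for (double v : coefficient) {
                out.writeDouble(v);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes the format written by {@link #encode}. */
    static double[] decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            double[] coefficient = new double[in.readInt()];
            for (int i = 0; i < coefficient.length; i++) {
                coefficient[i] = in.readDouble();
            }
            return coefficient;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        double[] original = {0.5, -1.25, 3.0};
        double[] restored = decode(encode(original));
        if (!Arrays.equals(original, restored)) {
            throw new AssertionError("round trip lost data");
        }
    }
}
```

The length prefix is what lets a decoder know where one model record ends, the same role the serializer plays for `ModelDataDecoder` reading from a stream.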
[GitHub] [flink] zuston edited a comment on pull request #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful
zuston edited a comment on pull request #5982:
URL: https://github.com/apache/flink/pull/5982#issuecomment-987845370

Do you mind if I take over, @sihuazhou? I met the same problem and want to solve it.
Any ideas on it @StephanEwen @wangyang0918 @KarmaGYZ @XComp @zentol
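FLINK-9325 is about not exposing a checkpoint metadata file until it has been fully written. The usual remedy is to write to a temporary file and atomically rename it into place, so a partially written file is never visible under the final name. A minimal sketch of that pattern with JDK NIO (the file names are illustrative, not Flink's checkpoint layout):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMetaWrite {

    /** Writes the metadata to a temp file first, then moves it into place atomically. */
    static void writeMetadata(Path metaFile, byte[] contents) {
        Path tmp = metaFile.resolveSibling(metaFile.getFileName() + ".inprogress");
        try {
            // If this write fails, no file with the final name ever appears
            Files.write(tmp, contents);
            // Rename within the same directory; readers see either nothing or the full file
            Files.move(tmp, metaFile, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try {
            Path dir = Files.createTempDirectory("chk-1");
            Path meta = dir.resolve("_metadata");
            writeMetadata(meta, "snapshot-data".getBytes());
            if (!Files.exists(meta)) throw new AssertionError("meta file missing");
            if (Files.exists(dir.resolve("_metadata.inprogress"))) {
                throw new AssertionError("temp file left behind");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that `ATOMIC_MOVE` is only guaranteed within one file system, which is why the temp file is created as a sibling of the final file; distributed file systems without atomic rename need a different protocol.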
[jira] [Updated] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.
[ https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang updated FLINK-5151:
----------------------------
    Fix Version/s: 1.15.0

> Add discussion about object mutations to heap-based state backend docs.
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5151
>                 URL: https://issues.apache.org/jira/browse/FLINK-5151
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Runtime / State Backends
>    Affects Versions: 1.1.4, 1.2.0
>            Reporter: Fabian Hueske
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.15.0
>
> Flink's heap state backends store data as objects on the heap. Any object
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in the case of sliding
> windows, multiple windows hold references to the same object. Hence, all
> windows are affected if such an object is modified, e.g., by a
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might
> return invalid results.
> We should add this information to the state backend documentation and also
> point out that the RocksDB backend is not affected by this because all data
> is serialized.
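The shared-reference hazard described in FLINK-5151 can be reproduced without Flink: two "windows" holding the same heap object both observe a later mutation, whereas a copy (standing in for a serialize/deserialize round trip, which is effectively what RocksDB does) is isolated. A standalone illustration, not Flink code:

```java
import java.util.List;

public class SharedStateMutation {

    /** A mutable element, standing in for a user record kept in heap state. */
    static class Event {
        long value;

        Event(long value) {
            this.value = value;
        }
    }

    public static void main(String[] args) {
        Event event = new Event(1);

        // Two overlapping "windows" hold references to the SAME heap object
        List<Event> windowA = List.of(event);
        List<Event> windowB = List.of(event);

        // Stand-in for a serialize/deserialize round trip
        Event copied = new Event(event.value);

        // A user function mutates the element in place while processing window A ...
        windowA.get(0).value = 42;

        // ... and window B silently observes the change, while the copy is unaffected
        if (windowB.get(0).value != 42) throw new AssertionError("shared reference not observed");
        if (copied.value != 1) throw new AssertionError("copy should be isolated");
    }
}
```

This is exactly why the ticket asks the docs to warn against mutating objects returned from or handed to heap-backed state.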
[GitHub] [flink] zuston edited a comment on pull request #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful
zuston edited a comment on pull request #5982: URL: https://github.com/apache/flink/pull/5982#issuecomment-987845370 Do you mind if I take over, @sihuazhou? I met the same problem and want to solve it. Any ideas on it, @StephanEwen @wangyang0918 @KarmaGYZ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
flinkbot edited a comment on pull request #18060: URL: https://github.com/apache/flink/pull/18060#issuecomment-989583760 ## CI report: * ce46fd7f0b44fec9587ab8a8aef30d89a617a1a5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27851) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY
flinkbot edited a comment on pull request #17919: URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835 ## CI report: * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675) * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056: URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314 ## CI report: * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846) * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode
Myasuka commented on a change in pull request #18024: URL: https://github.com/apache/flink/pull/18024#discussion_r765490259 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java ## @@ -46,4 +46,14 @@ .withDescription( "Allow to skip savepoint state that cannot be restored. " + "Allow this if you removed an operator from your pipeline after the savepoint was triggered."); +/** + * Describes the mode how Flink should restore from the given savepoint or retained checkpoint. + */ +public static final ConfigOption<RestoreMode> RESTORE_MODE = +key("execution.savepoint-restore-mode") +.enumType(RestoreMode.class) +.defaultValue(RestoreMode.NO_CLAIM) +.withDescription( +"Describes the mode how Flink should restore from the given" ++ " savepoint or retained checkpoint."); Review comment: Got it. This sounds more reasonable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
flinkbot commented on pull request #18060: URL: https://github.com/apache/flink/pull/18060#issuecomment-989583760 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056: URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314 ## CI report: * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846) * f21e8c71b1582b77beaf4a0393a51d101b9b80ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27849) * 2a3718c93256f66b0cd5392a1c8761eeccdf2f77 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY
flinkbot edited a comment on pull request #17919: URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835 ## CI report: * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675) * 605c0ab25a4caf70e1c4154632992b1106a5da47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27850) * 0095d26396c879948f70547077efe70743d4be07 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-24728) Batch SQL file sink forgets to close the output stream
[ https://issues.apache.org/jira/browse/FLINK-24728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456191#comment-17456191 ] Jingsong Lee commented on FLINK-24728: -- [~TsReaper] Can you cherry-pick for 1.14 and 1.13? Two commits in one PR is OK. > Batch SQL file sink forgets to close the output stream > -- > > Key: FLINK-24728 > URL: https://issues.apache.org/jira/browse/FLINK-24728 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.11.4, 1.14.0, 1.12.5, 1.13.3 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.14.1, 1.13.4 > > > I tried to write a large avro file into HDFS and discover that the displayed > file size in HDFS is extremely small, but copying that file to local yields > the correct size. If we create another Flink job and read that avro file from > HDFS, the job will finish without outputting any record because the file size > Flink gets from HDFS is the very small file size. > This is because the output format created in > {{FileSystemTableSink#createBulkWriterOutputFormat}} only finishes the > {{BulkWriter}}. According to the java doc of {{BulkWriter#finish}} bulk > writers should not close the output stream and should leave them to the > framework. -- This message was sent by Atlassian Jira (v8.20.1#820001)
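The ownership split described in FLINK-24728 can be illustrated with a small sketch. This is hypothetical plain Java, not the real Flink `BulkWriter` interface; the names `SimpleWriter` and `writeAll` are invented: `finish()` flushes writer-internal state but leaves the stream open, and the framework that owns the stream must close it afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the contract described above (plain Java, not the
// real Flink BulkWriter interface): finish() flushes writer-internal state
// but leaves the stream open; the framework that owns the stream closes it.
public class BulkWriterContract {

    interface Writer {
        void write(String record) throws IOException;

        // Per the contract, finish() must NOT close the underlying stream.
        void finish() throws IOException;
    }

    static class SimpleWriter implements Writer {
        private final OutputStream out;

        SimpleWriter(OutputStream out) { this.out = out; }

        @Override
        public void write(String record) throws IOException {
            out.write(record.getBytes());
        }

        @Override
        public void finish() throws IOException {
            out.flush(); // flush only; closing is the framework's job
        }
    }

    // Framework side: create the stream, hand it to the writer, and close it
    // after finish(). Forgetting this framework-side close is the kind of bug
    // that made HDFS report a far smaller file size than was actually written.
    static int writeAll(String[] records) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Writer writer = new SimpleWriter(out);
        for (String record : records) {
            writer.write(record);
        }
        writer.finish();
        out.close();
        return out.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeAll(new String[] {"a", "bc"}));
    }
}
```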
[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode
dawidwys commented on a change in pull request #18024: URL: https://github.com/apache/flink/pull/18024#discussion_r765488499 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java ## @@ -46,4 +46,14 @@ .withDescription( "Allow to skip savepoint state that cannot be restored. " + "Allow this if you removed an operator from your pipeline after the savepoint was triggered."); +/** + * Describes the mode how Flink should restore from the given savepoint or retained checkpoint. + */ +public static final ConfigOption<RestoreMode> RESTORE_MODE = +key("execution.savepoint-restore-mode") +.enumType(RestoreMode.class) +.defaultValue(RestoreMode.NO_CLAIM) +.withDescription( +"Describes the mode how Flink should restore from the given" ++ " savepoint or retained checkpoint."); Review comment: side note: Personally, I am not 100% convinced we should have the `LEGACY` mode in the final version (once we add a proper `NO_CLAIM` mode). We might remove it in the end; however, I want to add it for now to keep the master branch releasable at all times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18042: [WIP][FLINK-25076][table-planner] Improve operator name for sql job
flinkbot edited a comment on pull request #18042: URL: https://github.com/apache/flink/pull/18042#issuecomment-987776252 ## CI report: * 2f62c6d3a186c872eb5ce44ef45ee43c06207a9b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27845) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18017: [FLINK-25171] Validation of duplicate fields in derived tables
flinkbot edited a comment on pull request #18017: URL: https://github.com/apache/flink/pull/18017#issuecomment-986500405 ## CI report: * 196abdbb40a117ea60ad76d7a606d732ed62579c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27847) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi merged pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files
JingsongLi merged pull request #17655: URL: https://github.com/apache/flink/pull/17655 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil opened a new pull request #18060: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
lincoln-lil opened a new pull request #18060: URL: https://github.com/apache/flink/pull/18060 This is a cherry pick for FLINK-20370 part1. ## Brief change log update the logic of `FlinkChangelogModeInferenceProgram` and `StreamPhysicalSink` ## Verifying this change Streaming sql's `RankTest`, `AggregateTest` and `JoinTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - The serializers: (no ) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode
dawidwys commented on a change in pull request #18024: URL: https://github.com/apache/flink/pull/18024#discussion_r765487578 ## File path: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java ## @@ -130,6 +131,46 @@ public void testRun() throws Exception { } } +@Test +public void testClaimRestoreModeParsing() throws Exception { +// test configure savepoint with claim mode +String[] parameters = { +"-s", "expectedSavepointPath", "-n", "-r", "claim", getTestJarPath() Review comment: Using `-s expectedSavepointPath -n --legacy` is not necessarily descriptive. It is not straightforward to tell what the `legacy` refers to. This has been also pointed out by @rkhachatryan. Do you have any comments on that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17919: [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY
flinkbot edited a comment on pull request #17919: URL: https://github.com/apache/flink/pull/17919#issuecomment-979659835 ## CI report: * 45b754770668f8aa332b5d0d35bc0cbde4cb4706 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27675) * 605c0ab25a4caf70e1c4154632992b1106a5da47 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dawidwys commented on a change in pull request #18024: [FLINK-25155] Implement claim snapshot mode
dawidwys commented on a change in pull request #18024: URL: https://github.com/apache/flink/pull/18024#discussion_r765486398 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java ## @@ -46,4 +46,14 @@ .withDescription( "Allow to skip savepoint state that cannot be restored. " + "Allow this if you removed an operator from your pipeline after the savepoint was triggered."); +/** + * Describes the mode how Flink should restore from the given savepoint or retained checkpoint. + */ +public static final ConfigOption<RestoreMode> RESTORE_MODE = +key("execution.savepoint-restore-mode") +.enumType(RestoreMode.class) +.defaultValue(RestoreMode.NO_CLAIM) +.withDescription( +"Describes the mode how Flink should restore from the given" ++ " savepoint or retained checkpoint."); Review comment: Yes, you are right `LEGACY` is the current behaviour. The other mode `NO_CLAIM` is not part of this PR. It will come in the next PR. > With the name of LEGACY, users would still think the new 1st checkpoint is an incremental one. And they would be right. The `LEGACY` mode is the current behaviour, with no changes at all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25218) Performance issues with lookup join accessing external dimension tables
[ https://issues.apache.org/jira/browse/FLINK-25218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456188#comment-17456188 ] Ada Wong commented on FLINK-25218: -- We could introduce a Guava cache to cache rows with the same primary key. > Performance issues with lookup join accessing external dimension tables > > > Key: FLINK-25218 > URL: https://issues.apache.org/jira/browse/FLINK-25218 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.15.0 >Reporter: ZhuoYu Chen >Priority: Major > > The current lookup join accesses the external dimension table once per input > record to fetch the result and emit an output record. > Implement a lookup join that can improve performance by batching and delaying -- This message was sent by Atlassian Jira (v8.20.1#820001)
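The caching idea suggested in the comment above can be sketched without any library dependency. This is a hypothetical illustration (a plain access-order `LinkedHashMap` acting as an LRU cache rather than Guava, and the class name `LookupCache` is invented); repeated lookups for the same primary key skip the expensive external dimension-table call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the caching idea: a LinkedHashMap in access-order
// (LRU) mode stands in for a Guava cache so the example is dependency-free.
public class LookupCache {
    private final Function<String, String> externalLookup; // the expensive call
    private final Map<String, String> cache;
    int externalCalls = 0; // counts real dimension-table accesses

    LookupCache(int maxSize, Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
        // access-order LinkedHashMap evicts the least recently used entry
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxSize;
            }
        };
    }

    String lookup(String key) {
        String row = cache.get(key);
        if (row == null) {
            externalCalls++; // cache miss: hit the external dimension table
            row = externalLookup.apply(key);
            cache.put(key, row);
        }
        return row;
    }
}
```

A production version would also need an expiry time (Guava's `expireAfterWrite`, for example), since cached dimension rows can go stale.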
[GitHub] [flink-ml] gaoyunhaii commented on pull request #20: [FLINK-23959][FLIP-175] Compose Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator
gaoyunhaii commented on pull request #20: URL: https://github.com/apache/flink-ml/pull/20#issuecomment-989576488 @lindong28 Hello~ could you rebase the PR to the latest master~? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-24817) Support Naive Bayes algorithm in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456183#comment-17456183 ] Yun Gao commented on FLINK-24817: - Merge on master via e4f7eedb958e29817525f54ee6944d819d0de47c > Support Naive Bayes algorithm in Flink ML > - > > Key: FLINK-24817 > URL: https://issues.apache.org/jira/browse/FLINK-24817 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: 0.1.0 > > > This ticket aims to add Naive Bayes algorithm to Flink ML. The algorithm will > use latest Flink ML API proposed in FLIP 173~176. > > Github PR link: https://github.com/apache/flink-ml/pull/21 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-24817) Support Naive Bayes algorithm in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao closed FLINK-24817. --- Resolution: Fixed > Support Naive Bayes algorithm in Flink ML > - > > Key: FLINK-24817 > URL: https://issues.apache.org/jira/browse/FLINK-24817 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: 0.1.0 > > > This ticket aims to add Naive Bayes algorithm to Flink ML. The algorithm will > use latest Flink ML API proposed in FLIP 173~176. > > Github PR link: https://github.com/apache/flink-ml/pull/21 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056: URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314 ## CI report: * ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846) * f21e8c71b1582b77beaf4a0393a51d101b9b80ab UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.keyed-shuffle' option to auto keyby on sink's pk if parallelism are not
JingsongLi commented on a change in pull request #17939: URL: https://github.com/apache/flink/pull/17939#discussion_r765481583 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ## @@ -280,26 +297,29 @@ private int deriveSinkParallelism( * messages. */ private Transformation applyKeyBy( -ChangelogMode changelogMode, +TableConfig config, Transformation inputTransform, int[] primaryKeys, int sinkParallelism, -boolean upsertMaterialize) { -final int inputParallelism = inputTransform.getParallelism(); -if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) -&& !upsertMaterialize) { -return inputTransform; +int inputParallelism, +boolean inputInsertOnly, +boolean needMaterialize) { +final ExecutionConfigOptions.SinkKeyedShuffle sinkShuffleByPk = + config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE); +boolean sinkKeyBy = false; +switch (sinkShuffleByPk) { +case NONE: +break; +case AUTO: +sinkKeyBy = inputInsertOnly && sinkParallelism != inputParallelism; +break; +case FORCE: +// single parallelism has no problem +sinkKeyBy = sinkParallelism != 1 || inputParallelism != 1; Review comment: I was torn about this condition because I originally thought we shouldn't specialize the single parallelism case. But considering that there are a lot of single parallelism jobs in stream computing, it is worth optimizing to remove keyBy for single parallelism. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
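The three-way decision quoted in the diff above can be lifted into a standalone method so the modes are easy to compare. The following mirrors the snippet's switch logic but is an illustration in plain Java, not the actual `CommonExecSink` code; the class and method names are invented.

```java
// Sketch of the keyed-shuffle decision discussed in the review above.
// NONE never shuffles; AUTO shuffles only when the insert-only input's
// parallelism differs from the sink's; FORCE shuffles unless the whole
// pipeline runs at parallelism 1 (one task sees all records anyway).
public class KeyedShuffleDecision {
    enum SinkKeyedShuffle { NONE, AUTO, FORCE }

    static boolean needsKeyBy(
            SinkKeyedShuffle mode,
            boolean inputInsertOnly,
            int inputParallelism,
            int sinkParallelism) {
        switch (mode) {
            case NONE:
                return false;
            case AUTO:
                return inputInsertOnly && sinkParallelism != inputParallelism;
            case FORCE:
                // single parallelism has no problem, so skip the shuffle
                return sinkParallelism != 1 || inputParallelism != 1;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(
                needsKeyBy(SinkKeyedShuffle.FORCE, false, 1, 1)); // no shuffle needed
    }
}
```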
[jira] [Updated] (FLINK-24817) Support Naive Bayes algorithm in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-24817: Fix Version/s: 0.1.0 > Support Naive Bayes algorithm in Flink ML > - > > Key: FLINK-24817 > URL: https://issues.apache.org/jira/browse/FLINK-24817 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: 0.1.0 > > > This ticket aims to add Naive Bayes algorithm to Flink ML. The algorithm will > use latest Flink ML API proposed in FLIP 173~176. > > Github PR link: https://github.com/apache/flink-ml/pull/21 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-24817) Support Naive Bayes algorithm in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao reassigned FLINK-24817: --- Assignee: Yunfeng Zhou > Support Naive Bayes algorithm in Flink ML > - > > Key: FLINK-24817 > URL: https://issues.apache.org/jira/browse/FLINK-24817 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > This ticket aims to add Naive Bayes algorithm to Flink ML. The algorithm will > use latest Flink ML API proposed in FLIP 173~176. > > Github PR link: https://github.com/apache/flink-ml/pull/21 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression
zhipeng93 commented on a change in pull request #28: URL: https://github.com/apache/flink-ml/pull/28#discussion_r765480782 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+    public final DenseVector coefficient;
+
+    public LogisticRegressionModelData(DenseVector coefficient) {
+        this.coefficient = coefficient;
+    }
+
+    /**
+     * Converts the table model to a data stream.
+     *
+     * @param modelData The table model data.
+     * @return The data stream model data.
+     */
+    public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) modelData).getTableEnvironment();
+        return tEnv.toDataStream(modelData).map(x -> (LogisticRegressionModelData) x.getField(0));
+    }
+
+    /** Data encoder for {@link LogisticRegressionModel}.
*/
+    public static class ModelDataEncoder implements Encoder<LogisticRegressionModelData> {
+
+        @Override
+        public void encode(LogisticRegressionModelData modelData, OutputStream outputStream)
+                throws IOException {
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            serializer.serialize(
+                    modelData.coefficient, new DataOutputViewStreamWrapper(outputStream));
+        }
+    }
+
+    /** Data decoder for {@link LogisticRegressionModel}. */
+    public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {
+
+        @Override
+        public Reader<LogisticRegressionModelData> createReader(
+                Configuration configuration, FSDataInputStream inputStream) {
+            return new Reader<LogisticRegressionModelData>() {
+
+                @Override
+                public LogisticRegressionModelData read() throws IOException {
+                    DenseVectorSerializer serializer = new DenseVectorSerializer();

Review comment: Thanks for pointing this out. I found that `Serializer` implementations in Flink often make the class member `INSTANCE` public; how about we do the same for `DenseVectorSerializer`, so that we can avoid creating new objects here?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
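The reviewer's suggestion is to expose one shared, stateless serializer instance instead of allocating a new one for every record that is encoded or decoded. The sketch below illustrates that singleton pattern with a simplified stand-in class; the class name, the byte layout, and the `encode` helper are illustrative assumptions, not Flink's actual `DenseVectorSerializer` API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified stand-in for a Flink-style serializer (hypothetical names).
final class DenseVectorSerializerSketch {
    // The serializer keeps no per-record state, so a single shared instance is
    // safe to reuse across all encode/decode calls, avoiding one allocation
    // per record.
    public static final DenseVectorSerializerSketch INSTANCE = new DenseVectorSerializerSketch();

    private DenseVectorSerializerSketch() {}

    // Writes a length prefix followed by the vector values (assumed layout).
    public void serialize(double[] vector, DataOutputStream out) throws IOException {
        out.writeInt(vector.length);
        for (double v : vector) {
            out.writeDouble(v);
        }
    }
}

public class EncoderExample {
    public static byte[] encode(double[] vector) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Reuse the shared instance instead of `new DenseVectorSerializerSketch()`.
        DenseVectorSerializerSketch.INSTANCE.serialize(vector, new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] encoded = encode(new double[] {1.0, 2.0});
        // 4-byte length prefix + 2 * 8-byte doubles = 20 bytes
        System.out.println(encoded.length); // prints 20
    }
}
```

The pattern only works because the serializer is immutable and thread-safe; a serializer that buffers per-record state could not be shared this way.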
[GitHub] [flink-ml] gaoyunhaii closed pull request #32: [FLINK-24817] Add Estimator and Transformer for Naive Bayes
gaoyunhaii closed pull request #32:
URL: https://github.com/apache/flink-ml/pull/32
[GitHub] [flink] flinkbot edited a comment on pull request #18059: [FLINK-25224][filesystem] Bump Hadoop version to 2.8.4
flinkbot edited a comment on pull request #18059:
URL: https://github.com/apache/flink/pull/18059#issuecomment-989489342

## CI report:

* 45140747eb4fa27a00662d7d9be106532ea4111f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27844)

Bot commands:
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Closed] (FLINK-5373) Extend Unit Tests for StateAssignmentOperation
[ https://issues.apache.org/jira/browse/FLINK-5373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5373.
Resolution: Information Provided

{{StateAssignmentOperationTest}} has been added.

> Extend Unit Tests for StateAssignmentOperation
>
> Key: FLINK-5373
> URL: https://issues.apache.org/jira/browse/FLINK-5373
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Aljoscha Krettek
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with
> null pointers that is fixed by this commit:
> https://github.com/apache/flink/commit/74df7631316e78af39a5416e12c1adc8a46d87fe
> We should extend unit tests to catch this case in the future.
[GitHub] [flink] flinkbot edited a comment on pull request #18056: [FLINK-25074][streaming] Simplify name of WindowOperator
flinkbot edited a comment on pull request #18056:
URL: https://github.com/apache/flink/pull/18056#issuecomment-988842314

## CI report:

* ee1249a75021e8b2bad5faf5641ebac0ca4f742d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27846)

Bot commands:
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Closed] (FLINK-5374) Extend Unit Tests for RegisteredBackendStateMetaInfo
[ https://issues.apache.org/jira/browse/FLINK-5374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5374.
Resolution: Information Provided

{{RegisteredBackendStateMetaInfo}} has been dropped.

> Extend Unit Tests for RegisteredBackendStateMetaInfo
>
> Key: FLINK-5374
> URL: https://issues.apache.org/jira/browse/FLINK-5374
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Aljoscha Krettek
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with
> the compatibility check of the meta info:
> https://github.com/apache/flink/commit/d1eaa1ee41728e6d788f1e914cb0568a874a6f32
> We should extend unit tests to catch this case in the future.
[jira] [Closed] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop
[ https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5436.
Resolution: Information Provided

The {{CheckpointedRestoring}} API has been dropped.

> UDF state without CheckpointedRestoring can result in restarting loop
>
> Key: FLINK-5436
> URL: https://issues.apache.org/jira/browse/FLINK-5436
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ufuk Celebi
> Priority: Not a Priority
>
> When restoring a job with Checkpointed state and not implementing the new
> CheckpointedRestoring interface, the job will be restarted over and over
> again (given the respective restart strategy).
> Since this is not recoverable, we should immediately fail the job.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765478425 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java ## @@ -0,0 +1,273 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; + +/** Tests Knn and KnnModel. 
*/
+public class KnnTest {
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainData;
+    private static final String LABEL_COL = "test_label";
+    private static final String PRED_COL = "test_prediction";
+    private static final String VEC_COL = "test_features";
+    private static final List<Row> trainArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(1, Vectors.dense(2.0, 3.0)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(2, Vectors.dense(200.1, 300.1)),
+                            Row.of(2, Vectors.dense(200.2, 300.2)),
+                            Row.of(2, Vectors.dense(200.3, 300.3)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.6, 300.6)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(3, Vectors.dense(2.8, 3.2)),
+                            Row.of(4, Vectors.dense(300., 3.2)),
+                            Row.of(1, Vectors.dense(2.2, 3.2)),
+                            Row.of(5, Vectors.dense(2.4, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(1, Vectors.dense(2.1, 3.1))));
+
+    private static final List<Row> testArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42))));
+    private Table testData;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.of(DenseVector.class))
+                        .build();
+
+        DataStream<Row> dataStream = env.fromCollection(trainArray);
+        trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + ","
+ VEC_COL);

Review comment: OK
[jira] [Closed] (FLINK-5437) Make CheckpointedRestoring error message more detailed
[ https://issues.apache.org/jira/browse/FLINK-5437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5437.
Resolution: Information Provided

The {{CheckpointedRestoring}} API has been dropped.

> Make CheckpointedRestoring error message more detailed
>
> Key: FLINK-5437
> URL: https://issues.apache.org/jira/browse/FLINK-5437
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ufuk Celebi
> Priority: Not a Priority
>
> When restoring Checkpointed state without implementing CheckpointedRestoring,
> the job fails with the following exception:
> {code}
> java.lang.Exception: Found UDF state but operator is not instance of CheckpointedRestoring
> {code}
> I think we should make this error message more detailed.
[jira] [Closed] (FLINK-5439) Adjust max parallelism when migrating
[ https://issues.apache.org/jira/browse/FLINK-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5439.
Resolution: Information Provided

Closing this ticket due to lack of activity; v1 savepoints are too old by now.

> Adjust max parallelism when migrating
>
> Key: FLINK-5439
> URL: https://issues.apache.org/jira/browse/FLINK-5439
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ufuk Celebi
> Priority: Not a Priority
>
> When migrating from v1 savepoints, which don't have the notion of a max
> parallelism, the job needs to explicitly set the max parallelism to the
> parallelism of the savepoint.
> [~stefanrichte...@gmail.com] If this is not trivially implemented, let's close
> this as won't fix.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765476475 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java ## @@ -0,0 +1,273 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; + +/** Tests Knn and KnnModel. */ +public class KnnTest { +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; +private Table trainData; +private static final String LABEL_COL = "test_label"; Review comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Closed] (FLINK-5440) Misleading error message when migrating and scaling down from savepoint
[ https://issues.apache.org/jira/browse/FLINK-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5440.
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Misleading error message when migrating and scaling down from savepoint
>
> Key: FLINK-5440
> URL: https://issues.apache.org/jira/browse/FLINK-5440
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ufuk Celebi
> Priority: Not a Priority
>
> When resuming from a 1.1 savepoint with 1.2 and reducing the parallelism
> (while correctly setting the max parallelism), the error message says something
> about a missing operator, which is misleading. Restoring from the same
> savepoint with the savepoint parallelism works as expected.
> Instead, it should state that this kind of operation is not possible.
[GitHub] [flink-ml] zhipeng93 commented on a change in pull request #28: [Flink-24556] Add Estimator and Transformer for logistic regression
zhipeng93 commented on a change in pull request #28: URL: https://github.com/apache/flink-ml/pull/28#discussion_r765474898 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/linear/LogisticRegressionModelData.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.ml.classification.linear;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Model data of {@link LogisticRegressionModel}. */
+public class LogisticRegressionModelData {
+
+    public final DenseVector coefficient;
+
+    public LogisticRegressionModelData(DenseVector coefficient) {
+        this.coefficient = coefficient;
+    }
+
+    /**
+     * Converts the table model to a data stream.
+     *
+     * @param modelData The table model data.
+     * @return The data stream model data.
+     */
+    public static DataStream<LogisticRegressionModelData> getModelDataStream(Table modelData) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) modelData).getTableEnvironment();
+        return tEnv.toDataStream(modelData).map(x -> (LogisticRegressionModelData) x.getField(0));
+    }
+
+    /** Data encoder for {@link LogisticRegressionModel}.
*/
+    public static class ModelDataEncoder implements Encoder<LogisticRegressionModelData> {
+
+        @Override
+        public void encode(LogisticRegressionModelData modelData, OutputStream outputStream)
+                throws IOException {
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            serializer.serialize(
+                    modelData.coefficient, new DataOutputViewStreamWrapper(outputStream));
+        }
+    }
+
+    /** Data decoder for {@link LogisticRegressionModel}. */
+    public static class ModelDataDecoder extends SimpleStreamFormat<LogisticRegressionModelData> {

Review comment: `KmeansModelData` is already updated. For Naive Bayes, how about we do the update in the Naive Bayes PR, given that it is not merged yet?
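The quoted `ModelDataDecoder` follows a common pattern: each `read()` call decodes one record from a stream, and end-of-file signals that no more model data remains. A self-contained sketch of that read-until-`EOFException` loop using plain `java.io` streams is shown below; the length-prefixed byte layout and method names here are assumptions for illustration, not Flink's actual serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class DecoderExample {
    // Decodes one length-prefixed vector per call; an EOFException on the
    // first read means the stream is exhausted, which we signal with null
    // (mirroring how a StreamFormat reader signals end of input).
    public static double[] readVector(DataInputStream in) throws IOException {
        int len;
        try {
            len = in.readInt();
        } catch (EOFException e) {
            return null; // no more records
        }
        double[] v = new double[len];
        for (int i = 0; i < len; i++) {
            v[i] = in.readDouble();
        }
        return v;
    }

    public static void main(String[] args) throws IOException {
        // Write one record, then read until end of stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(2);
        out.writeDouble(0.5);
        out.writeDouble(1.5);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        double[] v = readVector(in);
        System.out.println(v[0] + "," + v[1]); // prints 0.5,1.5
        System.out.println(readVector(in));    // prints null
    }
}
```

Catching `EOFException` only around the first read keeps a truncated record (EOF in the middle of a vector) distinct from a clean end of stream: the former still surfaces as an `IOException`.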
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765474398 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java ## @@ -0,0 +1,273 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; + +/** Tests Knn and KnnModel. 
*/
+public class KnnTest {
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainData;
+    private static final String LABEL_COL = "test_label";
+    private static final String PRED_COL = "test_prediction";
+    private static final String VEC_COL = "test_features";
+    private static final List<Row> trainArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(1, Vectors.dense(2.0, 3.0)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(2, Vectors.dense(200.1, 300.1)),
+                            Row.of(2, Vectors.dense(200.2, 300.2)),
+                            Row.of(2, Vectors.dense(200.3, 300.3)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.4, 300.4)),
+                            Row.of(2, Vectors.dense(200.6, 300.6)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.1, 3.1)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(1, Vectors.dense(2.3, 3.2)),
+                            Row.of(3, Vectors.dense(2.8, 3.2)),
+                            Row.of(4, Vectors.dense(300., 3.2)),
+                            Row.of(1, Vectors.dense(2.2, 3.2)),
+                            Row.of(5, Vectors.dense(2.4, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(5, Vectors.dense(2.5, 3.2)),
+                            Row.of(1, Vectors.dense(2.1, 3.1))));
+
+    private static final List<Row> testArray =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42))));
+    private Table testData;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.of(DenseVector.class))
+                        .build();
+
+        DataStream<Row> dataStream = env.fromCollection(trainArray);
+        trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + ","
+ VEC_COL);
+
+        DataStream<Row> predDataStream = env.fromCollection(testArray);
+        testData = tEnv.fromDataStream(predDataStream, schema).as(LABEL_COL + "," + VEC_COL);
+    }
+
+    // Executes the graph and returns a list of (true label, predicted label) pairs.
+    private static List> executeAndCollect(Table output) throws Exception {

Review comment: OK
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765472967 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+    private final List> dictData;

Review comment: OK
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765472869 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/** Knn model data, which stores the data used to calculate the distances between nodes. */
+public class KnnModelData {
+    private final List> dictData;
+    private final Comparator> comparator;

Review comment: This element has been moved to `PredictOperator`.
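For context on what this model data is used for: a brute-force k-nearest-neighbor predictor computes the distance from a query point to every stored training point and takes a majority vote among the labels of the k closest ones. The sketch below is a generic, self-contained illustration of that idea; it is not the PR's actual `KnnModelData`/`PredictOperator` code, and all names in it are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KnnSketch {
    // Brute-force k-NN classification: sort point indices by distance to the
    // query, then majority-vote over the labels of the k nearest points.
    public static int predict(List<double[]> points, List<Integer> labels, double[] query, int k) {
        Integer[] idx = new Integer[points.size()];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = i;
        }
        Arrays.sort(idx, Comparator.comparingDouble(i -> squaredDistance(points.get(i), query)));

        Map<Integer, Integer> votes = new HashMap<>();
        for (int i = 0; i < k; i++) {
            votes.merge(labels.get(idx[i]), 1, Integer::sum);
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    // Squared Euclidean distance; the square root is unnecessary for ranking.
    static double squaredDistance(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return s;
    }

    public static void main(String[] args) {
        // Two tight clusters, mimicking the shape of the test data in KnnTest.
        List<double[]> pts = Arrays.asList(
                new double[] {2.0, 3.0}, new double[] {2.1, 3.1},
                new double[] {200.1, 300.1}, new double[] {200.2, 300.2});
        List<Integer> labels = Arrays.asList(1, 1, 2, 2);
        System.out.println(predict(pts, labels, new double[] {2.05, 3.05}, 3)); // prints 1
    }
}
```

Sorting all points costs O(n log n) per query; real implementations avoid this with spatial indexes or, as in the PR's discussion, by batching distance computations over matrix blocks.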
[jira] [Closed] (FLINK-5707) Find better keys for backend configuration parameters for state backends
[ https://issues.apache.org/jira/browse/FLINK-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-5707.
Resolution: Information Provided

Closing this ticket, as this has already been done in current Flink.

> Find better keys for backend configuration parameters for state backends
>
> Key: FLINK-5707
> URL: https://issues.apache.org/jira/browse/FLINK-5707
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Priority: Not a Priority
>
> Currently, some config keys for the backends are confusing or even misleading
> and could be renamed. For example:
> `state.backend.fs.checkpointdir` -> `state.backend.checkpoints.dir`
> `state.backend.rocksdb.checkpointdir` -> `state.backend.rocksdb.workdir`
> `state.checkpoints.dir`
> This would reflect their purposes much better.
[GitHub] [flink] flinkbot edited a comment on pull request #17892: [FLINK-25038][testutils] Refactor FlinkContainer to split JM and TMs to individual containers and support HA
flinkbot edited a comment on pull request #17892: URL: https://github.com/apache/flink/pull/17892#issuecomment-977710431 ## CI report: * 09e84a1124313bd95488fb85a6bcd5f35f60349b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27841) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765466570 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.BLAS; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; + +/** Knn model fitted by estimator. */ +public class KnnModel implements Model, KnnModelParams { +protected Map, Object> params = new HashMap<>(); +private Table[] modelData; + +/** Constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * Sets model data for knn prediction. + * + * @param modelData Knn model data. + * @return Knn model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * Gets model data. + * + * @return Table array including model data tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * Predicts label with knn model. + * + * @param inputs List of tables. + * @return Prediction result. + */ +@Override +@SuppressWarnings("unchecked") +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); +final String broadcastKey = "broadcastModelKey"; +String resultCols = getPredictionCol(); +DataType resultTypes = DataTypes.INT(); +ResolvedSchema outputSchema = +TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes); + +DataStream output = +BroadcastUtils.withBroadcastStream( +Collections.singletonList(input), +Collections.singletonMap(broadcastKey, model), +inputList -> { +
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765466093 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer; +import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.types.Row; + +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** Knn model data, which stores the data used to calculate the distances between nodes. */ +public class KnnModelData { +private final List> dictData; +private final Comparator> comparator; + +/** + * Constructor. + * + * @param list Row list. + */ +public KnnModelData(List list) { +this.dictData = new ArrayList<>(list.size()); +for (Row row : list) { +this.dictData.add( +Tuple3.of( +(DenseMatrix) row.getField(0), +(DenseVector) row.getField(1), +(int[]) row.getField(2))); +} +comparator = Comparator.comparingDouble(o -> -o.f0); +} + +/** + * Gets comparator. + * + * @return Comparator. 
+ */ +public Comparator> getQueueComparator() { +return comparator; +} + +/** + * Gets dictionary data size. + * + * @return Dictionary data size. + */ +public Integer getLength() { +return dictData.size(); +} + +public List> getDictData() { Review comment: OK
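The negated-distance comparator defined above (`Comparator.comparingDouble(o -> -o.f0)`) is the standard trick for keeping the k nearest neighbors in a bounded max-heap: the heap's head is always the farthest candidate retained so far, so it can be evicted whenever a closer one arrives. A minimal plain-Java sketch of the pattern (bare `Double` distances stand in for the Tuple2/Tuple3 entries used in the PR):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopKNearest {
    /** Returns the distances of the k nearest candidates, in ascending order. */
    public static List<Double> kNearest(double[] distances, int k) {
        // Negating the key turns Java's min-heap into a max-heap:
        // the head is the FARTHEST of the candidates kept so far.
        PriorityQueue<Double> heap =
                new PriorityQueue<>(Comparator.comparingDouble(d -> -d));
        for (double d : distances) {
            heap.offer(d);
            if (heap.size() > k) {
                heap.poll(); // evict the farthest, keeping only the k nearest
            }
        }
        List<Double> result = new ArrayList<>(heap);
        result.sort(Comparator.naturalOrder());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(kNearest(new double[] {5.0, 1.0, 3.0, 0.5}, 2)); // [0.5, 1.0]
    }
}
```

This keeps the memory bound at k entries regardless of how many candidate rows stream past, which is why a PriorityQueue with an inverted comparator is preferred over sorting the full candidate list.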
[GitHub] [flink] flinkbot edited a comment on pull request #17977: [FLINK-24661][core][table] ConfigOption add isSecret method to judge sensitive options
flinkbot edited a comment on pull request #17977: URL: https://github.com/apache/flink/pull/17977#issuecomment-983482975 ## CI report: * 942783b56abf07e9d9650b493ab8dc7b040a13d9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27583) * 55b743a6f146d58b2388653ec7bf23d5fb37e160 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27848)
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765465455 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModelData.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.typeinfo.DenseMatrixSerializer; +import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.types.Row; + +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** Knn model data, which stores the data used to calculate the distances between nodes. */ +public class KnnModelData { +private final List> dictData; +private final Comparator> comparator; + +/** + * Constructor. + * + * @param list Row list. + */ +public KnnModelData(List list) { +this.dictData = new ArrayList<>(list.size()); +for (Row row : list) { +this.dictData.add( +Tuple3.of( +(DenseMatrix) row.getField(0), +(DenseVector) row.getField(1), +(int[]) row.getField(2))); +} +comparator = Comparator.comparingDouble(o -> -o.f0); +} + +/** + * Gets comparator. + * + * @return Comparator. 
+ */ +public Comparator> getQueueComparator() { +return comparator; +} + +/** + * Gets dictionary data size. + * + * @return Dictionary data size. + */ +public Integer getLength() { +return dictData.size(); +} + +public List> getDictData() { +return dictData; +} + +/** Encoder for the Knn model data. */ +public static class ModelDataEncoder implements Encoder { +@Override +public void encode(Row modelData, OutputStream outputStream) throws IOException { +DataOutputView dataOutputView = new DataOutputViewStreamWrapper(outputStream); + +DenseMatrixSerializer matrixSerializer = new DenseMatrixSerializer(); +matrixSerializer.serialize((DenseMatrix) modelData.getField(0), dataOutputView); + +DenseVectorSerializer vectorSerializer = new DenseVectorSerializer(); +vectorSerializer.serialize((DenseVector) modelData.getField(1), dataOutputView); + +int[] label = (int[]) Objects.requireNonNull(modelData.getField(2)); +for (Integer integer : label) { +dataOutputView.writeInt(integer); +} +} +} + +/** Decoder for the Knn model data. */ +public static class ModelDataStreamFormat extends SimpleStreamFormat { + +@Override +public Reader createReader(Configuration config, FSDataInputStream stream) { +return new Reader() { + +@Override +public Row read() throws IOException { +try { +DataInputView source = new DataInputViewStreamWrapper(stream); Review comment: OK
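Note that the `ModelDataEncoder` above writes the label array with `writeInt` per element and no length prefix, so the matching reader has to consume values until it hits EOF (hence the `EOFException` import). A plain-Java sketch of the same encode/decode round trip, with `java.io` streams standing in for Flink's `DataOutputViewStreamWrapper`/`DataInputViewStreamWrapper` (the class and method names here are illustrative, not part of the PR):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IntArrayCodec {
    /** Encodes the labels back-to-back with no length prefix, mirroring ModelDataEncoder. */
    public static byte[] encode(int[] labels) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int label : labels) {
            out.writeInt(label);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Decodes by reading until the data is exhausted, since no count was written. */
    public static int[] decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int n = data.length / Integer.BYTES;
        int[] labels = new int[n];
        for (int i = 0; i < n; i++) {
            labels[i] = in.readInt();
        }
        return labels;
    }

    public static void main(String[] args) throws IOException {
        int[] roundTrip = decode(encode(new int[] {1, 2, 5}));
        System.out.println(java.util.Arrays.toString(roundTrip)); // [1, 2, 5]
    }
}
```

Writing an explicit length prefix would let the reader avoid relying on EOF, at the cost of four extra bytes per record; the PR's format works because each model-data row is framed by the enclosing file format.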
[jira] [Created] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false
Caizhi Weng created FLINK-25227: --- Summary: Comparing the equality of the same (boxed) numeric values returns false Key: FLINK-25227 URL: https://issues.apache.org/jira/browse/FLINK-25227 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.13.3, 1.12.5, 1.14.0 Reporter: Caizhi Weng Fix For: 1.15.0, 1.14.1, 1.13.4 Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug. {code:scala} @Test def myTest(): Unit = { val data = Seq( Row.of( java.lang.Integer.valueOf(1000), java.lang.Integer.valueOf(2000), java.lang.Integer.valueOf(1000), java.lang.Integer.valueOf(2000)) ) tEnv.executeSql( s""" |create table T ( | a int, | b int, | c int, | d int |) with ( | 'connector' = 'values', | 'bounded' = 'true', | 'data-id' = '${TestValuesTableFactory.registerData(data)}' |) |""".stripMargin) tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print() } {code} The result is false, which is obviously incorrect. This is caused by the generated java code: {code:java} public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(1); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public StreamExecCalc$8( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { 
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue(); int field$0; boolean isNull$0; int field$1; boolean isNull$1; int field$3; boolean isNull$3; int field$4; boolean isNull$4; boolean isNull$6; boolean result$7; isNull$3 = in1.isNullAt(2); field$3 = -1; if (!isNull$3) { field$3 = in1.getInt(2); } isNull$0 = in1.isNullAt(0); field$0 = -1; if (!isNull$0) { field$0 = in1.getInt(0); } isNull$1 = in1.isNullAt(1); field$1 = -1; if (!isNull$1) { field$1 = in1.getInt(1); } isNull$4 = in1.isNullAt(3); field$4 = -1; if (!isNull$4) { field$4 = in1.getInt(3); } out.setRowKind(in1.getRowKind()); java.lang.Integer result$2 = field$0; boolean nullTerm$2 = false; if (!nullTerm$2) { java.lang.Integer cur$2 = field$0; if (isNull$0) { nullTerm$2 = true; } else { int compareResult = result$2.compareTo(cur$2); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$2 = cur$2; } } } if (!nullTerm$2) { java.lang.Integer cur$2 = field$1; if (isNull$1) { nullTerm$2 = true; } else { int compareResult = result$2.compareTo(cur$2); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$2 = cur$2; } } } if (nullTerm$2) { result$2 = null; } java.lang.Integer result$5 = field$3; boolean nullTerm$5 = false; if (!nullTerm$5) { java.lang.Integer cur$5 = field$3; if (isNull$3) { nullTerm$5 = true; } else { int compareResult = result$5.compareTo(cur$5); if
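The symptom reported in this ticket matches the classic boxed-numeric pitfall: when generated code compares `java.lang.Integer` results with reference equality, the comparison only succeeds for values sharing a cached instance, and `Integer.valueOf` is guaranteed to cache only `[-128, 127]` by default. A minimal illustration, independent of the generated operator (class and method names are for illustration only):

```java
public class BoxedEquality {
    /** Compares two boxed Integers the way `==` does: by object reference. */
    public static boolean sameReference(Integer a, Integer b) {
        return a == b;
    }

    public static void main(String[] args) {
        // Outside the autobox cache, each boxing allocates a fresh object,
        // so reference equality fails even for equal values (default JVM settings).
        System.out.println(sameReference(1000, 1000)); // false
        // Inside [-128, 127], Integer.valueOf returns cached instances.
        System.out.println(sameReference(100, 100));   // true
        // equals() compares values and is always safe for boxed numerics.
        System.out.println(Integer.valueOf(1000).equals(1000)); // true
    }
}
```

That is consistent with the repro using values 1000 and 2000 (outside the cache); the fix is for the code generator to compare boxed results with `equals`/`compareTo` rather than `==`.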
[GitHub] [flink] flinkbot edited a comment on pull request #17977: [FLINK-24661][core][table] ConfigOption add isSecret method to judge sensitive options
flinkbot edited a comment on pull request #17977: URL: https://github.com/apache/flink/pull/17977#issuecomment-983482975 ## CI report: * 942783b56abf07e9d9650b493ab8dc7b040a13d9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27583) * 55b743a6f146d58b2388653ec7bf23d5fb37e160 UNKNOWN
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765459948 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java ## @@ -0,0 +1,273 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; + +/** Tests Knn and KnnModel. 
*/ +public class KnnTest { +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; +private Table trainData; +private static final String LABEL_COL = "test_label"; +private static final String PRED_COL = "test_prediction"; +private static final String VEC_COL = "test_features"; +private static final List trainArray = +new ArrayList<>( +Arrays.asList( +Row.of(1, Vectors.dense(2.0, 3.0)), +Row.of(1, Vectors.dense(2.1, 3.1)), +Row.of(2, Vectors.dense(200.1, 300.1)), +Row.of(2, Vectors.dense(200.2, 300.2)), +Row.of(2, Vectors.dense(200.3, 300.3)), +Row.of(2, Vectors.dense(200.4, 300.4)), +Row.of(2, Vectors.dense(200.4, 300.4)), +Row.of(2, Vectors.dense(200.6, 300.6)), +Row.of(1, Vectors.dense(2.1, 3.1)), +Row.of(1, Vectors.dense(2.1, 3.1)), +Row.of(1, Vectors.dense(2.1, 3.1)), +Row.of(1, Vectors.dense(2.1, 3.1)), +Row.of(1, Vectors.dense(2.3, 3.2)), +Row.of(1, Vectors.dense(2.3, 3.2)), +Row.of(3, Vectors.dense(2.8, 3.2)), +Row.of(4, Vectors.dense(300., 3.2)), +Row.of(1, Vectors.dense(2.2, 3.2)), +Row.of(5, Vectors.dense(2.4, 3.2)), +Row.of(5, Vectors.dense(2.5, 3.2)), +Row.of(5, Vectors.dense(2.5, 3.2)), +Row.of(1, Vectors.dense(2.1, 3.1; + +private static final List testArray = +new ArrayList<>( +Arrays.asList( +Row.of(5, Vectors.dense(4.0, 4.1)), Row.of(2, Vectors.dense(300, 42; +private Table testData; + +@Before +public void before() { +Configuration config = new Configuration(); + config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); +env = StreamExecutionEnvironment.getExecutionEnvironment(config); +env.setParallelism(4); +env.enableCheckpointing(100); +env.setRestartStrategy(RestartStrategies.noRestart()); +tEnv = StreamTableEnvironment.create(env); + +Schema schema = +Schema.newBuilder() +.column("f0", DataTypes.INT()) +.column("f1", DataTypes.of(DenseVector.class)) +.build(); + +DataStream dataStream = env.fromCollection(trainArray); +trainData = tEnv.fromDataStream(dataStream, schema).as(LABEL_COL + "," 
+ VEC_COL); + +DataStream predDataStream = env.fromCollection(testArray); +testData = tEnv.fromDataStream(predDataStream, schema).as(LABEL_COL + "," + VEC_COL); +} + +// Executes the graph and returns a list which has true label and predict label. +private static List> executeAndCollect(Table output) throws Exception { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) output).getTableEnvironment(); + +DataStream>
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765459574 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.BLAS; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; + +/** Knn model fitted by estimator. */ +public class KnnModel implements Model, KnnModelParams { +protected Map, Object> params = new HashMap<>(); +private Table[] modelData; + +/** Constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * Sets model data for knn prediction. + * + * @param modelData Knn model data. + * @return Knn model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * Gets model data. + * + * @return Table array including model data tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * Predicts label with knn model. + * + * @param inputs List of tables. + * @return Prediction result. + */ +@Override +@SuppressWarnings("unchecked") +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); +final String broadcastKey = "broadcastModelKey"; +String resultCols = getPredictionCol(); +DataType resultTypes = DataTypes.INT(); +ResolvedSchema outputSchema = +TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes); + +DataStream output = +BroadcastUtils.withBroadcastStream( +Collections.singletonList(input), +Collections.singletonMap(broadcastKey, model), +inputList -> { +
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r765458856 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.BLAS; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; + +/** Knn model fitted by estimator. */ +public class KnnModel implements Model, KnnModelParams { +protected Map, Object> params = new HashMap<>(); +private Table[] modelData; + +/** Constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * Sets model data for knn prediction. + * + * @param modelData Knn model data. + * @return Knn model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * Gets model data. + * + * @return Table array including model data tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * Predicts label with knn model. + * + * @param inputs List of tables. + * @return Prediction result. + */ +@Override +@SuppressWarnings("unchecked") +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); +final String broadcastKey = "broadcastModelKey"; +String resultCols = getPredictionCol(); +DataType resultTypes = DataTypes.INT(); +ResolvedSchema outputSchema = +TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes); + +DataStream output = +BroadcastUtils.withBroadcastStream( +Collections.singletonList(input), +Collections.singletonMap(broadcastKey, model), +inputList -> { +
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add Estimator and Transformer for K-nearest neighbor
weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r765458336

## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java

## @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/** Knn model fitted by estimator. */
+public class KnnModel implements Model<KnnModel>, KnnModelParams<KnnModel> {
+    protected Map<Param<?>, Object> params = new HashMap<>();
+    private Table[] modelData;
+
+    /** Constructor. */
+    public KnnModel() {
+        ParamUtils.initializeMapWithDefaultValues(params, this);
+    }
+
+    /**
+     * Sets model data for knn prediction.
+     *
+     * @param modelData Knn model data.
+     * @return Knn model.
+     */
+    @Override
+    public KnnModel setModelData(Table... modelData) {
+        this.modelData = modelData;
+        return this;
+    }
+
+    /**
+     * Gets model data.
+     *
+     * @return Table array including model data tables.
+     */
+    @Override
+    public Table[] getModelData() {
+        return modelData;
+    }
+
+    /**
+     * Predicts label with knn model.
+     *
+     * @param inputs List of tables.
+     * @return Prediction result.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Table[] transform(Table... inputs) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> input = tEnv.toDataStream(inputs[0]);
+        DataStream<Row> model = tEnv.toDataStream(modelData[0]);
+        final String broadcastKey = "broadcastModelKey";
+        String resultCols = getPredictionCol();
+        DataType resultTypes = DataTypes.INT();
+        ResolvedSchema outputSchema =
+                TableUtils.getOutputSchema(inputs[0].getResolvedSchema(), resultCols, resultTypes);
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(input),
+                        Collections.singletonMap(broadcastKey, model),
+                        inputList -> {
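The `transform` method above broadcasts the model table and, per input row, runs a k-nearest-neighbor search (note the `PriorityQueue` import). As a framework-free sketch of that core step — the class name, squared-Euclidean metric, and majority-vote tie handling here are assumptions for illustration, not the PR's actual helper code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch of a priority-queue-based k-nearest-neighbor vote. */
public class KnnSketch {

    /** Squared Euclidean distance between two dense vectors of equal length. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /**
     * Predicts a label by majority vote among the k training points closest to the query.
     * A max-heap of size k keeps the current best candidates; the farthest is evicted first,
     * so the whole training set is scanned in O(n log k).
     */
    static double predict(double[][] features, double[] labels, double[] query, int k) {
        // Heap ordered so the candidate with the LARGEST distance sits at the head.
        PriorityQueue<double[]> heap =
                new PriorityQueue<>((x, y) -> Double.compare(y[0], x[0]));
        for (int i = 0; i < features.length; i++) {
            double dist = squaredDistance(features[i], query);
            if (heap.size() < k) {
                heap.add(new double[] {dist, labels[i]});
            } else if (dist < heap.peek()[0]) {
                heap.poll(); // evict the current farthest neighbor
                heap.add(new double[] {dist, labels[i]});
            }
        }
        // Majority vote over the k retained labels.
        Map<Double, Integer> votes = new HashMap<>();
        for (double[] entry : heap) {
            votes.merge(entry[1], 1, Integer::sum);
        }
        double best = Double.NaN;
        int bestCount = -1;
        for (Map.Entry<Double, Integer> e : votes.entrySet()) {
            if (e.getValue() > bestCount) {
                bestCount = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[][] features = {{0, 0}, {0, 1}, {1, 0}, {5, 5}, {5, 6}};
        double[] labels = {0, 0, 0, 1, 1};
        System.out.println(predict(features, labels, new double[] {0.5, 0.5}, 3)); // prints 0.0
        System.out.println(predict(features, labels, new double[] {5, 5.5}, 3));   // prints 1.0
    }
}
```

In the PR itself, the training points would come from the broadcast model rows rather than an in-memory array, but the per-row top-k selection follows this shape.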