[GitHub] [flink] dannycranmer commented on a change in pull request #17785: [FLINK-24580][Connectors/Kinesis] Make ConnectTimeoutException recoverable

2021-11-23 Thread GitBox


dannycranmer commented on a change in pull request #17785:
URL: https://github.com/apache/flink/pull/17785#discussion_r755780760



##
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##
@@ -420,6 +421,8 @@ private String getShardIterator(GetShardIteratorRequest 
getShardIteratorRequest)
 protected boolean isRecoverableSdkClientException(SdkClientException ex) {
 if (ex instanceof AmazonServiceException) {
 return 
KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+} else if (ex.getCause() instanceof ConnectTimeoutException) {

Review comment:
   Yes I agree, either approach is fine. I will wait for an update and 
merge once tests pass
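
For readers following the diff, here is a minimal sketch of the kind of check being added. It assumes a stand-in exception class (`ConnectTimeoutStandIn` is hypothetical, used so the example compiles without the AWS SDK or Apache HttpClient) and inspects only the direct cause, as the added line in the diff does. It is not the connector's actual implementation.

```java
import java.io.IOException;

/** Hypothetical stand-in for a connect-timeout exception type. */
class ConnectTimeoutStandIn extends IOException {
    ConnectTimeoutStandIn(String message) {
        super(message);
    }
}

/** Sketch: treat an exception as recoverable when its direct cause is a connect timeout. */
class RecoverableCheck {
    static boolean isRecoverable(Exception ex) {
        // Only the immediate cause is inspected, not the whole cause chain.
        return ex.getCause() instanceof ConnectTimeoutStandIn;
    }
}
```

A check on the whole cause chain would be broader. Which one is appropriate depends on how the SDK wraps the timeout.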




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise merged pull request #17848: [FLINK-24583] [connectors/elasticsearch] Improve test stability by blocking until ES has acknowledged all records

2021-11-23 Thread GitBox


AHeise merged pull request #17848:
URL: https://github.com/apache/flink/pull/17848


   






[jira] [Commented] (FLINK-24592) FlinkSQL multiline parser improvements

2021-11-23 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448419#comment-17448419
 ] 

Sergey Nuyanzin commented on FLINK-24592:
-

[~jark] thanks for the assignment
could you or anyone from the watchers please help with review?

> FlinkSQL multiline parser improvements
> --
>
> Key: FLINK-24592
> URL: https://issues.apache.org/jira/browse/FLINK-24592
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> The existing multiline parser has limitations: for example, a line cannot end 
> with a semicolon that is part of a field value, comment, or column name.
> A query also fails if it contains '--', e.g. as part of a varchar field value.
> If there are no objections, I would like to put some effort into improving this 
> behavior.
> Here is a list of sample problem queries:
> {code:sql}
> select 123; -- comment
> select 1 as `1--`;
> select '--';
> -- This query works if a user copy-pastes it to FlinkSQL, however it fails if 
> a user types it in FlinkSQL
> select '1;
> ';
> {code}
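
The sample queries above can be handled by a splitter that tracks quoting and comment state. The following is a minimal sketch under stated assumptions, not the SQL Client's actual parser: `SqlStatementSplitter` and its methods are hypothetical names, only single quotes, backticks, and `--` line comments are recognized, and comment text is dropped.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: split a script on semicolons that are outside quotes and line comments. */
class SqlStatementSplitter {
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // active quote character (' or `), 0 when outside quotes
        boolean inLineComment = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (inLineComment) {
                // A line comment ends at the next newline; its text is discarded.
                if (c == '\n') {
                    inLineComment = false;
                    current.append(c);
                }
                continue;
            }
            if (quote != 0) {
                // Inside a quoted value or identifier: ';' and '--' are plain text.
                // A doubled quote ('') closes and immediately reopens the quote,
                // which leaves the splitting state unchanged.
                current.append(c);
                if (c == quote) {
                    quote = 0;
                }
                continue;
            }
            if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
            } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                inLineComment = true;
                i++; // skip the second '-'
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> statements, StringBuilder current) {
        String statement = current.toString().trim();
        if (!statement.isEmpty()) {
            statements.add(statement);
        }
    }
}
```

The sketch operates on the whole input rather than line by line, which is why a semicolon at the end of a line inside a quoted value does not terminate the statement.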



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * 1592494d4260563af4f170fb1cd45ccec9e5f0cb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26978)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2021-11-23 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448416#comment-17448416
 ] 

Fabian Paul commented on FLINK-16419:
-

The error still seems very strange to me because every time the KafkaProducer 
opens a new transaction by calling `initTransactions()` a new producerID is 
assigned. If the producerId hit the timeout, it means that no checkpoint has 
happened for the timeout duration of 24 days. What is your checkpoint duration, 
and what is the configured number of concurrent checkpoints?

Can you also share the full stack trace of the exception and say when exactly 
the error occurs?

 

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> 
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Reporter: Jun Qin
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka.  For example the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: 
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata            
>                 - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it has successfully notified all operators 
> of the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Those transactions then need not be recommitted upon recovery.
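
The proposal in the description amounts to filtering the pre-committed transactions against a recorded set of known-committed ones before recommitting. A minimal sketch of that filtering step follows. `RecoveryFilter`, `pendingToRecommit`, and the idea of keeping the committed IDs as a plain set are assumptions for illustration, not Flink's API or the design that was eventually chosen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Sketch: decide which pre-committed transactions still need a commit on recovery. */
class RecoveryFilter {
    /**
     * @param preCommitted transactional IDs found in the restored snapshot
     * @param knownCommitted transactional IDs recorded as successfully committed
     * @return the IDs that must still be recommitted, in snapshot order
     */
    static List<String> pendingToRecommit(List<String> preCommitted, Set<String> knownCommitted) {
        List<String> pending = new ArrayList<>();
        for (String id : preCommitted) {
            // Skip transactions whose commit was already confirmed before the failure.
            if (!knownCommitted.contains(id)) {
                pending.add(id);
            }
        }
        return pending;
    }
}
```

With such a filter, a transactional ID that has expired on the broker would only cause a recovery failure if its commit had never been confirmed.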





[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755766209



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModel.java
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** A Model which classifies data using the model data computed by {@link 
NaiveBayes}. */
+public class NaiveBayesModel
+implements Model<NaiveBayesModel>, 
NaiveBayesModelParams<NaiveBayesModel> {
+private static final long serialVersionUID = -4673084154965905629L;
+private final Map<Param<?>, Object> paramMap = new HashMap<>();
+private Table modelTable;
+
+public NaiveBayesModel() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String predictionCol = getPredictionCol();
+final String featuresCol = getFeaturesCol();
+final String broadcastModelKey = "NaiveBayesModelStream";
+
+RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+RowTypeInfo outputTypeInfo =
+new RowTypeInfo(
+ArrayUtils.addAll(
+inputTypeInfo.getFieldTypes(), 
TypeInformation.of(Object.class)),
+ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
predictionCol));
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
modelTable).getTableEnvironment();
+DataStream<NaiveBayesModelData> modelStream =
+NaiveBayesModelData.toDataStream(tEnv, modelTable);
+DataStream<Row> input = tEnv.toDataStream(inputs[0]);
+
+Map<String, DataStream<?>> broadcastMap = new HashMap<>();
+broadcastMap.put(broadcastModelKey, modelStream);
+
+Function<List<DataStream<?>>, DataStream<Row>> function =
+dataStreams -> {
+DataStream stream = dataStreams.get(0);
+return stream.transform(
+this.getClass().getSimpleName(),
+outputTypeInfo,
+new PredictLabelOperator(
+new 

[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174

2021-11-23 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r755771519



##
File path: flink-ml-python/README.md
##
@@ -0,0 +1,20 @@
+Flink ML is a library which provides machine learning (ML) APIs and libraries 
that simplify the building of machine learning pipelines. It provides a set of 
standard ML APIs for MLlib developers to implement ML algorithms, as well as 
libraries of ML algorithms that can be used to build ML pipelines for both 
training and inference jobs.
+
+Flink ML is developed under the umbrella of [Apache 
Flink](https://flink.apache.org/).
+
+## Python Packaging
+
+Prerequisites for building apache-flink-ml:
+
+* Unix-like environment (we use Linux, Mac OS X)
+* Python version(3.6, 3.7 or 3.8) is required
+
+Then go to the root directory of flink-ml-python source code and run this 
command to build the sdist package of apache-flink-ml:
+```bash
+cd flink-ml-python; python setup.py sdist;
+```
+
+The sdist package of apache-flink-ml will be found under 
./flink-ml-python/dist/. It could be used for installation, such as:
+```bash
+python -m pip install dist/*.tar.gz

Review comment:
   `README.md` is for users, not for developers, so I don't think it is 
appropriate to add the content of how to test into `README.md`. Regarding the 
part of adding python CI, I intend to do it in a separate PR, thanks for the 
reminder.








[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448413#comment-17448413
 ] 

Yun Tang commented on FLINK-15571:
--

[~monster#12] Already assigned to you.

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Assignee: ZhuoYu Chen
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  





[jira] [Updated] (FLINK-24861) Support to disable caching missing key for lookup cache

2021-11-23 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-24861:
---
Release Note: You can now configure the JDBC connector to enable or disable 
caching results for keys which don't exist in the database. This is enabled by 
default. 

> Support to disable caching missing key for lookup cache 
> 
>
> Key: FLINK-24861
> URL: https://issues.apache.org/jira/browse/FLINK-24861
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.14.0
>Reporter: Gaurav Miglani
>Assignee: Gaurav Miglani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Ideally, in case of cache miss for a key, or with null value fetch for key, 
> key shouldn't be cached
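
The behavior described in the release note can be sketched as a cache that only stores "not found" results when a flag allows it. This is a minimal illustration under stated assumptions: `LookupCache`, its `cacheMissingKey` flag, and the `loads` counter are hypothetical names, and the loader function stands in for the database query. It is not the JDBC connector's implementation or its option name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Sketch: lookup cache that can optionally remember keys missing from the backing store. */
class LookupCache {
    private final Map<String, Optional<String>> cache = new HashMap<>();
    private final Function<String, String> loader; // returns null when the key does not exist
    private final boolean cacheMissingKey;
    int loads = 0; // number of calls made to the backing store

    LookupCache(Function<String, String> loader, boolean cacheMissingKey) {
        this.loader = loader;
        this.cacheMissingKey = cacheMissingKey;
    }

    String lookup(String key) {
        Optional<String> cached = cache.get(key);
        if (cached != null) {
            // Cache hit; an empty Optional marks a key known to be missing.
            return cached.orElse(null);
        }
        loads++;
        String value = loader.apply(key);
        if (value != null || cacheMissingKey) {
            cache.put(key, Optional.ofNullable(value));
        }
        return value;
    }
}
```

With the flag disabled, every lookup of a missing key reaches the backing store, so a row inserted later becomes visible immediately. With it enabled, repeated lookups of a missing key are served from the cache.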





[GitHub] [flink] flinkbot edited a comment on pull request #17715: [FLINK-24820][table][docs] Examples in documentation for value1 IS DISTINCT …

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17715:
URL: https://github.com/apache/flink/pull/17715#issuecomment-962987530


   
   ## CI report:
   
   * 863b790976db36e3ddb858f118e964f9ca48927f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26128)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Assigned] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-15571:


Assignee: ZhuoYu Chen

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Assignee: ZhuoYu Chen
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  





[GitHub] [flink] pnowojski commented on a change in pull request #16582: [FLINK-21504][checkpoint] Introduce notification of subsumed checkpoint

2021-11-23 Thread GitBox


pnowojski commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r755767863



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##
@@ -1405,59 +1406,80 @@ private void declineCheckpoint(
 }
 
 public void notifyCheckpointComplete(final long checkpointID) {
-final TaskInvokable invokable = this.invokable;
-
-if (executionState == ExecutionState.RUNNING) {
-checkState(invokable instanceof CheckpointableTask, "invokable is 
not checkpointable");
-try {
-((CheckpointableTask) 
invokable).notifyCheckpointCompleteAsync(checkpointID);
-} catch (RejectedExecutionException ex) {
-// This may happen if the mailbox is closed. It means that the 
task is shutting
-// down, so we just ignore it.
-LOG.debug(
-"Notify checkpoint complete {} for {} ({}) was 
rejected by the mailbox",
-checkpointID,
-taskNameWithSubtask,
-executionId);
-} catch (Throwable t) {
-if (getExecutionState() == ExecutionState.RUNNING) {
-// fail task if checkpoint confirmation failed.
-failExternally(new RuntimeException("Error while 
confirming checkpoint", t));
-}
-}
-} else {
-LOG.debug(
-"Ignoring checkpoint commit notification for non-running 
task {}.",
-taskNameWithSubtask);
-}
+notifyCheckpoint(
+checkpointID,
+CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+NotifyCheckpointOperation.COMPLETE);
 }
 
 public void notifyCheckpointAborted(
 final long checkpointID, final long latestCompletedCheckpointId) {
-final TaskInvokable invokable = this.invokable;
+notifyCheckpoint(
+checkpointID, latestCompletedCheckpointId, 
NotifyCheckpointOperation.ABORT);
+}
 
-if (executionState == ExecutionState.RUNNING) {
+public void notifyCheckpointSubsumed(long checkpointID) {
+notifyCheckpoint(
+checkpointID,
+CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+NotifyCheckpointOperation.SUBSUME);
+}
+
+private void notifyCheckpoint(
+long checkpointId,
+long latestCompletedCheckpointId,
+NotifyCheckpointOperation notifyCheckpointOperation) {
+TaskInvokable invokable = this.invokable;
+
+if (executionState == ExecutionState.RUNNING && invokable != null) {
 checkState(invokable instanceof CheckpointableTask, "invokable is 
not checkpointable");
 try {
-((CheckpointableTask) invokable)
-.notifyCheckpointAbortAsync(checkpointID, 
latestCompletedCheckpointId);
+switch (notifyCheckpointOperation) {
+case ABORT:
+((CheckpointableTask) invokable)
+.notifyCheckpointAbortAsync(
+checkpointId, 
latestCompletedCheckpointId);
+break;
+case COMPLETE:
+((CheckpointableTask) invokable)
+.notifyCheckpointCompleteAsync(checkpointId);
+break;
+case SUBSUME:
+((CheckpointableTask) invokable)
+.notifyCheckpointSubsumedAsync(checkpointId);
+}
 } catch (RejectedExecutionException ex) {
 // This may happen if the mailbox is closed. It means that the 
task is shutting
 // down, so we just ignore it.
 LOG.debug(
-"Notify checkpoint abort {} for {} ({}) was rejected 
by the mailbox",
-checkpointID,
+"Notify checkpoint {}} {} for {} ({}) was rejected by 
the mailbox.",
+notifyCheckpointOperation,
+checkpointId,
 taskNameWithSubtask,
 executionId);
 } catch (Throwable t) {
-if (getExecutionState() == ExecutionState.RUNNING) {
-// fail task if checkpoint aborted notification failed.
-failExternally(new RuntimeException("Error while aborting 
checkpoint", t));
+switch (notifyCheckpointOperation) {
+case ABORT:
+case COMPLETE:
+if (getExecutionState() == ExecutionState.RUNNING) {
+failExternally(
+new RuntimeException(
+  

[GitHub] [flink] flinkbot edited a comment on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977556918


   
   ## CI report:
   
   * 9f73f2713085e557107f1da2b68e77721721eb4d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26980)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] snuyanzin commented on pull request #17715: [FLINK-24820][table][docs] Examples in documentation for value1 IS DISTINCT …

2021-11-23 Thread GitBox


snuyanzin commented on pull request #17715:
URL: https://github.com/apache/flink/pull/17715#issuecomment-977606105


   @flinkbot run azure






[jira] [Commented] (FLINK-25029) Hadoop Caller Context Setting In Flink

2021-11-23 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448409#comment-17448409
 ] 

Jingsong Lee commented on FLINK-25029:
--

[~liufangqi] Did you mean FileSystem API in Flink?

> Hadoop Caller Context Setting In Flink
> --
>
> Key: FLINK-25029
> URL: https://issues.apache.org/jira/browse/FLINK-25029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: 刘方奇
>Priority: Major
>
> For a given HDFS operation (e.g. delete file), it's very helpful to track 
> which upper level job issues it. The upper level callers may be specific 
> Oozie tasks, MR jobs, and hive queries. One scenario is that the namenode 
> (NN) is abused/spammed, the operator may want to know immediately which MR 
> job should be blamed so that she can kill it. To this end, the caller context 
> contains at least the application-dependent "tracking id".
> The above is the main purpose of the Caller Context. The HDFS client sets the 
> Caller Context, and the NameNode then records it in its audit log, where it 
> can be used for further work.
> Spark and Hive already set the Caller Context to meet the HDFS job audit 
> requirement.
> In my company, Flink jobs often cause problems for HDFS, so we implemented 
> this to prevent some of those cases.
> If the feature is general enough, should we support it? If so, I can submit a 
> PR for this.





[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755759816



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * An Estimator which implements the naive bayes classification algorithm.
+ *
+ * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier.
+ */
+public class NaiveBayes
+implements Estimator, 
NaiveBayesParams {
+private final Map, Object> paramMap = new HashMap<>();
+
+public NaiveBayes() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public NaiveBayesModel fit(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String featuresCol = getFeaturesCol();
+final String labelCol = getLabelCol();
+final double smoothing = getSmoothing();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream> input =
+tEnv.toDataStream(inputs[0])
+.map(
+new MapFunction>() 
{
+@Override
+public Tuple2 map(Row row) 
throws Exception {
+return new Tuple2<>(
+(Vector) 
row.getField(featuresCol),
+(Double) 
row.getField(labelCol));
+}
+});
+
+DataStream naiveBayesModel =
+input.flatMap(new FlattenFunction())
+.keyBy(
+(KeySelector, Object>)
+value -> new Tuple3<>(value.f0, 
value.f1, value.f2))
+.window(EndOfStreamWindows.get())
+.reduce(
+(ReduceFunction>)
+(t0, t1) -> {
+t0.f3 += t1.f3;
+return t0;
+})
+.keyBy(
+(KeySelector, Object>)
+value -> new Tuple2<>(value.f0, 
value.f1))
+.window(EndOfStreamWindows.get())
+.aggregate(new ValueMapFunction())
+.keyBy(
+  

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755759161



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java
##
@@ -0,0 +1,333 @@

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755751841



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModelData.java
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+/** The model data of {@link NaiveBayesModel}. Also provides classes to 
save/load model data. */
+public class NaiveBayesModelData implements Serializable {
+private static final long serialVersionUID = 3919917903722286395L;
+public final Map[][] theta;
+public final double[] piArray;
+public final double[] labels;
+
+// Empty constructor is used when Kryo deserializes loaded model data.
+public NaiveBayesModelData() {
+this(null, null, null);
+}
+
+public NaiveBayesModelData(Map[][] theta, double[] 
piArray, double[] labels) {
+this.theta = theta;
+this.piArray = piArray;
+this.labels = labels;
+}
+
+public static Table fromDataStream(

Review comment:
   Sure. I'll make the change.








[GitHub] [flink] RocMarshal commented on pull request #17789: [FLINK-24351][docs] Translate "JSON Function" pages into Chinese

2021-11-23 Thread GitBox


RocMarshal commented on pull request #17789:
URL: https://github.com/apache/flink/pull/17789#issuecomment-977590515


   @MonsterChenzhuo Thanks for the update. Could you please check the CI 
result?






[GitHub] [flink] flinkbot edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17842:
URL: https://github.com/apache/flink/pull/17842#issuecomment-974310249


   
   ## CI report:
   
   * 45ac29fdd23a851dfcf47e70d9db5710a19d2af8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26915)
 
   * bbc64e8c2730b42b8851683e494c5ddee017de05 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26982)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755747903



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasLabelCol.java
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/**
+ * Param of the name of the label column in the input table.

Review comment:
   OK. I'll fix it.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755747666



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/util/StageTestUtils.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.ml.api.core.Stage;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+/** Utility methods for testing stages. */
+public class StageTestUtils {
+/**
+ * Saves a stage to filesystem and reloads it with the static load() 
method a stage must
+ * implement.
+ */
+public static > T 
saveAndReload(StreamExecutionEnvironment env, T stage)
+throws Exception {
+String tempDir = Files.createTempDirectory("").toString();

Review comment:
   OK. I'll do it.
   
   The common practice in Flink tests is to declare a `TemporaryFolder` in 
each test class and initialize it in `@Before` methods. Since `TemporaryFolder` 
is used here only in static utility functions, I think we can make it a static 
member of that class and initialize it in a static context.
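   
   A rough sketch of the static-initialization approach, assuming a shared 
root directory created once per class. To keep it self-contained it uses 
`java.nio.file.Files` in place of JUnit's `TemporaryFolder`; the class name 
`StaticTempDirSketch` and the `newFolder()` helper are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class StaticTempDirSketch {
    // Created once when the class is loaded and shared by all static helpers.
    private static final Path ROOT;

    static {
        try {
            ROOT = Files.createTempDirectory("stage-test-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Best-effort cleanup: the directory is only removed if empty at JVM exit.
        ROOT.toFile().deleteOnExit();
    }

    // Returns a fresh, empty directory under the shared root.
    static Path newFolder() {
        try {
            Path dir = Files.createTempDirectory(ROOT, "run-");
            dir.toFile().deleteOnExit();
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path first = newFolder();
        Path second = newFolder();
        System.out.println("same root: " + first.getParent().equals(second.getParent()));
        System.out.println("distinct folders: " + !first.equals(second));
    }
}
```

   The trade-off is that cleanup is tied to JVM exit rather than to each test, 
which is what a per-test `@Before` setup would otherwise provide.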








[GitHub] [flink] jackwener commented on a change in pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


jackwener commented on a change in pull request #17842:
URL: https://github.com/apache/flink/pull/17842#discussion_r755746986



##
File path: 
flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
##
@@ -54,16 +54,6 @@ public void testChoicesWithValidDefaultValue() {
 Assert.assertEquals(option.getDefaultValue(), "a");
 }
 
-@Test
-public void testChoicesWithInvalidDefautlValue() throws 
RequiredParametersException {

Review comment:
   It's redundant








[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174

2021-11-23 Thread GitBox


lindong28 commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r755746945



##
File path: flink-ml-python/README.md
##
@@ -0,0 +1,20 @@
+Flink ML is a library which provides machine learning (ML) APIs and libraries 
that simplify the building of machine learning pipelines. It provides a set of 
standard ML APIs for MLlib developers to implement ML algorithms, as well as 
libraries of ML algorithms that can be used to build ML pipelines for both 
training and inference jobs.
+
+Flink ML is developed under the umbrella of [Apache 
Flink](https://flink.apache.org/).
+
+## Python Packaging
+
+Prerequisites for building apache-flink-ml:
+
+* Unix-like environment (we use Linux, Mac OS X)
+* Python version(3.6, 3.7 or 3.8) is required
+
+Then go to the root directory of flink-ml-python source code and run this 
command to build the sdist package of apache-flink-ml:
+```bash
+cd flink-ml-python; python setup.py sdist;
+```
+
+The sdist package of apache-flink-ml will be found under 
./flink-ml-python/dist/. It could be used for installation, such as:
+```bash
+python -m pip install dist/*.tar.gz

Review comment:
   Could you update the README to specify how to run the test cases (similar 
to the existing README in `flink/flink-python/README.md`)?
   
   We probably also need to build the Python package and run all Python unit 
tests in the GitHub CI. This could be specified in 
`.github/workflows/java8-build.yml`. Could you help with this?








[GitHub] [flink] jackwener commented on a change in pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


jackwener commented on a change in pull request #17842:
URL: https://github.com/apache/flink/pull/17842#discussion_r755746522



##
File path: 
flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
##
@@ -54,16 +54,6 @@ public void testChoicesWithValidDefaultValue() {
 Assert.assertEquals(option.getDefaultValue(), "a");
 }
 
-@Test

Review comment:
   It's redundant








[GitHub] [flink] flinkbot edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17842:
URL: https://github.com/apache/flink/pull/17842#issuecomment-974310249


   
   ## CI report:
   
   * 45ac29fdd23a851dfcf47e70d9db5710a19d2af8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26915)
 
   * bbc64e8c2730b42b8851683e494c5ddee017de05 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755746528



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java
##
@@ -0,0 +1,333 @@

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755746409



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * An Estimator which implements the naive bayes classification algorithm.
+ *
+ * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier.
+ */
+public class NaiveBayes
+implements Estimator, 
NaiveBayesParams {
+private final Map, Object> paramMap = new HashMap<>();
+
+public NaiveBayes() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public NaiveBayesModel fit(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String featuresCol = getFeaturesCol();
+final String labelCol = getLabelCol();
+final double smoothing = getSmoothing();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream> input =
+tEnv.toDataStream(inputs[0])
+.map(
+new MapFunction>() 
{
+@Override
+public Tuple2 map(Row row) 
throws Exception {
+return new Tuple2<>(
+(Vector) 
row.getField(featuresCol),
+(Double) 
row.getField(labelCol));
+}
+});
+
+DataStream naiveBayesModel =
+input.flatMap(new FlattenFunction())

Review comment:
   OK. I'll fix it and add clearer comments.
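   
   For readers of the truncated `fit` diff in this thread: the pipeline 
flattens each sample, keys by three tuple fields, and sums the fourth field. 
The plain-Java sketch below shows that counting step without Flink. It assumes 
the three key fields are (label, feature index, feature value), which the 
quoted diff does not confirm because its generic types were lost; the class 
name `FeatureCountSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FeatureCountSketch {
    // Counts how often each (label, featureIndex, featureValue) triple occurs.
    static Map<List<Double>, Double> countFeatures(double[][] features, double[] labels) {
        Map<List<Double>, Double> counts = new HashMap<>();
        for (int row = 0; row < features.length; row++) {
            for (int idx = 0; idx < features[row].length; idx++) {
                // "Flatten": one record per feature of the sample, with weight 1.
                List<Double> key =
                        Arrays.asList(labels[row], (double) idx, features[row][idx]);
                // "Reduce": add the weights of records that share a key.
                counts.merge(key, 1.0, Double::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        double[][] features = {{1.0, 0.0}, {1.0, 1.0}, {0.0, 1.0}};
        double[] labels = {0.0, 0.0, 1.0};
        countFeatures(features, labels)
                .forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```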








[jira] [Commented] (FLINK-25012) Cannot join hive tables with different column types

2021-11-23 Thread xiangqiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448402#comment-17448402
 ] 

xiangqiao commented on FLINK-25012:
---

Thank you [~luoyuxia]. I have added `hive.strict.checks.type.safety = false` 
to hive-site.xml, and the unit test now passes. 

From the abstract syntax tree, the different column types (string and bigint) 
will be cast to double. How is the implicit type coercion done? Will it cause 
data correctness problems?
{code:java}
== Abstract Syntax Tree ==
LogicalSink(table=[test-catalog.db1.dest], fields=[key, val])
+- LogicalProject(key=[$0], val=[$1])
   +- LogicalProject(key=[$0], val=[$3])
      +- LogicalJoin(condition=[=(CAST($0):DOUBLE, CAST($2):DOUBLE)], 
joinType=[left])
         :- LogicalTableScan(table=[[test-catalog, db1, src2]])
         +- LogicalTableScan(table=[[test-catalog, db1, src1]]) {code}
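
One way the cast to double could affect correctness, shown as a small standalone sketch rather than a statement about what the Hive dialect planner does at runtime: a double has a 53-bit significand, so two different bigint values above 2^53 can compare equal after the cast. The class and method names are hypothetical.

```java
import java.math.BigDecimal;

class DoubleCoercionSketch {
    // Mirrors a join condition of the form CAST(a AS DOUBLE) = CAST(b AS DOUBLE).
    static boolean equalAsDouble(long bigintKey, String stringKey) {
        return (double) bigintKey == Double.parseDouble(stringKey);
    }

    // Exact comparison for reference; throws NumberFormatException on non-numeric input.
    static boolean equalExactly(long bigintKey, String stringKey) {
        return new BigDecimal(stringKey).compareTo(BigDecimal.valueOf(bigintKey)) == 0;
    }

    public static void main(String[] args) {
        long bigintKey = 9007199254740993L;    // 2^53 + 1, not representable as a double
        String stringKey = "9007199254740992"; // 2^53
        System.out.println("equal as double: " + equalAsDouble(bigintKey, stringKey));
        System.out.println("equal exactly:   " + equalExactly(bigintKey, stringKey));
    }
}
```

For keys that fit within 53 bits the two comparisons agree, so any risk would be limited to very large bigint values or to strings that are not exact integers.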
 

> Cannot join hive tables with different column types
> ---
>
> Key: FLINK-25012
> URL: https://issues.apache.org/jira/browse/FLINK-25012
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.13.0, 1.14.0
>Reporter: xiangqiao
>Priority: Major
>
> When using Flink batch mode to join Hive tables, we get the 
> following exception (this usage works without problems in Spark):
> {code:java}
> java.lang.RuntimeException: 
> org.apache.hadoop.hive.ql.parse.SemanticException: Line 6:10 Wrong arguments 
> 'key': Unsafe compares between different types are disabled for safety 
> reasons. If you know what you are doing, please 
> sethive.strict.checks.type.safety to false and that hive.mapred.mode is not 
> set to 'strict' to proceed. Note that if you may get errors or incorrect 
> results if you make a mistake while using some of the unsafe features.    at 
> org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.logicalPlan(HiveParserCalcitePlanner.java:305)
>     at 
> org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:273)
>     at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:326)
>     at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:274)
>     at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
>     at 
> org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase.testJoinWithDifferentColumnType(TableEnvHiveConnectorITCase.java:136)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 6:10 Wrong 
> arguments 'key': Unsafe compares between different types 

[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly

2021-11-23 Thread loyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-
Affects Version/s: 1.13.1
   1.14.0

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in 
> active YARN mode, and jobs with a smaller source parallelism often cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots with the SlotManager, the current 
> processing logic chooses the first pendingSlot that meets its resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when the source operator's parallelism is significantly below that of the 
> processing operators. 
> A simple, feasible idea is to {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, by 
> letting the AllocationID carry that detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C and D each represent a {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 
> slot according to its proportion (1/4). The final allocation result is:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>   based on Flink-1.12.3. The patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans?      Sorry for my poor English.
>  
>  
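
The grouping idea in the description above can be sketched roughly as follows. This is a hypothetical, simplified model and not the linked concept code or any Flink API: pending slots are reduced to a count per group label, and each TaskManager's offered slots go greedily to the group with the most unfulfilled slots, which approximates a proportional split. The class and method names are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; none of these names exist in Flink.
final class GroupedSlotAllocationSketch {

    /**
     * Distributes the slots offered by one TaskManager over the pending-slot groups.
     *
     * @param pendingPerGroup group label -> pending slots not yet fulfilled (updated in place)
     * @param slotsOffered number of slots the TaskManager registers
     * @return group label -> number of this TaskManager's slots given to that group
     */
    static Map<String, Integer> assignSlotsOfOneTaskManager(
            Map<String, Integer> pendingPerGroup, int slotsOffered) {
        Map<String, Integer> assigned = new LinkedHashMap<>();
        for (int i = 0; i < slotsOffered; i++) {
            // Greedy choice: serve the group that still has the most pending slots.
            String neediest = null;
            for (Map.Entry<String, Integer> e : pendingPerGroup.entrySet()) {
                if (e.getValue() > 0
                        && (neediest == null || e.getValue() > pendingPerGroup.get(neediest))) {
                    neediest = e.getKey();
                }
            }
            if (neediest == null) {
                break; // nothing left to fulfil
            }
            pendingPerGroup.merge(neediest, -1, Integer::sum);
            assigned.merge(neediest, 1, Integer::sum);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The four groups from the example, 10 pending slots each.
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("ABCD", 10);
        pending.put("BCD", 10);
        pending.put("CD", 10);
        pending.put("D", 10);
        // Ten TaskManagers register one after another, 4 slots each.
        for (int tm = 1; tm <= 10; tm++) {
            System.out.println("tm" + tm + " -> " + assignSlotsOfOneTaskManager(pending, 4));
        }
    }
}
```

With the example's numbers, every TaskManager ends up hosting one slot of each group, which is the even spread the description aims for. How the groups would be derived from real slot requests is left open here.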



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] jackwener edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


jackwener edited a comment on pull request #17842:
URL: https://github.com/apache/flink/pull/17842#issuecomment-977582827


   > The compile is failed. Please have a check @jackwener .
   
   After fixing the typo (`testChoicesWithInvalidDefautlValue`), there are two 
functions with the same name. 
   
   I found that they test the same logic, so one is redundant and I removed 
this unit test.






[GitHub] [flink] jackwener commented on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


jackwener commented on pull request #17842:
URL: https://github.com/apache/flink/pull/17842#issuecomment-977582827


   > The compile is failed. Please have a check @jackwener .
   
   After fixing the typo (`testChoicesWithInvalidDefautlValue`), there are two 
functions with the same name. 
   
   I found that they test the same logic, so I removed this unit test.






[GitHub] [flink] flinkbot edited a comment on pull request #17890: [BP-1.14][FLINK-24624][k8s] Clean up residual k8s resources when starting kubernetes session or application cluster failed

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17890:
URL: https://github.com/apache/flink/pull/17890#issuecomment-977413087


   
   ## CI report:
   
   * 1724c0b42250c88911e228025251ad3b764042ac Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26973)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-25013) flink pulsar connector cannot consume deferred messages

2021-11-23 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448401#comment-17448401
 ] 

Yufan Sheng commented on FLINK-25013:
-

I see. But I don't think this is a bug. Delayed message delivery only works with 
the {{Shared}} subscription type. With the {{Exclusive}} and {{Failover}} 
subscription types, the delayed message is dispatched immediately.

Can you confirm that the source is configured to use the {{Shared}} subscription type?


> flink pulsar connector cannot consume deferred messages
> ---
>
> Key: FLINK-25013
> URL: https://issues.apache.org/jira/browse/FLINK-25013
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: zhangjg
>Priority: Major
>
>  
> In Flink 1.14.0, the Pulsar connector (PulsarSource) cannot consume 
> deferred messages. 
> Messages sent by the producer are consumed by the Flink Pulsar source 
> immediately.
>  





[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174

2021-11-23 Thread GitBox


HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r755741660



##
File path: flink-ml-python/apache_flink_ml/version.py
##
@@ -0,0 +1,23 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+The version will be consistent with the flink ml version and follow the PEP440.
+.. seealso:: https://www.python.org/dev/peps/pep-0440
+"""
+__version__ = "0.1.dev0"

Review comment:
   Make sense.








[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174

2021-11-23 Thread GitBox


lindong28 commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r755732498



##
File path: flink-ml-python/apache_flink_ml/version.py
##
@@ -0,0 +1,23 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+The version will be consistent with the flink ml version and follow the PEP440.
+.. seealso:: https://www.python.org/dev/peps/pep-0440
+"""
+__version__ = "0.1.dev0"

Review comment:
   Could we rename the directory from `apache_flink_ml` to `pyflink`? 
This would make the flink-ml directory structure more consistent with the core 
Flink repo. It would also let the Python packages of flink-ml and core Flink 
be installed in the same directory under `site-packages`.








[jira] [Assigned] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system

2021-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-11868:


Assignee: Yun Tang

> [filesystems] Introduce listStatusIterator API to file system
> -
>
> Key: FLINK-11868
> URL: https://issues.apache.org/jira/browse/FLINK-11868
> Project: Flink
>  Issue Type: New Feature
>  Components: FileSystems
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Fix For: 1.15.0
>
>
> From existing experience, we know {{listStatus}} is expensive for many 
> distributed file systems, especially when the folder contains very many files. 
> This method not only blocks the thread until the result is returned but can 
> also cause an OOM when the returned array of {{FileStatus}} is really large. 
> I think we should already have learned this from FLINK-7266 and FLINK-8540.
> However, listing file status under a path is really helpful in many situations. 
> Thankfully, many distributed file systems noticed that and provide an API such as 
> {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
>  to call the file system on demand.
>  
> We should also introduce this API and replace the current implementations that 
> use {{listStatus}}.
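
To illustrate the difference described above, here is a small self-contained sketch. It is a hypothetical in-memory model, not Flink's `FileSystem` or Hadoop's `listStatusIterator`: `FakeRemoteDir` stands in for a remote directory and counts listing requests, and `listIterator` fetches one page of names at a time instead of materializing the whole listing. All names and the paging scheme are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch; not Flink's or Hadoop's API.
final class PagedListingSketch {

    /** Stand-in for a remote directory; counts how many listing requests it served. */
    static final class FakeRemoteDir {
        private final int totalFiles;
        int requests = 0;

        FakeRemoteDir(int totalFiles) {
            this.totalFiles = totalFiles;
        }

        /** Returns up to pageSize file names starting at offset. */
        List<String> listPage(int offset, int pageSize) {
            requests++;
            List<String> page = new ArrayList<>();
            for (int i = offset; i < Math.min(totalFiles, offset + pageSize); i++) {
                page.add("file-" + i);
            }
            return page;
        }
    }

    /** Iterator that fetches the next page only when the current one is used up. */
    static Iterator<String> listIterator(FakeRemoteDir dir, int pageSize) {
        return new Iterator<String>() {
            private List<String> page = new ArrayList<>();
            private int posInPage = 0;
            private int nextOffset = 0;
            private boolean exhausted = false;

            @Override
            public boolean hasNext() {
                if (posInPage < page.size()) {
                    return true;
                }
                if (exhausted) {
                    return false;
                }
                page = dir.listPage(nextOffset, pageSize);
                nextOffset += page.size();
                posInPage = 0;
                if (page.size() < pageSize) {
                    exhausted = true; // a short page means the listing is complete
                }
                return !page.isEmpty();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return page.get(posInPage++);
            }
        };
    }
}
```

A caller that only needs the first few entries triggers a single page request, and at most one page of names is held in memory at any time.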





[jira] [Commented] (FLINK-25029) Hadoop Caller Context Setting In Flink

2021-11-23 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-25029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448392#comment-17448392
 ] 

刘方奇 commented on FLINK-25029:
-

[~lzljs3620320]  please help review this comment.

> Hadoop Caller Context Setting In Flink
> --
>
> Key: FLINK-25029
> URL: https://issues.apache.org/jira/browse/FLINK-25029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: 刘方奇
>Priority: Major
>
> For a given HDFS operation (e.g. delete file), it's very helpful to track 
> which upper level job issues it. The upper level callers may be specific 
> Oozie tasks, MR jobs, and hive queries. One scenario is that the namenode 
> (NN) is abused/spammed, the operator may want to know immediately which MR 
> job should be blamed so that she can kill it. To this end, the caller context 
> contains at least the application-dependent "tracking id".
> The above is the main purpose of the Caller Context. The HDFS client sets the 
> Caller Context, and the NameNode then writes it to the audit log, where it can 
> be used for further work.
> Spark and Hive already set the Caller Context to meet the HDFS job audit 
> requirement.
> In my company, Flink jobs often cause problems for HDFS, so we implemented 
> this to prevent some of those cases.
> If the feature is general enough, should we support it? If so, I can submit a 
> PR for this.





[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread ZhuoYu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448391#comment-17448391
 ] 

ZhuoYu Chen commented on FLINK-15571:
-

[~yunta] Thank you for your reply. I think you can assign this task to me 
first. I have finished the Mongo connector submission, which is now being 
reviewed by the community, and I am planning to do the Redis connector next.

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  





[jira] [Created] (FLINK-25029) Hadoop Caller Context Setting In Flink

2021-11-23 Thread Jira
刘方奇 created FLINK-25029:
---

 Summary: Hadoop Caller Context Setting In Flink
 Key: FLINK-25029
 URL: https://issues.apache.org/jira/browse/FLINK-25029
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: 刘方奇


For a given HDFS operation (e.g. delete file), it's very helpful to track which 
upper level job issues it. The upper level callers may be specific Oozie tasks, 
MR jobs, and hive queries. One scenario is that the namenode (NN) is 
abused/spammed, the operator may want to know immediately which MR job should 
be blamed so that she can kill it. To this end, the caller context contains at 
least the application-dependent "tracking id".

The above is the main purpose of the Caller Context. The HDFS client sets the 
Caller Context, and the NameNode then writes it to the audit log, where it can 
be used for further work.

Spark and Hive already set the Caller Context to meet the HDFS job audit 
requirement.

In my company, Flink jobs often cause problems for HDFS, so we implemented this 
to prevent some of those cases.

If the feature is general enough, should we support it? If so, I can submit a PR 
for this.





[jira] [Comment Edited] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382
 ] 

Yun Tang edited comment on FLINK-15571 at 11/24/21, 6:21 AM:
-

[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/], which can then 
be contributed to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely yet).


was (Author: yunta):
[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/], which can then 
be contributed to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely yet).

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  





[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382
 ] 

Yun Tang commented on FLINK-15571:
--

[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/], which can then 
be contributed to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely yet).

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  





[GitHub] [flink] Aitozi commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-23 Thread GitBox


Aitozi commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755729487



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider safelyDeployCluster(
+SupplierWithException, Exception> 
supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {

Review comment:
   1. I think it refers to a failure to get the cluster client during 
the cluster deployment phase. 
   2. Failing to deploy means killing.
   
   As I mentioned in the [issue 
](https://issues.apache.org/jira/browse/FLINK-24624?focusedCommentId=17434081=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17434081),
 this PR only solves the problem in case 1. In session/application mode, it 
seems we cannot fully ensure that no resources are left behind, because the 
deploy process is asynchronous; that may need to be handled by the client. 
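
   For reference, the general shape of "clean up when the deployment step fails" can be sketched as below. This is a hypothetical, simplified helper and not the code in this PR: `deployOrCleanUp`, its parameters, and the use of unchecked exceptions are assumptions made for illustration. It only covers failures that surface synchronously in the deploy call (case 1); resources left behind by the asynchronous part of a deployment are outside its reach.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "clean up on failed deployment" pattern; not the PR's code.
final class DeployWithCleanupSketch {

    /**
     * Runs the deploy step and, if it throws, runs the clean-up step before rethrowing.
     *
     * @param deploy creates the cluster and returns a handle to it
     * @param cleanUp removes whatever the failed deployment may have created
     */
    static <T> T deployOrCleanUp(Supplier<T> deploy, Runnable cleanUp) {
        try {
            return deploy.get();
        } catch (RuntimeException deployFailure) {
            try {
                cleanUp.run();
            } catch (RuntimeException cleanUpFailure) {
                // Keep the original failure as the primary cause.
                deployFailure.addSuppressed(cleanUpFailure);
            }
            throw deployFailure;
        }
    }
}
```

   The clean-up failure is attached as a suppressed exception so that the original deployment error is what the caller sees.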








[GitHub] [flink] flinkbot edited a comment on pull request #17598: [WIP][FLINK-24703][connectors][formats] Add FileSource support for reading CSV files.

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17598:
URL: https://github.com/apache/flink/pull/17598#issuecomment-954282485


   
   ## CI report:
   
   * 077423c8c94f59aade26c2c57001a4551a1b28af Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26971)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755727278



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * An Estimator which implements the naive bayes classification algorithm.
+ *
+ * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier.
+ */
+public class NaiveBayes
+implements Estimator, 
NaiveBayesParams {
+private final Map, Object> paramMap = new HashMap<>();
+
+public NaiveBayes() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public NaiveBayesModel fit(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String featuresCol = getFeaturesCol();
+final String labelCol = getLabelCol();
+final double smoothing = getSmoothing();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream> input =
+tEnv.toDataStream(inputs[0])
+.map(
+new MapFunction>() 
{
+@Override
+public Tuple2 map(Row row) 
throws Exception {
+return new Tuple2<>(
+(Vector) 
row.getField(featuresCol),
+(Double) 
row.getField(labelCol));
+}
+});
+
+DataStream naiveBayesModel =

Review comment:
   Yes, `modeData` should be more clear. I'll make this change.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755726294



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.StageTestUtils;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link NaiveBayes} and {@link NaiveBayesModel}. */
+public class NaiveBayesTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Schema schema;
+private List trainData;
+private List predictData;
+private List expectedOutput;
+private boolean isSaveLoad;

Review comment:
   Yes you are right. I will make this change in `NaiveBayesTest` like what 
we have done in `KmeansTest`.








[GitHub] [flink-ml] gaoyunhaii closed pull request #34: [hotfix] Fix nullpointer exception when broadcast variables are cleaned

2021-11-23 Thread GitBox


gaoyunhaii closed pull request #34:
URL: https://github.com/apache/flink-ml/pull/34


   






[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-11-23 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r755724605



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.StageTestUtils;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link NaiveBayes} and {@link NaiveBayesModel}. */
+public class NaiveBayesTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Schema schema;
+private List trainData;
+private List predictData;
+private List expectedOutput;
+private boolean isSaveLoad;
+
+@Before
+public void before() {
+Configuration config = new Configuration();
+
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+env.setParallelism(4);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+tEnv = StreamTableEnvironment.create(env);
+
+schema =
+Schema.newBuilder()
+.column("f0", DataTypes.DOUBLE())
+.column("f1", DataTypes.of(DenseVector.class))
+.column("f2", DataTypes.DOUBLE())
+.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+.watermark("rowtime", "SOURCE_WATERMARK()")
+.build();
+
+trainData =
+Arrays.asList(
+Row.of(1., Vectors.dense(1, 1., 1., 1., 2.), 11.0),
+Row.of(1., Vectors.dense(1, 1., 0., 1., 2.), 11.0),
+Row.of(1., Vectors.dense(2, 0., 1., 1., 3.), 11.0),
+Row.of(1., Vectors.dense(2, 0., 1., 1.5, 2.), 11.0),
+Row.of(2., Vectors.dense(3, 1.5, 1., 0.5, 3.), 10.0),
+Row.of(1., Vectors.dense(1, 1., 1.5, 0., 1.), 10.0),
+Row.of(2., Vectors.dense(4, 1., 1., 0., 1.), 10.0));
+
+predictData = trainData;
+
+expectedOutput =
+Arrays.asList(
+Row.of(1., Vectors.dense(1, 1., 1., 1., 2.), 11.0, 
11.0),
+Row.of(1., Vectors.dense(1, 1., 0., 1., 2.), 11.0, 
11.0),
+Row.of(1., Vectors.dense(2, 0., 1., 1., 3.), 11.0, 
11.0),
+Row.of(1., Vectors.dense(2, 0., 1., 1.5, 2.), 11.0, 
11.0),
+Row.of(2., Vectors.dense(3, 1.5, 1., 0.5, 3.), 10.0, 
10.0),
+Row.of(1., Vectors.dense(1, 1., 1.5, 0., 1.), 10.0, 
10.0),
+

[jira] [Commented] (FLINK-24396) Add @Public annotations to Table & SQL API classes

2021-11-23 Thread ZhuoYu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448377#comment-17448377
 ] 

ZhuoYu Chen commented on FLINK-24396:
-

Hi [~twalthr], I am very interested in this and would like to contribute to 
Flink. Can I help with it?
Thank you

> Add @Public annotations to Table & SQL API classes
> --
>
> Key: FLINK-24396
> URL: https://issues.apache.org/jira/browse/FLINK-24396
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> Many parts of the Table & SQL API have stabilized and we can mark them as 
> {{@Public}} which gives both users and downstream projects more confidence 
> when using Flink.
> A concrete list of classes and methods needs to be compiled. Some parts of 
> the API might stay {{@PublicEvolving}} for now.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] gaoyunhaii commented on pull request #34: [hotfix] Fix nullpointer exception when broadcast variables are cleaned

2021-11-23 Thread GitBox


gaoyunhaii commented on pull request #34:
URL: https://github.com/apache/flink-ml/pull/34#issuecomment-977559676


   Many thanks @zhipeng93 for the fix! LGTM, merging...






[GitHub] [flink] flinkbot edited a comment on pull request #17813: [FLINK-24802][Table SQL/Planner] Improve cast ROW to STRING

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17813:
URL: https://github.com/apache/flink/pull/17813#issuecomment-970956527


   
   ## CI report:
   
   * d4a10bc3d83bb43c839ae7b54e8bd41a65b94925 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26877)
 
   * e04cc8d0d537afadf1b195b068939fa8e0a1f994 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26981)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-24429) Port FileSystemTableSink to new Unified Sink API (FLIP-143)

2021-11-23 Thread ZhuoYu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448376#comment-17448376
 ] 

ZhuoYu Chen commented on FLINK-24429:
-

Hi [~alexanderpreuss], I am very interested in this and would like to 
contribute to Flink. Can I help with it?
Thank you

> Port FileSystemTableSink to new Unified Sink API (FLIP-143)
> ---
>
> Key: FLINK-24429
> URL: https://issues.apache.org/jira/browse/FLINK-24429
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.15.0
>Reporter: Alexander Preuss
>Assignee: Alexander Preuss
>Priority: Major
>
> We want to port the FileSystemTableSink to the new Sink API as was done with 
> the  Kafka Sink.





[GitHub] [flink] flinkbot edited a comment on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977556918


   
   ## CI report:
   
   * 9f73f2713085e557107f1da2b68e77721721eb4d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26980)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17813: [FLINK-24802][Table SQL/Planner] Improve cast ROW to STRING

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17813:
URL: https://github.com/apache/flink/pull/17813#issuecomment-970956527


   
   ## CI report:
   
   * d4a10bc3d83bb43c839ae7b54e8bd41a65b94925 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26877)
 
   * e04cc8d0d537afadf1b195b068939fa8e0a1f994 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-24959) Add a BitMap function to FlinkSQL

2021-11-23 Thread ZhuoYu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448374#comment-17448374
 ] 

ZhuoYu Chen commented on FLINK-24959:
-

Hi [~jark], I am very interested in this and would like to contribute to 
Flink. Can I help with it?
Thank you

> Add a BitMap function to FlinkSQL
> -
>
> Key: FLINK-24959
> URL: https://issues.apache.org/jira/browse/FLINK-24959
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.15.0
>Reporter: ZhuoYu Chen
>Priority: Minor
>
> bitmap_and: Computes the intersection of two input bitmaps and returns the 
> new bitmap.
> bitmap_andnot: Computes the set (difference set) that is in A but not in B.
> Bitmap functions related to join operations, etc.
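
For illustration only, the two set operations described in the issue can be sketched with `java.util.BitSet`. This is a minimal sketch, not the proposed Flink SQL implementation: the class name `BitmapSketch` and its method names are assumptions made up for this example, and a real SQL function would presumably use a compressed bitmap type rather than `BitSet`.

```java
import java.util.BitSet;

/** Hypothetical illustration of bitmap_and / bitmap_andnot semantics (not Flink code). */
public class BitmapSketch {

    /** Intersection: bits set in both a and b. The inputs are left unmodified. */
    public static BitSet bitmapAnd(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.and(b);
        return result;
    }

    /** Difference set: bits set in a but not in b. The inputs are left unmodified. */
    public static BitSet bitmapAndNot(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.andNot(b);
        return result;
    }

    /** Small helper that builds a bitmap from explicit bit positions. */
    public static BitSet of(int... bits) {
        BitSet set = new BitSet();
        for (int bit : bits) {
            set.set(bit);
        }
        return set;
    }

    public static void main(String[] args) {
        BitSet a = of(1, 2, 3, 5);
        BitSet b = of(2, 3, 4);
        System.out.println(bitmapAnd(a, b));    // prints {2, 3}
        System.out.println(bitmapAndNot(a, b)); // prints {1, 5}
    }
}
```

Both helpers copy the first argument before mutating it, so the same input bitmap can be reused across several calls.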





[GitHub] [flink] flinkbot commented on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…

2021-11-23 Thread GitBox


flinkbot commented on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977557402


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9f73f2713085e557107f1da2b68e77721721eb4d (Wed Nov 24 
05:57:18 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   






[GitHub] [flink] flinkbot commented on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…

2021-11-23 Thread GitBox


flinkbot commented on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977556918


   
   ## CI report:
   
   * 9f73f2713085e557107f1da2b68e77721721eb4d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Updated] (FLINK-12941) Translate "Amazon AWS Kinesis Streams Connector" page into Chinese

2021-11-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12941:
---
Labels: auto-unassigned pull-request-available  (was: auto-unassigned)

> Translate "Amazon AWS Kinesis Streams Connector" page into Chinese
> --
>
> Key: FLINK-12941
> URL: https://issues.apache.org/jira/browse/FLINK-12941
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: ZhuoYu Chen
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> Translate the internal page 
> "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kinesis.html"
>  into Chinese.
>  
> The doc is located in "flink/docs/dev/connectors/kinesis.zh.md"





[GitHub] [flink] MonsterChenzhuo opened a new pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…

2021-11-23 Thread GitBox


MonsterChenzhuo opened a new pull request #17891:
URL: https://github.com/apache/flink/pull/17891


   …WS Kinesis Streams Connector" page into Chinese
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   






[GitHub] [flink] cc13ny commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-23 Thread GitBox


cc13ny commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755719881



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider safelyDeployCluster(
+SupplierWithException<ClusterClientProvider<String>, Exception> 
supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {

Review comment:
   @Aitozi I think this is the key part of the change, aside from the code 
reorganization. But I don't understand the PR title, for the following reasons:
   
   - Is failing to get the cluster client **the same as** failing to start the 
k8s cluster?
   - Which code kills the cluster, **or** does failing to deploy itself mean 
killing?








[GitHub] [flink] cc13ny commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-23 Thread GitBox


cc13ny commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755719881



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
 flinkConfig.get(JobManagerOptions.PORT));
 }
 
+final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+final FlinkPod podTemplate =
+kubernetesJobManagerParameters
+.getPodTemplateFilePath()
+.map(
+file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+client, file, 
Constants.MAIN_CONTAINER_NAME))
+.orElse(new FlinkPod.Builder().build());
+final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+podTemplate, kubernetesJobManagerParameters);
+
+client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+return createClusterClientProvider(clusterId);
+}
+
+private ClusterClientProvider safelyDeployCluster(
+SupplierWithException<ClusterClientProvider<String>, Exception> 
supplier)
+throws ClusterDeploymentException {
 try {
-final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-final FlinkPod podTemplate =
-kubernetesJobManagerParameters
-.getPodTemplateFilePath()
-.map(
-file ->
-
KubernetesUtils.loadPodFromTemplateFile(
-client, file, 
Constants.MAIN_CONTAINER_NAME))
-.orElse(new FlinkPod.Builder().build());
-final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-podTemplate, kubernetesJobManagerParameters);
-
-client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-return createClusterClientProvider(clusterId);
+
+ClusterClientProvider clusterClientProvider = 
supplier.get();
+
+try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {

Review comment:
   I think this is the key part of the change, aside from the code 
reorganization. But I don't understand the PR title, for the following reasons:
   
   - Is failing to get the cluster client the same as failing to start the k8s 
cluster?
   - Which code kills the cluster, or does failing to deploy itself mean 
killing?








[jira] [Assigned] (FLINK-25015) job name should not always be `collect` submitted by sql client

2021-11-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-25015:
---

Assignee: KevinyhZou

> job name should not always be `collect` submitted by sql client
> ---
>
> Key: FLINK-25015
> URL: https://issues.apache.org/jira/browse/FLINK-25015
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.14.0
>Reporter: KevinyhZou
>Assignee: KevinyhZou
>Priority: Major
> Attachments: image-2021-11-23-20-15-32-459.png, 
> image-2021-11-23-20-16-21-932.png
>
>
> I use the Flink SQL client to submit different SQL queries to a Flink session 
> cluster, and the SQL job name is always `collect`, as shown below
> !image-2021-11-23-20-16-21-932.png!
> which makes no sense to users.





[jira] [Commented] (FLINK-25015) job name should not always be `collect` submitted by sql client

2021-11-23 Thread KevinyhZou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448370#comment-17448370
 ] 

KevinyhZou commented on FLINK-25015:


[~jark] I agree with you. Can you assign this issue to me? I will change 
the job name.

> job name should not always be `collect` submitted by sql client
> ---
>
> Key: FLINK-25015
> URL: https://issues.apache.org/jira/browse/FLINK-25015
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.14.0
>Reporter: KevinyhZou
>Priority: Major
> Attachments: image-2021-11-23-20-15-32-459.png, 
> image-2021-11-23-20-16-21-932.png
>
>
> I use the Flink SQL client to submit different SQL queries to a Flink session 
> cluster, and the SQL job name is always `collect`, as shown below
> !image-2021-11-23-20-16-21-932.png!
> which makes no sense to users.





[GitHub] [flink] flinkbot edited a comment on pull request #17874: [FLINK-24046] Refactor the EmbeddedRocksDBStateBackend configuration logic

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17874:
URL: https://github.com/apache/flink/pull/17874#issuecomment-976109395


   
   ## CI report:
   
   * ce248c90c07d8df8d1d79651248cc8a7ad6e0e81 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26974)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17554:
URL: https://github.com/apache/flink/pull/17554#issuecomment-950550174


   
   ## CI report:
   
   * ae6c6bc42146b7a72215b4ceda8108fdc21b41f6 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26972)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Comment Edited] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached

2021-11-23 Thread FYung (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448365#comment-17448365
 ] 

FYung edited comment on FLINK-25027 at 11/24/21, 5:23 AM:
--

Hi [~nkruber]
Thank you for creating this issue. I'm glad to see it because we have the 
same problem with the Akka thread pool. 

We submit batch jobs to a Flink session cluster and find that too many tasks 
accumulate in the Akka thread pool, which puts pressure on GC. Besides 
PhysicalSlotRequestBulkCheckerImpl, we also find `heartbeat` tasks, the timeout 
checkers for `pending request`/`checkIdleSlotTimeout`/`checkBatchSlotTimeout` 
in DeclarativeSlotPoolBridge, the timeout checker in 
`DefaultScheduler.registerProducedPartitions`, and more. The Akka thread pool 
holds these instances even after the jobs are finished.

Using a dedicated thread pool per JM to manage these tasks is a good idea. 
Maybe we should first create a thread pool in the JM, then create subtasks 
under this issue to move the tasks above from the Akka thread pool to the 
thread pool in the JM. What do you think? Hope to hear from you 
[~nkruber]  [~trohrmann] THX :)


was (Author: zjureel):
Hi [~nkruber]
Thank you for creating this issue. I'm glad to see it because we have the 
same problem with the Akka thread pool. 

We submit batch jobs to a Flink session cluster and find that too many tasks 
accumulate in the Akka thread pool, which puts pressure on GC. Besides 
PhysicalSlotRequestBulkCheckerImpl, we also find `heartbeat` tasks, the timeout 
checkers for `pending request`/`checkIdleSlotTimeout`/`checkBatchSlotTimeout` 
in DeclarativeSlotPoolBridge, the timeout checker in 
`DefaultScheduler.registerProducedPartitions`, and more. The Akka thread pool 
holds these instances even after the jobs are finished.

Using a dedicated thread pool per JM to manage these tasks is a good idea. 
Maybe we should first create a thread pool in the JM, then create subtasks 
under this issue to move the tasks above from the Akka thread pool to the 
thread pool in the JM. What do you think? Hope to hear from you 
[~nkruber][~trohrmann] THX :)

> Allow GC of a finished job's JobMaster before the slot timeout is reached
> -
>
> Key: FLINK-25027
> URL: https://issues.apache.org/jira/browse/FLINK-25027
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.5, 1.13.3
>Reporter: Nico Kruber
>Priority: Major
> Attachments: image-2021-11-23-20-32-20-479.png
>
>
> In a session cluster, after a (batch) job is finished, the JobMaster seems to 
> stick around for another couple of minutes before being eligible for garbage 
> collection.
> Looking into a heap dump, it seems to be tied to a 
> {{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying 
> Akka executor (and keeps the JM from being GC’d). Per default the action is 
> scheduled for {{slot.request.timeout}} that defaults to 5 min (thanks 
> [~trohrmann] for helping out here)
> !image-2021-11-23-20-32-20-479.png!
> With this setting, you will have to account for enough metaspace to cover 5 
> minutes of time which may span a couple of jobs, needlessly!
> The problem seems to be that Flink is using the main thread executor for the 
> scheduling that uses the {{ActorSystem}}'s scheduler and the future task 
> scheduled with Akka can (probably) not be easily cancelled.
> One idea could be to use a dedicated thread pool per JM, that we shut down 
> when the JM terminates. That way we would not keep the JM from being GC’d.
> (The concrete example we investigated was a DataSet job)
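
The dedicated-thread-pool idea from the description above can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not Flink's implementation: `PerJobScheduler` and its method names are hypothetical, and the no-op `Runnable` stands in for a timeout checker that would otherwise keep a finished JobMaster reachable.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-job scheduler; a stand-in, not a Flink class. */
final class PerJobScheduler {
    private final ScheduledThreadPoolExecutor executor;

    PerJobScheduler() {
        executor = new ScheduledThreadPoolExecutor(1);
        // Remove cancelled tasks from the queue right away instead of
        // keeping them (and whatever they reference) until their delay expires.
        executor.setRemoveOnCancelPolicy(true);
    }

    /** Schedules a delayed check, e.g. a slot request timeout. */
    ScheduledFuture<?> scheduleTimeoutCheck(Runnable check, long delay, TimeUnit unit) {
        return executor.schedule(check, delay, unit);
    }

    /** Number of tasks still waiting in the queue. */
    int pendingTasks() {
        return executor.getQueue().size();
    }

    /** Drops all waiting tasks and returns how many were dropped. */
    int shutdown() {
        return executor.shutdownNow().size();
    }

    public static void main(String[] args) {
        PerJobScheduler scheduler = new PerJobScheduler();
        // A 5 minute delay mirrors the slot.request.timeout default quoted above.
        scheduler.scheduleTimeoutCheck(() -> { }, 5, TimeUnit.MINUTES);
        System.out.println("pending before shutdown: " + scheduler.pendingTasks());
        System.out.println("dropped on shutdown: " + scheduler.shutdown());
        System.out.println("pending after shutdown: " + scheduler.pendingTasks());
    }
}
```

Because the queue is emptied when the job's own executor shuts down, nothing scheduled for the job outlives it; whether this maps cleanly onto the main thread executor used by the scheduler is the open question in this ticket.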





[jira] [Commented] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached

2021-11-23 Thread FYung (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448365#comment-17448365
 ] 

FYung commented on FLINK-25027:
---

Hi [~nkruber]
Thank you for creating this issue. I'm glad to see it because we have the 
same problem with the Akka thread pool. 

We submit batch jobs to a Flink session cluster and find that too many tasks 
are queued in the Akka thread pool, which causes GC pressure. Besides 
PhysicalSlotRequestBulkCheckerImpl, we also find `heartbeat` tasks, the timeout 
checkers for `pending request`/`checkIdleSlotTimeout`/`checkBatchSlotTimeout` in 
DeclarativeSlotPoolBridge, the timeout checker in 
`DefaultScheduler.registerProducedPartitions`, and more. The Akka thread pool 
holds on to these instances even after the jobs have finished.

Using a dedicated thread pool per JM to manage these tasks is a good idea. 
Maybe we should first create a thread pool in the JM, then create subtasks 
under this issue to move the tasks above from the Akka thread pool to the 
JM's thread pool. What do you think? Hope to hear from you [~nkruber][~trohrmann]. Thanks :)

> Allow GC of a finished job's JobMaster before the slot timeout is reached
> -
>
> Key: FLINK-25027
> URL: https://issues.apache.org/jira/browse/FLINK-25027
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.5, 1.13.3
>Reporter: Nico Kruber
>Priority: Major
> Attachments: image-2021-11-23-20-32-20-479.png
>
>
> In a session cluster, after a (batch) job is finished, the JobMaster seems to 
> stick around for another couple of minutes before being eligible for garbage 
> collection.
> Looking into a heap dump, it seems to be tied to a 
> {{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying 
> Akka executor (and keeps the JM from being GC’d). Per default the action is 
> scheduled for {{slot.request.timeout}} that defaults to 5 min (thanks 
> [~trohrmann] for helping out here)
> !image-2021-11-23-20-32-20-479.png!
> With this setting, you will have to account for enough metaspace to cover 5 
> minutes of time which may span a couple of jobs, needlessly!
> The problem seems to be that Flink is using the main thread executor for the 
> scheduling that uses the {{ActorSystem}}'s scheduler and the future task 
> scheduled with Akka can (probably) not be easily cancelled.
> One idea could be to use a dedicated thread pool per JM, that we shut down 
> when the JM terminates. That way we would not keep the JM from being GC’d.
> (The concrete example we investigated was a DataSet job)





[jira] [Resolved] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-23 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-24958.

Resolution: Fixed

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> correct the example and link for temporal table function documentation 





[jira] [Updated] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-23 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-24958:
---
Fix Version/s: 1.14.1

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> correct the example and link for temporal table function documentation 





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755690633



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/EuclideanDistance.java
##
@@ -0,0 +1,272 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.ml.classification.knn.KnnUtil.appendVectorToMatrix;
+
+/**
+ * Euclidean distance is the "ordinary" straight-line distance between two points in Euclidean
+ * space.
+ *
+ * https://en.wikipedia.org/wiki/Euclidean_distance
+ *
+ * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| means the L2 norm of
+ * the vector.
+ */
+public class EuclideanDistance implements Serializable {

Review comment:
   OK, KNN's EuclideanDistance is not a general-purpose distance; it is a fast 
distance implementation. 
   I will rename the class from EuclideanDistance to FastDistance.
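
The comment does not say what makes the KNN distance "fast". One common trick, assumed here purely for illustration, is to precompute squared norms so that `||a - b||` can be derived from a single dot product via `||a - b||^2 = ||a||^2 + ||b||^2 - 2 * (a . b)`. The class and method names below are hypothetical and are not taken from the pull request.

```java
/** Hypothetical sketch comparing a direct and a norm-based Euclidean distance. */
final class FastDistanceSketch {

    /** Dot product of two equally sized vectors. */
    static double dot(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += a[i] * b[i];
        }
        return sum;
    }

    /** Direct definition: the L2 norm of (a - b). */
    static double direct(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    /** Norm-based variant: reuses precomputed squared norms of a and b. */
    static double viaNorms(double[] a, double[] b, double normA2, double normB2) {
        double squared = normA2 + normB2 - 2.0 * dot(a, b);
        // Rounding can push the value slightly below zero for near-identical vectors.
        return Math.sqrt(Math.max(squared, 0.0));
    }

    public static void main(String[] args) {
        double[] a = {1.0, 2.0, 3.0};
        double[] b = {4.0, 6.0, 3.0};
        System.out.println(direct(a, b));
        System.out.println(viaNorms(a, b, dot(a, a), dot(b, b)));
    }
}
```

When many query vectors are compared against the same training vectors, the squared norms only have to be computed once per vector, which is where such a variant would save work.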








[jira] [Commented] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-23 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448356#comment-17448356
 ] 

Leonard Xu commented on FLINK-24958:


Fixed via

master (1.15): ae813a7397955d7694109c3455b5cca14ac37938
release-1.14: 1592494d4260563af4f170fb1cd45ccec9e5f0cb

> correct the example and link for temporal table function documentation 
> ---
>
> Key: FLINK-24958
> URL: https://issues.apache.org/jira/browse/FLINK-24958
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> correct the example and link for temporal table function documentation 





[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * ca42ca0e3df01ed1ddbd020245528ecb7b4257eb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26919)
 
   * 1592494d4260563af4f170fb1cd45ccec9e5f0cb Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26978)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] wuchong commented on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project

2021-11-23 Thread GitBox


wuchong commented on pull request #17842:
URL: https://github.com/apache/flink/pull/17842#issuecomment-977518179


   The compilation failed. Please take a look, @jackwener.






[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug

2021-11-23 Thread GitBox


flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959


   
   ## CI report:
   
   * ca42ca0e3df01ed1ddbd020245528ecb7b4257eb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26919)
 
   * 1592494d4260563af4f170fb1cd45ccec9e5f0cb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755688272



##
File path: flink-ml-api/src/main/java/org/apache/flink/ml/param/Param.java
##
@@ -75,7 +75,7 @@ public String jsonEncode(T value) throws IOException {
  */
 @SuppressWarnings("unchecked")
 public T jsonDecode(String json) throws IOException {
-return ReadWriteUtils.OBJECT_MAPPER.readValue(json, clazz);
+return ReadWriteUtils.OBJECT_MAPPER.fromJson(json, clazz);

Review comment:
   done

##
File path: flink-ml-api/src/main/java/org/apache/flink/ml/param/Param.java
##
@@ -64,7 +64,7 @@ public Param(
  * @return A json-formatted string.
  */
 public String jsonEncode(T value) throws IOException {
-return ReadWriteUtils.OBJECT_MAPPER.writeValueAsString(value);
+return ReadWriteUtils.OBJECT_MAPPER.toJson(value);

Review comment:
   done








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755688178



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/Knn.java
##
@@ -0,0 +1,255 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.common.MapPartitionFunctionWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KNN is to classify unlabeled observations by assigning them to the class of the most similar
+ * labeled examples.
+ */
+public class Knn implements Estimator, KnnParams {
+
+private static final long serialVersionUID = 5292477422193301398L;
+private static final int ROW_SIZE = 2;
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map, Object> params = new HashMap<>();
+
+/** constructor. */
+public Knn() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public Knn(Map, Object> params) {
+this.params = params;
+}
+
+/**
+ * @param inputs a list of tables
+ * @return knn classification model.
+ */
+@Override
+public KnnModel fit(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+ResolvedSchema schema = inputs[0].getResolvedSchema();
+String[] colNames = schema.getColumnNames().toArray(new String[0]);
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+String[] targetCols = getFeatureCols();
+final int[] featureIndices;
+if (targetCols == null) {
+featureIndices = new int[colNames.length];
+for (int i = 0; i < colNames.length; i++) {
+featureIndices[i] = i;
+}
+} else {
+featureIndices = new int[targetCols.length];
+for (int i = 0; i < featureIndices.length; i++) {
+featureIndices[i] = findColIndex(colNames, targetCols[i]);
+}
+}
+String labelCol = getLabelCol();
+final int labelIdx = findColIndex(colNames, labelCol);
+final int vecIdx =
+getVectorCol() != null
+? findColIndex(
+inputs[0]
+.getResolvedSchema()
+.getColumnNames()
+.toArray(new String[0]),
+getVectorCol())
+: -1;
+
+DataStream trainData =
+input.map(
+(MapFunction)
+value -> {
+Object label = value.getField(labelIdx);

Review comment:
   done

##
File path: 
flink-ml-api/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java
##
@@ -43,8 +46,12 @@
 
 /** Utility methods for reading and writing stages. */
 public class ReadWriteUtils {
-public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
+public static Gson OBJECT_MAPPER =

Review comment:
   done








[GitHub] [flink] leonardBang merged pull request #17835: [FLINK-24958][docs]correct the example and link for temporal table function documentation

2021-11-23 Thread GitBox


leonardBang merged pull request #17835:
URL: https://github.com/apache/flink/pull/17835


   






[jira] [Commented] (FLINK-25015) job name should not always be `collect` submitted by sql client

2021-11-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448354#comment-17448354
 ] 

Jark Wu commented on FLINK-25015:
-

Yes. I think it makes sense to display the query as the job name. 

> job name should not always be `collect` submitted by sql client
> ---
>
> Key: FLINK-25015
> URL: https://issues.apache.org/jira/browse/FLINK-25015
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.14.0
>Reporter: KevinyhZou
>Priority: Major
> Attachments: image-2021-11-23-20-15-32-459.png, 
> image-2021-11-23-20-16-21-932.png
>
>
> I use the Flink SQL client to submit different SQL queries to a Flink session 
> cluster, and the SQL job name is always `collect`, as shown below:
> !image-2021-11-23-20-16-21-932.png!
> This name is meaningless to users.





[GitHub] [flink] leonardBang merged pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…

2021-11-23 Thread GitBox


leonardBang merged pull request #17832:
URL: https://github.com/apache/flink/pull/17832


   






[jira] [Closed] (FLINK-21415) JDBC connector should support to disable caching missing key

2021-11-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-21415.
---
  Assignee: (was: liwei li)
Resolution: Duplicate

> JDBC connector should support to disable caching missing key
> 
>
> Key: FLINK-21415
> URL: https://issues.apache.org/jira/browse/FLINK-21415
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Affects Versions: 1.12.1
>Reporter: Shuai Xia
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> When a JDBC lookup returns no data, the cache still stores an empty ArrayList.
> We should check the result size before caching.





[jira] [Closed] (FLINK-24861) Support to disable caching missing key for lookup cache

2021-11-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-24861.
---
Fix Version/s: 1.15.0
   Resolution: Fixed

Fixed in master: f8fdb6713a9264691a10afe3e6e9ff6bcdc86ae1

> Support to disable caching missing key for lookup cache 
> 
>
> Key: FLINK-24861
> URL: https://issues.apache.org/jira/browse/FLINK-24861
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.14.0
>Reporter: Gaurav Miglani
>Assignee: Gaurav Miglani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Ideally, when a lookup for a key finds no rows, or the fetched value is null, 
> the key should not be cached.
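
The behaviour requested in the description above can be sketched as a lookup cache that skips empty results. This is an illustration only: `LookupCacheSketch`, the `cacheMissingKey` flag and the `loader` function are hypothetical names, and the actual JDBC connector code and its option name are not shown in this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical lookup cache that can refuse to store empty results. */
final class LookupCacheSketch<K, V> {
    private final Map<K, List<V>> cache = new HashMap<>();
    private final Function<K, List<V>> loader;
    private final boolean cacheMissingKey;

    LookupCacheSketch(Function<K, List<V>> loader, boolean cacheMissingKey) {
        this.loader = loader;
        this.cacheMissingKey = cacheMissingKey;
    }

    List<V> lookup(K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<V> rows = loader.apply(key);
        // Only remember empty results when explicitly allowed to.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }

    boolean isCached(K key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        // Stand-in for a database query: only "present" has a row.
        Function<String, List<String>> loader =
                k -> k.equals("present")
                        ? Arrays.asList("row")
                        : Collections.<String>emptyList();
        LookupCacheSketch<String, String> cache = new LookupCacheSketch<>(loader, false);
        cache.lookup("present");
        cache.lookup("absent");
        System.out.println("present cached: " + cache.isCached("present"));
        System.out.println("absent cached: " + cache.isCached("absent"));
    }
}
```

With the flag disabled, a key that is inserted into the database later is found on the next lookup instead of being hidden by a cached empty list, at the cost of querying the database again for keys that stay absent.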





[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755684260



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java
##
@@ -0,0 +1,32 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** knn fit parameters. */
+public interface KnnParams
+extends WithParams,
+HasVectorColDefaultAsNull,
+HasLabelCol,
+HasFeatureColsDefaultAsNull,
+HasPredictionCol {
+/**
+ * @cn-name topK
+ * @cn topK
+ */
+Param K = new IntParam("k", "k", 10, ParamValidators.gt(0));

Review comment:
   done








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755684156



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java
##
@@ -0,0 +1,32 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** knn fit parameters. */
+public interface KnnParams
+extends WithParams,
+HasVectorColDefaultAsNull,
+HasLabelCol,
+HasFeatureColsDefaultAsNull,
+HasPredictionCol {
+/**
+ * @cn-name topK
+ * @cn topK
+ */
+Param K = new IntParam("k", "k", 10, ParamValidators.gt(0));
+
+default Integer getK() {

Review comment:
   done








[jira] [Updated] (FLINK-24861) Support to disable caching missing key for lookup cache

2021-11-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-24861:

Summary: Support to disable caching missing key for lookup cache   (was: 
Flink MySQL Look cache update for empty hit)

> Support to disable caching missing key for lookup cache 
> 
>
> Key: FLINK-24861
> URL: https://issues.apache.org/jira/browse/FLINK-24861
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.14.0
>Reporter: Gaurav Miglani
>Assignee: Gaurav Miglani
>Priority: Major
>  Labels: pull-request-available
>
> Ideally, when a lookup for a key finds no rows, or the fetched value is null, 
> the key should not be cached.





[GitHub] [flink] wuchong merged pull request #17754: [FLINK-24861][connector][jdbc] Fix false cache lookup for empty data

2021-11-23 Thread GitBox


wuchong merged pull request #17754:
URL: https://github.com/apache/flink/pull/17754


   






[jira] [Commented] (FLINK-25028) java.lang.OutOfMemoryError: Java heap space

2021-11-23 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448347#comment-17448347
 ] 

Zhilong Hong commented on FLINK-25028:
--

It seems that an OOM happens on the TaskExecutor. An OOM can have many 
different causes. You could run the {{jmap}} command in the pod/container to 
take a snapshot of that TaskExecutor's heap memory. Then you could use 
the {{jhat}} command to analyze the snapshot and find out what occupies the 
largest part of the heap. For more information, please see [the 
official doc of 
jmap|https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr014.html].
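
If running {{jmap}} inside the container is inconvenient, a heap snapshot in the same format can also be written from inside a HotSpot JVM. The sketch below is only an illustration of that alternative: the class name and the temp-directory target are assumptions, and it dumps the heap of the JVM it runs in, so it would have to be triggered within the TaskExecutor process to be useful here.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that writes an .hprof heap dump of the current JVM. */
final class HeapDumpSketch {

    static Path dumpHeap(Path directory) throws IOException {
        // The target file must not exist yet; newer JDKs also expect the .hprof suffix.
        Path target = directory.resolve("heap-" + System.nanoTime() + ".hprof");
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // 'true' dumps only live (reachable) objects.
        bean.dumpHeap(target.toString(), true);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path directory = Files.createTempDirectory("heapdump");
        Path dump = dumpHeap(directory);
        System.out.println(dump + " (" + Files.size(dump) + " bytes)");
    }
}
```

The resulting file can be opened with the same analysis tools as a {{jmap}} dump.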

> java.lang.OutOfMemoryError: Java heap space
> ---
>
> Key: FLINK-25028
> URL: https://issues.apache.org/jira/browse/FLINK-25028
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0
>Reporter: wangbaohua
>Priority: Blocker
> Attachments: error.txt
>
>
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.HashMap.resize(HashMap.java:703) ~[?:1.8.0_131]
>   at java.util.HashMap.putVal(HashMap.java:628) ~[?:1.8.0_131]
>   at java.util.HashMap.put(HashMap.java:611) ~[?:1.8.0_131]
>   at java.util.HashSet.add(HashSet.java:219) ~[?:1.8.0_131]
>   at 
> java.io.ObjectStreamClass$FieldReflector.(ObjectStreamClass.java:1945) 
> ~[?:1.8.0_131]
>   at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2193) 
> ~[?:1.8.0_131]
>   at java.io.ObjectStreamClass.(ObjectStreamClass.java:521) 
> ~[?:1.8.0_131]
>   at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) 
> ~[?:1.8.0_131]
>   at java.io.ObjectStreamClass.(ObjectStreamClass.java:468) 
> ~[?:1.8.0_131]
>   at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) 
> ~[?:1.8.0_131]
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
> ~[?:1.8.0_131]
>   at java.io.ObjectOutputStream.access$300(ObjectOutputStream.java:162) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream$PutFieldImpl.writeFields(ObjectOutputStream.java:1707)
>  ~[?:1.8.0_131]
>   at java.io.ObjectOutputStream.writeFields(ObjectOutputStream.java:482) 
> ~[?:1.8.0_131]
>   at 
> java.util.concurrent.ConcurrentHashMap.writeObject(ConcurrentHashMap.java:1406)
>  ~[?:1.8.0_131]
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_131]
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_131]
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_131]
>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_131]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_131]
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> ~[?:1.8.0_131]
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>   at 
> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:51)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:54)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$retrievePayload$3(TaskExecutor.java:2425)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener$$Lambda$1020/78782846.apply(Unknown
>  Source) ~[?:?]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755682429



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map<Param<?>, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map<Param<?>, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map<String, DataStream<?>> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755682171



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";

Review comment:
   done
   

##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678624



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";

Review comment:
   OK, I will refine it later.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678460



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map<Param<?>, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map<Param<?>, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map<String, DataStream<?>> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678350



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map<Param<?>, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map<Param<?>, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map<String, DataStream<?>> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755677909



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java
##
@@ -0,0 +1,594 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.core.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.linalg.DenseMatrix;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Knn classification model fitted by estimator. */
+public class KnnModel implements Model, KnnParams {
+
+private static final long serialVersionUID = 1303892137143865652L;
+
+private static final String BROADCAST_STR = "broadcastModelKey";
+private static final int FASTDISTANCE_TYPE_INDEX = 0;
+private static final int DATA_INDEX = 1;
+
+protected Map<Param<?>, Object> params = new HashMap<>();
+
+private Table[] modelData;
+
+/** constructor. */
+public KnnModel() {
+ParamUtils.initializeMapWithDefaultValues(params, this);
+}
+
+/**
+ * constructor.
+ *
+ * @param params parameters for algorithm.
+ */
+public KnnModel(Map<Param<?>, Object> params) {
+this.params = params;
+}
+
+/**
+ * Set model data for knn prediction.
+ *
+ * @param modelData knn model.
+ * @return knn classification model.
+ */
+@Override
+public KnnModel setModelData(Table... modelData) {
+this.modelData = modelData;
+return this;
+}
+
+/**
+ * get model data.
+ *
+ * @return list of tables.
+ */
+@Override
+public Table[] getModelData() {
+return modelData;
+}
+
+/**
+ * @param inputs a list of tables.
+ * @return result.
+ */
+@Override
+public Table[] transform(Table... inputs) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream input = tEnv.toDataStream(inputs[0]);
+DataStream model = tEnv.toDataStream(modelData[0]);
+
+Map<String, DataStream<?>> broadcastMap = new HashMap<>(1);
+broadcastMap.put(BROADCAST_STR, model);
+ResolvedSchema modelSchema = modelData[0].getResolvedSchema();
+DataType idType =
+
modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1);
+
+ResolvedSchema outputSchema =
+getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), 
idType);
+
+DataType[] dataTypes = 

[jira] [Commented] (FLINK-24763) ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure

2021-11-23 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448343#comment-17448343
 ] 

Jingsong Lee commented on FLINK-24763:
--

Fixed in master: a5477938952863fb516a1a14d67808ecda047134

[~trohrmann] I merged the fix first, but I am not fully certain that the fix 
is complete, so we will keep watching.
If the problem still recurs, I will disable the test for the time being and 
create a follow-up blocker ticket to re-enable it.

> ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure
> ---
>
> Key: FLINK-24763
> URL: https://issues.apache.org/jira/browse/FLINK-24763
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
> ORC, SequenceFile)
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Jingsong Lee
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0
>
>
> The test {{ParquetFileSystemITCase.testLimitableBulkFormat}} fails with 
> {code}
> 2021-11-03T22:10:11.5106075Z Nov 03 22:10:11 [ERROR] 
> testLimitableBulkFormat[false]  Time elapsed: 9.177 s  <<< ERROR!
> 2021-11-03T22:10:11.5106643Z Nov 03 22:10:11 java.lang.RuntimeException: 
> Failed to fetch next result
> 2021-11-03T22:10:11.5107213Z Nov 03 22:10:11  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-11-03T22:10:11.5111034Z Nov 03 22:10:11  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-11-03T22:10:11.5112190Z Nov 03 22:10:11  at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:188)
> 2021-11-03T22:10:11.5112892Z Nov 03 22:10:11  at 
> java.util.Iterator.forEachRemaining(Iterator.java:115)
> 2021-11-03T22:10:11.5113393Z Nov 03 22:10:11  at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
> 2021-11-03T22:10:11.5114157Z Nov 03 22:10:11  at 
> org.apache.flink.formats.parquet.ParquetFileSystemITCase.testLimitableBulkFormat(ParquetFileSystemITCase.java:128)
> 2021-11-03T22:10:11.5114951Z Nov 03 22:10:11  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-11-03T22:10:11.5115568Z Nov 03 22:10:11  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-11-03T22:10:11.5116115Z Nov 03 22:10:11  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-11-03T22:10:11.5116591Z Nov 03 22:10:11  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-11-03T22:10:11.5117088Z Nov 03 22:10:11  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-11-03T22:10:11.5117807Z Nov 03 22:10:11  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-11-03T22:10:11.5118821Z Nov 03 22:10:11  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-11-03T22:10:11.5119417Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-11-03T22:10:11.5119944Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-11-03T22:10:11.5120427Z Nov 03 22:10:11  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-11-03T22:10:11.5120919Z Nov 03 22:10:11  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2021-11-03T22:10:11.5121571Z Nov 03 22:10:11  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-11-03T22:10:11.5122526Z Nov 03 22:10:11  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-11-03T22:10:11.5123245Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-11-03T22:10:11.5123804Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-11-03T22:10:11.5124314Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-11-03T22:10:11.5124806Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-11-03T22:10:11.5125313Z Nov 03 22:10:11  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-11-03T22:10:11.5125810Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-11-03T22:10:11.5126281Z Nov 03 22:10:11  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 

[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755677169



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasFeatureColsDefaultAsNull.java
##
@@ -0,0 +1,28 @@
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Params of the names of the feature columns used for training in the input table. */
+public interface HasFeatureColsDefaultAsNull extends WithParams {
+/**
+ * @cn-name feature column name array
+ * @cn feature column name array; all columns are selected by default
+ */
+Param FEATURE_COLS =

Review comment:
   OK, I will refine it later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755676992



##
File path: 
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java
##
@@ -0,0 +1,172 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.api.core.Pipeline;
+import org.apache.flink.ml.api.core.Stage;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** knn algorithm test. */
+public class KnnTest {
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+private Table trainData;
+
+List trainArray =
+new ArrayList<>(
+Arrays.asList(
+Row.of("f", "2.0 3.0"),
+Row.of("f", "2.1 3.1"),
+Row.of("m", "200.1 300.1"),
+Row.of("m", "200.2 300.2"),
+Row.of("m", "200.3 300.3"),
+Row.of("m", "200.4 300.4"),
+Row.of("m", "200.4 300.4"),
+Row.of("m", "200.6 300.6"),
+Row.of("f", "2.1 3.1"),
+Row.of("f", "2.1 3.1"),
+Row.of("f", "2.1 3.1"),
+Row.of("f", "2.1 3.1"),
+Row.of("f", "2.3 3.2"),
+Row.of("f", "2.3 3.2"),
+Row.of("c", "2.8 3.2"),
+Row.of("d", "300. 3.2"),
+Row.of("f", "2.2 3.2"),
+Row.of("e", "2.4 3.2"),
+Row.of("e", "2.5 3.2"),
+Row.of("e", "2.5 3.2"),
+Row.of("f", "2.1 3.1")));
+
+List testArray =
+new ArrayList<>(Arrays.asList(Row.of("e", "4.0 4.1"), Row.of("m", "300 42")));
+
+private Table testData;
+

Review comment:
   OK, I will do it later.








[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml

2021-11-23 Thread GitBox


weibozhao commented on a change in pull request #24:
URL: https://github.com/apache/flink-ml/pull/24#discussion_r755676909



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java
##
@@ -0,0 +1,32 @@
+package org.apache.flink.ml.classification.knn;
+
+import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasPredictionCol;
+import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** knn fit parameters. */
+public interface KnnParams
+extends WithParams,
+HasVectorColDefaultAsNull,
+HasLabelCol,
+HasFeatureColsDefaultAsNull,
+HasPredictionCol {
+/**
+ * @cn-name topK
+ * @cn topK
+ */
+Param K = new IntParam("k", "k", 10, ParamValidators.gt(0));
+
+default Integer getK() {

Review comment:
   K can be obtained in two ways: getK() or params.get(KnnParams.K).
   There is no problem with the algorithm's implementation. I currently use the 
second pattern; I may need to change the pattern later.
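
   To illustrate the two access patterns mentioned above, here is a minimal, 
self-contained sketch. It does not use the real flink-ml classes: `Param`, 
`WithParams`, `KnnParams`, and `Knn` below are simplified, hypothetical 
stand-ins written only for this example, and the parameter map and `set` 
method are assumptions. It only shows that a typed getter and a generic 
lookup read the same underlying value.

```java
import java.util.HashMap;
import java.util.Map;

public class KnnParamAccessSketch {

    /** Hypothetical stand-in for a typed parameter definition with a default value. */
    static final class Param<T> {
        final String name;
        final T defaultValue;

        Param(String name, T defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical stand-in for an object that stores parameter values in a map. */
    interface WithParams {
        Map<Param<?>, Object> getParamMap();

        /** Generic lookup: returns the stored value, or the default if none was set. */
        @SuppressWarnings("unchecked")
        default <T> T get(Param<T> param) {
            Object value = getParamMap().get(param);
            return value == null ? param.defaultValue : (T) value;
        }

        default <T> void set(Param<T> param, T value) {
            getParamMap().put(param, value);
        }
    }

    /** Parameter interface that adds a typed convenience getter on top of get(). */
    interface KnnParams extends WithParams {
        Param<Integer> K = new Param<>("k", 10);

        default Integer getK() {
            // The typed getter simply delegates to the generic lookup.
            return get(K);
        }
    }

    /** Minimal holder that backs the parameters with a HashMap. */
    static final class Knn implements KnnParams {
        private final Map<Param<?>, Object> paramMap = new HashMap<>();

        @Override
        public Map<Param<?>, Object> getParamMap() {
            return paramMap;
        }
    }

    public static void main(String[] args) {
        Knn knn = new Knn();
        knn.set(KnnParams.K, 3);
        // Pattern 1: typed getter. Pattern 2: generic lookup by parameter.
        System.out.println(knn.getK() + " " + knn.get(KnnParams.K));
    }
}
```

   Because the getter delegates to the generic lookup in this sketch, the two 
patterns cannot disagree; the typed getter mainly saves callers from 
referencing the parameter constant directly.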








[GitHub] [flink] JingsongLi merged pull request #17792: [FLINK-24763][fs-connector] LimitableReader should swallow exception when reached limit

2021-11-23 Thread GitBox


JingsongLi merged pull request #17792:
URL: https://github.com/apache/flink/pull/17792


   





