[GitHub] [flink] dannycranmer commented on a change in pull request #17785: [FLINK-24580][Connectors/Kinesis] Make ConnectTimeoutException recoverable
dannycranmer commented on a change in pull request #17785:
URL: https://github.com/apache/flink/pull/17785#discussion_r755780760

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ##
@@ -420,6 +421,8 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
     protected boolean isRecoverableSdkClientException(SdkClientException ex) {
         if (ex instanceof AmazonServiceException) {
             return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+        } else if (ex.getCause() instanceof ConnectTimeoutException) {

Review comment:
Yes, I agree, either approach is fine. I will wait for an update and merge once the tests pass.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
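The check discussed in this review can be illustrated in isolation. The sketch below is not the connector's code: the three exception classes are hypothetical stand-ins for the AWS SDK and Apache HttpClient types named in the diff, the `retryable` flag stands in for whatever `KinesisProxy.isRecoverableException` decides, and returning `true` for a `ConnectTimeoutException` cause is an assumption based only on the pull request title.

```java
import java.io.IOException;

final class RecoverableCheckSketch {
    // Hypothetical stand-in for the SDK's client-side exception type.
    static class SdkClientException extends RuntimeException {
        SdkClientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a service-side exception; "retryable" is an assumed flag.
    static class AmazonServiceException extends SdkClientException {
        final boolean retryable;

        AmazonServiceException(String message, boolean retryable) {
            super(message, null);
            this.retryable = retryable;
        }
    }

    // Hypothetical stand-in for the HTTP client's connect timeout exception.
    static class ConnectTimeoutException extends IOException {
        ConnectTimeoutException(String message) {
            super(message);
        }
    }

    // Service exceptions are judged by their own flag; other client exceptions
    // are treated as recoverable only when caused by a connect timeout (assumption).
    static boolean isRecoverable(SdkClientException ex) {
        if (ex instanceof AmazonServiceException) {
            return ((AmazonServiceException) ex).retryable;
        } else if (ex.getCause() instanceof ConnectTimeoutException) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SdkClientException timeout =
                new SdkClientException("connect failed", new ConnectTimeoutException("timed out"));
        System.out.println("connect timeout recoverable: " + isRecoverable(timeout));
    }
}
```

Inspecting `getCause()` rather than the exception type itself reflects that the timeout arrives wrapped in the SDK's client exception.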
[GitHub] [flink] AHeise merged pull request #17848: [FLINK-24583] [connectors/elasticsearch] Improve test stability by blocking until ES has acknowledged all records
AHeise merged pull request #17848:
URL: https://github.com/apache/flink/pull/17848
[jira] [Commented] (FLINK-24592) FlinkSQL multiline parser improvements
[ https://issues.apache.org/jira/browse/FLINK-24592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448419#comment-17448419 ]

Sergey Nuyanzin commented on FLINK-24592:
-----------------------------------------

[~jark] thanks for the assignment. Could you or any of the watchers please help with the review?

> FlinkSQL multiline parser improvements
> --------------------------------------
>
>                 Key: FLINK-24592
>                 URL: https://issues.apache.org/jira/browse/FLINK-24592
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Client
>            Reporter: Sergey Nuyanzin
>            Assignee: Sergey Nuyanzin
>            Priority: Major
>              Labels: pull-request-available
>
> The existing multiline parser has limitations: for example, a line cannot end with a semicolon that is part of a field value, a comment or a column name.
> Also, if a query contains '--', e.g. as part of a varchar field value, it fails.
> If there are no objections, I would like to put some effort into improving this behavior.
> Here is a list of sample problem queries:
> {code:sql}
> select 123; -- comment
> select 1 as `1--`;
> select '--';
> -- This query works if a user copy-pastes it to FlinkSQL, however it fails if a user types it in FlinkSQL
> select '1;
> ';
> {code}

--
This message was sent by Atlassian Jira (v8.20.1#820001)
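The sample queries in this issue suggest that a splitter has to know whether it is inside a quoted literal, a quoted identifier or a comment before it treats `;` or `--` as syntax. A minimal Java sketch of that idea follows. The names `SqlStatementSplitterSketch` and `split` are hypothetical, the sketch drops line comments from the output, and it is not the Flink SQL Client's actual parser.

```java
import java.util.ArrayList;
import java.util.List;

final class SqlStatementSplitterSketch {
    private enum State { NORMAL, SINGLE_QUOTED, BACKTICK_QUOTED, LINE_COMMENT }

    // Splits a script into statements on ';', ignoring ';' and '--' that occur
    // inside '...' literals or `...` identifiers. Line comments are dropped.
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        State state = State.NORMAL;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            switch (state) {
                case NORMAL:
                    if (c == '\'') {
                        state = State.SINGLE_QUOTED;
                        current.append(c);
                    } else if (c == '`') {
                        state = State.BACKTICK_QUOTED;
                        current.append(c);
                    } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                        state = State.LINE_COMMENT;
                        i++; // skip the second dash
                    } else if (c == ';') {
                        flush(statements, current);
                    } else {
                        current.append(c);
                    }
                    break;
                case SINGLE_QUOTED:
                    // A doubled quote ('') closes and immediately reopens the literal,
                    // which is sufficient for splitting purposes.
                    current.append(c);
                    if (c == '\'') {
                        state = State.NORMAL;
                    }
                    break;
                case BACKTICK_QUOTED:
                    current.append(c);
                    if (c == '`') {
                        state = State.NORMAL;
                    }
                    break;
                case LINE_COMMENT:
                    if (c == '\n') {
                        state = State.NORMAL;
                        current.append(c);
                    }
                    break;
            }
        }
        flush(statements, current);
        return statements;
    }

    // Adds the trimmed buffer as a statement if it is not blank, then clears the buffer.
    private static void flush(List<String> statements, StringBuilder current) {
        String statement = current.toString().trim();
        if (!statement.isEmpty()) {
            statements.add(statement);
        }
        current.setLength(0);
    }

    public static void main(String[] args) {
        String script = "select 123; -- comment\nselect 1 as `1--`;\nselect '--';\nselect '1;\n';";
        for (String statement : split(script)) {
            System.out.println("[" + statement + "]");
        }
    }
}
```

Keeping an explicit state per character is what lets the multi-line literal `'1;` followed by `'` survive as one statement, whether the text is pasted at once or arrives line by line and is re-split after each line.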
[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug
flinkbot edited a comment on pull request #17822:
URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959

## CI report:

* 1592494d4260563af4f170fb1cd45ccec9e5f0cb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26978)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448416#comment-17448416 ]

Fabian Paul commented on FLINK-16419:
-------------------------------------

The error still seems very strange to me, because every time the KafkaProducer opens a new transaction by calling `initTransactions()`, a new producerId is assigned. If the producerId hit the timeout, it means that no checkpoint has happened for the timeout duration of 24 days.

What is your checkpoint duration and the configured number of concurrent checkpoints? Can you also share the full stack trace of the exception and say when exactly the error occurs?

> Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions which are in the snapshot, even if those transactions were successfully committed before (i.e., the call to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} returns OK). This may lead to recovery failures when recovering from a very old snapshot, because the transactional IDs in that snapshot may have expired and been removed from Kafka. For example, consider the following scenario:
> # Start a Flink job with a FlinkKafkaProducer sink with exactly-once
> # Suspend the Flink job with a savepoint A
> # Wait for longer than {{transactional.id.expiration.ms}} + {{transaction.remove.expired.transaction.cleanup.interval.ms}}
> # Recover the job with savepoint A.
> # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as it may hide real transaction timeout errors.
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible way is to let the JobManager, after it successfully notifies all operators of the completion of a snapshot (via {{notifyCheckpointComplete}}), record the success, e.g., write the successful transactional IDs somewhere in the snapshot. Then those transactions need not be recommitted upon recovery.
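The proposal in the issue description amounts to filtering the pre-committed transactions found in a snapshot against a record of the ones already known to be committed. A minimal Java sketch of that filtering step, under stated assumptions: the class and method names are hypothetical, transactional IDs are modelled as plain strings, and how the set of known-committed IDs would be stored in the snapshot is left open, as it is in the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class RecommitFilterSketch {
    // Returns the transactional IDs from the snapshot that still need a recommit,
    // i.e. those that were not recorded as successfully committed.
    static List<String> transactionsToRecommit(
            List<String> preCommittedInSnapshot, Set<String> knownCommitted) {
        List<String> pending = new ArrayList<>();
        for (String transactionalId : preCommittedInSnapshot) {
            if (!knownCommitted.contains(transactionalId)) {
                pending.add(transactionalId);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<String> snapshot = List.of("txn-1", "txn-2", "txn-3");
        Set<String> committed = Set.of("txn-1", "txn-2");
        System.out.println("to recommit: " + transactionsToRecommit(snapshot, committed));
    }
}
```

With such a filter, recovering from an old savepoint would only contact Kafka for transactions whose outcome is unknown, so expired transactional IDs that were already committed would no longer trigger the failure shown in the log.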
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755766209 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModel.java ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.BLAS; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** A Model which classifies data using the model data 
computed by {@link NaiveBayes}. */ +public class NaiveBayesModel +implements Model, NaiveBayesModelParams { +private static final long serialVersionUID = -4673084154965905629L; +private final Map, Object> paramMap = new HashMap<>(); +private Table modelTable; + +public NaiveBayesModel() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public Table[] transform(Table... inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String predictionCol = getPredictionCol(); +final String featuresCol = getFeaturesCol(); +final String broadcastModelKey = "NaiveBayesModelStream"; + +RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema()); +RowTypeInfo outputTypeInfo = +new RowTypeInfo( +ArrayUtils.addAll( +inputTypeInfo.getFieldTypes(), TypeInformation.of(Object.class)), +ArrayUtils.addAll(inputTypeInfo.getFieldNames(), predictionCol)); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) modelTable).getTableEnvironment(); +DataStream modelStream = +NaiveBayesModelData.toDataStream(tEnv, modelTable); +DataStream input = tEnv.toDataStream(inputs[0]); + +Map> broadcastMap = new HashMap<>(); +broadcastMap.put(broadcastModelKey, modelStream); + +Function>, DataStream> function = +dataStreams -> { +DataStream stream = dataStreams.get(0); +return stream.transform( +this.getClass().getSimpleName(), +outputTypeInfo, +new PredictLabelOperator( +new
[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174
HuangXingBo commented on a change in pull request #36:
URL: https://github.com/apache/flink-ml/pull/36#discussion_r755771519

## File path: flink-ml-python/README.md ##
@@ -0,0 +1,20 @@
+Flink ML is a library which provides machine learning (ML) APIs and libraries that simplify the building of machine learning pipelines. It provides a set of standard ML APIs for MLlib developers to implement ML algorithms, as well as libraries of ML algorithms that can be used to build ML pipelines for both training and inference jobs.
+
+Flink ML is developed under the umbrella of [Apache Flink](https://flink.apache.org/).
+
+## Python Packaging
+
+Prerequisites for building apache-flink-ml:
+
+* Unix-like environment (we use Linux, Mac OS X)
+* Python version(3.6, 3.7 or 3.8) is required
+
+Then go to the root directory of flink-ml-python source code and run this command to build the sdist package of apache-flink-ml:
+```bash
+cd flink-ml-python; python setup.py sdist;
+```
+
+The sdist package of apache-flink-ml will be found under ./flink-ml-python/dist/. It could be used for installation, such as:
+```bash
+python -m pip install dist/*.tar.gz

Review comment:
`README.md` is for users, not for developers, so I don't think it is appropriate to describe how to run the tests in `README.md`. As for adding Python CI, I intend to do it in a separate PR. Thanks for the reminder.
[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448413#comment-17448413 ]

Yun Tang commented on FLINK-15571:
----------------------------------

[~monster#12] Already assigned to you.

> Create a Redis Streams Connector for Flink
> ------------------------------------------
>
>                 Key: FLINK-15571
>                 URL: https://issues.apache.org/jira/browse/FLINK-15571
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Common
>            Reporter: Tugdual Grall
>            Assignee: ZhuoYu Chen
>            Priority: Minor
>              Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to integrate Redis Streams and Apache Flink as:
> * Source
> * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
[jira] [Updated] (FLINK-24861) Support to disable caching missing key for lookup cache
[ https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-24861:
-----------------------------------
    Release Note: You can now configure the JDBC connector to enable or disable caching results for keys which don't exist in the database. This is enabled by default.

> Support to disable caching missing key for lookup cache
> -------------------------------------------------------
>
>                 Key: FLINK-24861
>                 URL: https://issues.apache.org/jira/browse/FLINK-24861
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / JDBC
>    Affects Versions: 1.14.0
>            Reporter: Gaurav Miglani
>            Assignee: Gaurav Miglani
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
> Ideally, when a lookup misses for a key, or the value fetched for the key is null, the key should not be cached.
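The behavior described in this release note can be pictured as a lookup cache with a switch that decides whether a miss is remembered. The Java sketch below illustrates that idea only: `LookupCacheSketch`, its `cacheMissingKey` flag and the loader function are hypothetical names, and the sketch is not the JDBC connector's implementation or its configuration option.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class LookupCacheSketch<K, V> {
    private final Map<K, Optional<V>> cache = new HashMap<>();
    private final Function<K, V> loader; // assumed to return null when the key does not exist
    private final boolean cacheMissingKey;
    private int loads;

    LookupCacheSketch(Function<K, V> loader, boolean cacheMissingKey) {
        this.loader = loader;
        this.cacheMissingKey = cacheMissingKey;
    }

    // Returns the cached or freshly loaded value, or null if the key does not exist.
    V lookup(K key) {
        Optional<V> cached = cache.get(key);
        if (cached != null) {
            return cached.orElse(null); // an empty Optional marks a remembered miss
        }
        loads++;
        V value = loader.apply(key);
        if (value != null || cacheMissingKey) {
            cache.put(key, Optional.ofNullable(value));
        }
        return value;
    }

    // Number of times the backing loader was actually called.
    int loadCount() {
        return loads;
    }

    public static void main(String[] args) {
        LookupCacheSketch<String, String> cache = new LookupCacheSketch<>(key -> null, false);
        cache.lookup("missing");
        cache.lookup("missing");
        System.out.println("loads without caching misses: " + cache.loadCount());
    }
}
```

Remembering misses saves repeated database round trips for keys that never appear, while disabling it lets a row inserted later become visible on the next lookup. That trade-off is why a switch is useful.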
[GitHub] [flink] flinkbot edited a comment on pull request #17715: [FLINK-24820][table][docs] Examples in documentation for value1 IS DISTINCT …
flinkbot edited a comment on pull request #17715:
URL: https://github.com/apache/flink/pull/17715#issuecomment-962987530

## CI report:

* 863b790976db36e3ddb858f118e964f9ca48927f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26128)
[jira] [Assigned] (FLINK-15571) Create a Redis Streams Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reassigned FLINK-15571:
--------------------------------

    Assignee: ZhuoYu Chen

> Create a Redis Streams Connector for Flink
> ------------------------------------------
>
>                 Key: FLINK-15571
>                 URL: https://issues.apache.org/jira/browse/FLINK-15571
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Common
>            Reporter: Tugdual Grall
>            Assignee: ZhuoYu Chen
>            Priority: Minor
>              Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to integrate Redis Streams and Apache Flink as:
> * Source
> * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
[GitHub] [flink] pnowojski commented on a change in pull request #16582: [FLINK-21504][checkpoint] Introduce notification of subsumed checkpoint
pnowojski commented on a change in pull request #16582: URL: https://github.com/apache/flink/pull/16582#discussion_r755767863 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1405,59 +1406,80 @@ private void declineCheckpoint( } public void notifyCheckpointComplete(final long checkpointID) { -final TaskInvokable invokable = this.invokable; - -if (executionState == ExecutionState.RUNNING) { -checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable"); -try { -((CheckpointableTask) invokable).notifyCheckpointCompleteAsync(checkpointID); -} catch (RejectedExecutionException ex) { -// This may happen if the mailbox is closed. It means that the task is shutting -// down, so we just ignore it. -LOG.debug( -"Notify checkpoint complete {} for {} ({}) was rejected by the mailbox", -checkpointID, -taskNameWithSubtask, -executionId); -} catch (Throwable t) { -if (getExecutionState() == ExecutionState.RUNNING) { -// fail task if checkpoint confirmation failed. 
-failExternally(new RuntimeException("Error while confirming checkpoint", t)); -} -} -} else { -LOG.debug( -"Ignoring checkpoint commit notification for non-running task {}.", -taskNameWithSubtask); -} +notifyCheckpoint( +checkpointID, +CheckpointStoreUtil.INVALID_CHECKPOINT_ID, +NotifyCheckpointOperation.COMPLETE); } public void notifyCheckpointAborted( final long checkpointID, final long latestCompletedCheckpointId) { -final TaskInvokable invokable = this.invokable; +notifyCheckpoint( +checkpointID, latestCompletedCheckpointId, NotifyCheckpointOperation.ABORT); +} -if (executionState == ExecutionState.RUNNING) { +public void notifyCheckpointSubsumed(long checkpointID) { +notifyCheckpoint( +checkpointID, +CheckpointStoreUtil.INVALID_CHECKPOINT_ID, +NotifyCheckpointOperation.SUBSUME); +} + +private void notifyCheckpoint( +long checkpointId, +long latestCompletedCheckpointId, +NotifyCheckpointOperation notifyCheckpointOperation) { +TaskInvokable invokable = this.invokable; + +if (executionState == ExecutionState.RUNNING && invokable != null) { checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable"); try { -((CheckpointableTask) invokable) -.notifyCheckpointAbortAsync(checkpointID, latestCompletedCheckpointId); +switch (notifyCheckpointOperation) { +case ABORT: +((CheckpointableTask) invokable) +.notifyCheckpointAbortAsync( +checkpointId, latestCompletedCheckpointId); +break; +case COMPLETE: +((CheckpointableTask) invokable) +.notifyCheckpointCompleteAsync(checkpointId); +break; +case SUBSUME: +((CheckpointableTask) invokable) +.notifyCheckpointSubsumedAsync(checkpointId); +} } catch (RejectedExecutionException ex) { // This may happen if the mailbox is closed. It means that the task is shutting // down, so we just ignore it. 
LOG.debug( -"Notify checkpoint abort {} for {} ({}) was rejected by the mailbox", -checkpointID, +"Notify checkpoint {}} {} for {} ({}) was rejected by the mailbox.", +notifyCheckpointOperation, +checkpointId, taskNameWithSubtask, executionId); } catch (Throwable t) { -if (getExecutionState() == ExecutionState.RUNNING) { -// fail task if checkpoint aborted notification failed. -failExternally(new RuntimeException("Error while aborting checkpoint", t)); +switch (notifyCheckpointOperation) { +case ABORT: +case COMPLETE: +if (getExecutionState() == ExecutionState.RUNNING) { +failExternally( +new RuntimeException( +
[GitHub] [flink] flinkbot edited a comment on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…
flinkbot edited a comment on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977556918

## CI report:

* 9f73f2713085e557107f1da2b68e77721721eb4d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26980)
[GitHub] [flink] snuyanzin commented on pull request #17715: [FLINK-24820][table][docs] Examples in documentation for value1 IS DISTINCT …
snuyanzin commented on pull request #17715:
URL: https://github.com/apache/flink/pull/17715#issuecomment-977606105

@flinkbot run azure
[jira] [Commented] (FLINK-25029) Hadoop Caller Context Setting In Flink
[ https://issues.apache.org/jira/browse/FLINK-25029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448409#comment-17448409 ]

Jingsong Lee commented on FLINK-25029:
--------------------------------------

[~liufangqi] Did you mean the FileSystem API in Flink?

> Hadoop Caller Context Setting In Flink
> --------------------------------------
>
>                 Key: FLINK-25029
>                 URL: https://issues.apache.org/jira/browse/FLINK-25029
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: 刘方奇
>            Priority: Major
>
> For a given HDFS operation (e.g. delete file), it's very helpful to track which upper level job issues it. The upper level callers may be specific Oozie tasks, MR jobs, and hive queries. One scenario is that the namenode (NN) is abused/spammed, the operator may want to know immediately which MR job should be blamed so that she can kill it. To this end, the caller context contains at least the application-dependent "tracking id".
> The above is the main purpose of the caller context. The HDFS client sets the caller context, and the NameNode then records it in its audit log for later analysis.
> Spark and Hive already set the caller context to meet the HDFS job audit requirement.
> In my company, Flink jobs often cause problems for HDFS, so we implemented this to prevent some of those cases.
> If the feature is general enough, should we support it? If so, I can submit a PR for this.
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755759816 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * An Estimator which implements the naive bayes classification algorithm. + * + * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier. + */ +public class NaiveBayes +implements Estimator, NaiveBayesParams { +private final Map, Object> paramMap = new HashMap<>(); + +public NaiveBayes() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public NaiveBayesModel fit(Table... 
inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String featuresCol = getFeaturesCol(); +final String labelCol = getLabelCol(); +final double smoothing = getSmoothing(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream> input = +tEnv.toDataStream(inputs[0]) +.map( +new MapFunction>() { +@Override +public Tuple2 map(Row row) throws Exception { +return new Tuple2<>( +(Vector) row.getField(featuresCol), +(Double) row.getField(labelCol)); +} +}); + +DataStream naiveBayesModel = +input.flatMap(new FlattenFunction()) +.keyBy( +(KeySelector, Object>) +value -> new Tuple3<>(value.f0, value.f1, value.f2)) +.window(EndOfStreamWindows.get()) +.reduce( +(ReduceFunction>) +(t0, t1) -> { +t0.f3 += t1.f3; +return t0; +}) +.keyBy( +(KeySelector, Object>) +value -> new Tuple2<>(value.f0, value.f1)) +.window(EndOfStreamWindows.get()) +.aggregate(new ValueMapFunction()) +.keyBy( +
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755759161 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * An Estimator which implements the naive bayes classification algorithm. + * + * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier. + */ +public class NaiveBayes +implements Estimator, NaiveBayesParams { +private final Map, Object> paramMap = new HashMap<>(); + +public NaiveBayes() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public NaiveBayesModel fit(Table... 
inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String featuresCol = getFeaturesCol(); +final String labelCol = getLabelCol(); +final double smoothing = getSmoothing(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream> input = +tEnv.toDataStream(inputs[0]) +.map( +new MapFunction>() { +@Override +public Tuple2 map(Row row) throws Exception { +return new Tuple2<>( +(Vector) row.getField(featuresCol), +(Double) row.getField(labelCol)); +} +}); + +DataStream naiveBayesModel = +input.flatMap(new FlattenFunction()) +.keyBy( +(KeySelector, Object>) +value -> new Tuple3<>(value.f0, value.f1, value.f2)) +.window(EndOfStreamWindows.get()) +.reduce( +(ReduceFunction>) +(t0, t1) -> { +t0.f3 += t1.f3; +return t0; +}) +.keyBy( +(KeySelector, Object>) +value -> new Tuple2<>(value.f0, value.f1)) +.window(EndOfStreamWindows.get()) +.aggregate(new ValueMapFunction()) +.keyBy( +
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755751841 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModelData.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Map; + +/** The model data of {@link NaiveBayesModel}. Also provides classes to save/load model data. */ +public class NaiveBayesModelData implements Serializable { +private static final long serialVersionUID = 3919917903722286395L; +public final Map[][] theta; +public final double[] piArray; +public final double[] labels; + +// Empty constructor is used when Kyro deserializes loaded model data. +public NaiveBayesModelData() { +this(null, null, null); +} + +public NaiveBayesModelData(Map[][] theta, double[] piArray, double[] labels) { +this.theta = theta; +this.piArray = piArray; +this.labels = labels; +} + +public static Table fromDataStream( Review comment: Sure. I'll make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on pull request #17789: [FLINK-24351][docs] Translate "JSON Function" pages into Chinese
RocMarshal commented on pull request #17789: URL: https://github.com/apache/flink/pull/17789#issuecomment-977590515 @MonsterChenzhuo Thanks for the update. Could you please check the CI result?
[GitHub] [flink] flinkbot edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
flinkbot edited a comment on pull request #17842: URL: https://github.com/apache/flink/pull/17842#issuecomment-974310249 ## CI report: * 45ac29fdd23a851dfcf47e70d9db5710a19d2af8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26915) * bbc64e8c2730b42b8851683e494c5ddee017de05 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26982) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755747903 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasLabelCol.java ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** + * Param of the name of the label column in the input table. Review comment: OK. I'll fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755747666 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/util/StageTestUtils.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.ml.api.core.Stage; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.lang.reflect.Method; +import java.nio.file.Files; + +/** Utility methods for testing stages. */ +public class StageTestUtils { +/** + * Saves a stage to filesystem and reloads it with the static load() method a stage must + * implement. + */ +public static > T saveAndReload(StreamExecutionEnvironment env, T stage) +throws Exception { +String tempDir = Files.createTempDirectory("").toString(); Review comment: OK. I'll do it. The common practice in Flink tests is to declare a `TemporaryFolder` in each test class and initialize it in `@Before` methods. Since our use of `TemporaryFolder` is limited to static utility functions, I think we can make it a static member of that class and initialize it in a static context. 
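The static-member idea described in the comment above could be sketched roughly as follows. This is only an illustration using the JDK, standing in for JUnit's `TemporaryFolder`; the class name `StaticTempDirSketch` and the method `newFolder` are hypothetical and are not part of the pull request.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical stand-in for a statically initialized temporary folder in a test utility class. */
public class StaticTempDirSketch {
    // One root directory shared by all static utility methods of this class.
    private static final Path ROOT;

    static {
        try {
            // Created once, in static context, when the class is first loaded.
            ROOT = Files.createTempDirectory("stage-test-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Best-effort cleanup when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(StaticTempDirSketch::deleteRoot));
    }

    /** Returns a fresh, empty directory under the shared root. */
    public static Path newFolder() {
        try {
            return Files.createTempDirectory(ROOT, "case-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void deleteRoot() {
        try (Stream<Path> paths = Files.walk(ROOT)) {
            // Delete children before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException ignored) {
            // Cleanup is best effort in this sketch.
        }
    }
}
```

A utility such as `saveAndReload` could then call `StaticTempDirSketch.newFolder().toString()` in place of an unmanaged `Files.createTempDirectory("")`, so that every directory it creates is removed with the shared root.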
[GitHub] [flink] jackwener commented on a change in pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
jackwener commented on a change in pull request #17842: URL: https://github.com/apache/flink/pull/17842#discussion_r755746986 ## File path: flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java ## @@ -54,16 +54,6 @@ public void testChoicesWithValidDefaultValue() { Assert.assertEquals(option.getDefaultValue(), "a"); } -@Test -public void testChoicesWithInvalidDefautlValue() throws RequiredParametersException { Review comment: It's redundant
[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLIP-174
lindong28 commented on a change in pull request #36: URL: https://github.com/apache/flink-ml/pull/36#discussion_r755746945 ## File path: flink-ml-python/README.md ## @@ -0,0 +1,20 @@ +Flink ML is a library which provides machine learning (ML) APIs and libraries that simplify the building of machine learning pipelines. It provides a set of standard ML APIs for MLlib developers to implement ML algorithms, as well as libraries of ML algorithms that can be used to build ML pipelines for both training and inference jobs. + +Flink ML is developed under the umbrella of [Apache Flink](https://flink.apache.org/). + +## Python Packaging + +Prerequisites for building apache-flink-ml: + +* Unix-like environment (we use Linux, Mac OS X) +* Python version(3.6, 3.7 or 3.8) is required + +Then go to the root directory of flink-ml-python source code and run this command to build the sdist package of apache-flink-ml: +```bash +cd flink-ml-python; python setup.py sdist; +``` + +The sdist package of apache-flink-ml will be found under ./flink-ml-python/dist/. It could be used for installation, such as: +```bash +python -m pip install dist/*.tar.gz Review comment: Could you update README to specify how to run test cases (similar to the existing README in `flink/flink-python/README.md`)? And we probably also need to compile the python package and run all python unit tests in the Github CI. This could be specified in `.github/workflows/java8-build.yml`. Could you help do this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jackwener commented on a change in pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
jackwener commented on a change in pull request #17842: URL: https://github.com/apache/flink/pull/17842#discussion_r755746522 ## File path: flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java ## @@ -54,16 +54,6 @@ public void testChoicesWithValidDefaultValue() { Assert.assertEquals(option.getDefaultValue(), "a"); } -@Test Review comment: It's redundant
[GitHub] [flink] flinkbot edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
flinkbot edited a comment on pull request #17842: URL: https://github.com/apache/flink/pull/17842#issuecomment-974310249 ## CI report: * 45ac29fdd23a851dfcf47e70d9db5710a19d2af8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26915) * bbc64e8c2730b42b8851683e494c5ddee017de05 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755746528 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * An Estimator which implements the naive bayes classification algorithm. + * + * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier. + */ +public class NaiveBayes +implements Estimator, NaiveBayesParams { +private final Map, Object> paramMap = new HashMap<>(); + +public NaiveBayes() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public NaiveBayesModel fit(Table... 
inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String featuresCol = getFeaturesCol(); +final String labelCol = getLabelCol(); +final double smoothing = getSmoothing(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream> input = +tEnv.toDataStream(inputs[0]) +.map( +new MapFunction>() { +@Override +public Tuple2 map(Row row) throws Exception { +return new Tuple2<>( +(Vector) row.getField(featuresCol), +(Double) row.getField(labelCol)); +} +}); + +DataStream naiveBayesModel = +input.flatMap(new FlattenFunction()) +.keyBy( +(KeySelector, Object>) +value -> new Tuple3<>(value.f0, value.f1, value.f2)) +.window(EndOfStreamWindows.get()) +.reduce( +(ReduceFunction>) +(t0, t1) -> { +t0.f3 += t1.f3; +return t0; +}) +.keyBy( +(KeySelector, Object>) +value -> new Tuple2<>(value.f0, value.f1)) +.window(EndOfStreamWindows.get()) +.aggregate(new ValueMapFunction()) +.keyBy( +
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755746409 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * An Estimator which implements the naive bayes classification algorithm. + * + * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier. + */ +public class NaiveBayes +implements Estimator, NaiveBayesParams { +private final Map, Object> paramMap = new HashMap<>(); + +public NaiveBayes() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public NaiveBayesModel fit(Table... 
inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String featuresCol = getFeaturesCol(); +final String labelCol = getLabelCol(); +final double smoothing = getSmoothing(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream> input = +tEnv.toDataStream(inputs[0]) +.map( +new MapFunction>() { +@Override +public Tuple2 map(Row row) throws Exception { +return new Tuple2<>( +(Vector) row.getField(featuresCol), +(Double) row.getField(labelCol)); +} +}); + +DataStream naiveBayesModel = +input.flatMap(new FlattenFunction()) Review comment: OK. I'll fix it and add clearer comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25012) Cannot join hive tables with different column types
[ https://issues.apache.org/jira/browse/FLINK-25012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448402#comment-17448402 ] xiangqiao commented on FLINK-25012: --- Thank you [~luoyuxia], I have added `hive.strict.checks.type.safety = false` to hive-site.xml, and the unit test now runs successfully. From the abstract syntax tree, the columns with different types (string and bigint) are cast to double. How is the implicit type coercion done? Will it cause data correctness problems? {code:java} == Abstract Syntax Tree == LogicalSink(table=[test-catalog.db1.dest], fields=[key, val]) +- LogicalProject(key=[$0], val=[$1]) +- LogicalProject(key=[$0], val=[$3]) +- LogicalJoin(condition=[=(CAST($0):DOUBLE, CAST($2):DOUBLE)], joinType=[left]) :- LogicalTableScan(table=[[test-catalog, db1, src2]]) +- LogicalTableScan(table=[[test-catalog, db1, src1]]) {code} > Cannot join hive tables with different column types > --- > > Key: FLINK-25012 > URL: https://issues.apache.org/jira/browse/FLINK-25012 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.13.0, 1.14.0 >Reporter: xiangqiao >Priority: Major > > When using Flink batch mode to join Hive tables, we get the > following exception (this usage is no problem in Spark) > {code:java} > java.lang.RuntimeException: > org.apache.hadoop.hive.ql.parse.SemanticException: Line 6:10 Wrong arguments > 'key': Unsafe compares between different types are disabled for safety > reasons. If you know what you are doing, please > sethive.strict.checks.type.safety to false and that hive.mapred.mode is not > set to 'strict' to proceed. Note that if you may get errors or incorrect > results if you make a mistake while using some of the unsafe features. 
at > org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.logicalPlan(HiveParserCalcitePlanner.java:305) > at > org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:273) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:326) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:274) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723) > at > org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase.testJoinWithDifferentColumnType(TableEnvHiveConnectorITCase.java:136) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) > Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 6:10 Wrong > arguments 'key': Unsafe compares between different types
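On the data-correctness question raised in the comment above: one general risk of comparing a string key and a bigint key as doubles is that a double cannot represent every 64-bit integer above 2^53. The plain-Java sketch below illustrates that effect only. It makes no claim about how the Flink or Hive planner actually evaluates `CAST(...):DOUBLE`; the class and method names are hypothetical.

```java
/** Illustrates how comparing keys as doubles can equate distinct large integers. */
public class DoubleCoercionDemo {

    /** Compares the two keys after converting both sides to double, as in the plan above. */
    static boolean equalAsDouble(String stringKey, long bigintKey) {
        return Double.parseDouble(stringKey) == (double) bigintKey;
    }

    /** Exact comparison for reference: parse the string as a 64-bit integer. */
    static boolean equalAsLong(String stringKey, long bigintKey) {
        return Long.parseLong(stringKey) == bigintKey;
    }

    public static void main(String[] args) {
        // 2^53 + 1 has no exact double representation and rounds to 2^53.
        String stringKey = "9007199254740993";
        long bigintKey = 9007199254740992L;

        System.out.println(equalAsDouble(stringKey, bigintKey)); // true: a false match
        System.out.println(equalAsLong(stringKey, bigintKey));   // false: the keys differ
        System.out.println(equalAsDouble("42", 42L));            // true: small keys are safe
    }
}
```

Keys whose magnitude stays below 2^53 compare exactly under this conversion, so whether a given join is affected depends on the range of the key values.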
[jira] [Updated] (FLINK-23190) Make task-slot allocation much more evenly
[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loyi updated FLINK-23190: - Affects Version/s: 1.13.1 1.14.0 > Make task-slot allocation much more evenly > -- > > Key: FLINK-23190 > URL: https://issues.apache.org/jira/browse/FLINK-23190 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.14.0, 1.12.3, 1.13.1 >Reporter: loyi >Assignee: loyi >Priority: Major > Labels: pull-request-available, stale-assigned > Attachments: image-2021-07-16-10-34-30-700.png > > > FLINK-12122 only guarantees spreading out tasks across the set of TMs which > are registered at the time of scheduling, but our jobs all run in > active YARN mode, and a job with a smaller source parallelism often causes > load-balancing issues. > > For this job: > {code:java} > // -ys 4 means 10 taskmanagers > env.addSource(...).name("A").setParallelism(10). > map(...).name("B").setParallelism(30) > .map(...).name("C").setParallelism(40) > .addSink(...).name("D").setParallelism(20); > {code} > > Flink-1.12.3 task allocation: > ||operator||tm1 ||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10|| > |A| > 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}| > |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}| > |C|4|4|4|4|4|4|4|4|4|4| > |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}| > > Suggestions: > When a TaskManager starts registering its slots with the SlotManager, the current processing > logic chooses the first pendingSlot that meets its resource > requirements. This "random" strategy usually causes uneven task allocation > when the source operator's parallelism is significantly below the processing operator's. 
> A simple feasible idea is {color:#de350b}partition{color} the current > "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (such as let > AllocationID bring the detail) , then allocate the slots proportionally to > each JobVertexGroup. > > For above case, the 40 pendingSlots could be divided into 4 groups: > [ABCD]: 10 // A、B、C、D reprents {color:#de350b}jobVertexId{color} > [BCD]: 10 > [CD]: 10 > [D]: 10 > > Every taskmanager will provide 4 slots one time, and each group will get 1 > slot according their proportion (1/4), the final allocation result is below: > [ABCD] : deploye on 10 different taskmangers > [BCD]: deploye on 10 different taskmangers > [CD]: deploye on 10 different taskmangers > [D]: deploye on 10 different taskmangers > > I have implement a [concept > code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] > based on Flink-1.12.3 , the patch version has {color:#de350b}fully > evenly{color} task allocation , and works well on my workload . Are there > other point that have not been considered or does it conflict with future > plans? Sorry for my poor english. > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
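The grouping idea in the issue above can be sketched in plain Java. This is a minimal illustration only, not the linked concept patch and not Flink's SlotManager: the class name `ProportionalSlotSketch` and the method `allocate` are hypothetical, the pending-slot groups are passed in as ready-made counts rather than derived from JobVertexIds, and the per-TaskManager split uses a largest-remainder rule that the issue does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: hand out each TaskManager's slots proportionally to pending groups. */
final class ProportionalSlotSketch {

    /** Returns, for every TaskManager in registration order, the slots granted per group. */
    static List<Map<String, Integer>> allocate(
            Map<String, Integer> pendingPerGroup, int taskManagers, int slotsPerTaskManager) {
        Map<String, Integer> remaining = new LinkedHashMap<>(pendingPerGroup);
        List<Map<String, Integer>> plan = new ArrayList<>();
        for (int tm = 0; tm < taskManagers; tm++) {
            int total = remaining.values().stream().mapToInt(Integer::intValue).sum();
            int offered = Math.min(slotsPerTaskManager, total);
            Map<String, Integer> granted = new LinkedHashMap<>();
            if (total > 0) {
                // Integer part of each group's proportional share of this TaskManager.
                int handedOut = 0;
                for (Map.Entry<String, Integer> e : remaining.entrySet()) {
                    int share = e.getValue() * offered / total;
                    granted.put(e.getKey(), share);
                    handedOut += share;
                }
                // Leftover slots go to the groups with the largest fractional remainder.
                List<String> byRemainder = new ArrayList<>(remaining.keySet());
                byRemainder.sort((a, b) -> Integer.compare(
                        remaining.get(b) * offered % total,
                        remaining.get(a) * offered % total));
                for (int i = 0; handedOut < offered; i++, handedOut++) {
                    granted.merge(byRemainder.get(i), 1, Integer::sum);
                }
            }
            // Subtract what this TaskManager satisfied before moving to the next one.
            granted.forEach((group, n) -> remaining.merge(group, -n, Integer::sum));
            plan.add(granted);
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("ABCD", 10);
        groups.put("BCD", 10);
        groups.put("CD", 10);
        groups.put("D", 10);
        // 10 TaskManagers with 4 slots each, as in the example from the issue.
        allocate(groups, 10, 4).forEach(System.out::println);
    }
}
```

With the four equally sized groups from the issue, every TaskManager ends up with one slot from each group, which matches the "deployed on 10 different taskmanagers" outcome described there. Unequal groups are split by the same ratio on each TaskManager.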
[GitHub] [flink] jackwener edited a comment on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
jackwener edited a comment on pull request #17842: URL: https://github.com/apache/flink/pull/17842#issuecomment-977582827 > The compilation failed. Please take a look, @jackwener. After fixing the typo (`testChoicesWithInvalidDefautlValue`), two functions ended up with the same name. I found that they test the same logic, so one is redundant and I removed this unit test.
[GitHub] [flink] jackwener commented on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
jackwener commented on pull request #17842: URL: https://github.com/apache/flink/pull/17842#issuecomment-977582827 > The compilation failed. Please take a look, @jackwener. After fixing the typo (`testChoicesWithInvalidDefautlValue`), two functions ended up with the same name. I found that they test the same logic, so I removed this unit test.
[GitHub] [flink] flinkbot edited a comment on pull request #17890: [BP-1.14][FLINK-24624][k8s] Clean up residual k8s resources when starting kubernetes session or application cluster failed
flinkbot edited a comment on pull request #17890: URL: https://github.com/apache/flink/pull/17890#issuecomment-977413087 ## CI report: * 1724c0b42250c88911e228025251ad3b764042ac Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26973) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-25013) flink pulsar connector cannot consume deferred messages
[ https://issues.apache.org/jira/browse/FLINK-25013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448401#comment-17448401 ] Yufan Sheng commented on FLINK-25013: - I see. But I don't think this is a bug. Delayed message delivery only works in the {{Shared}} subscription type. In the {{Exclusive}} and {{Failover}} subscription types, the delayed message is dispatched immediately. Can you confirm that the Source is configured to use the {{Shared}} subscription? > flink pulsar connector cannot consume deferred messages > --- > > Key: FLINK-25013 > URL: https://issues.apache.org/jira/browse/FLINK-25013 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.14.0 >Reporter: zhangjg >Priority: Major > > > In Flink 1.14.0, the Pulsar connector (PulsarSource) cannot consume > deferred messages. > Messages sent by the producer are consumed immediately by the Flink Pulsar > source. >
[GitHub] [flink-ml] HuangXingBo commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174
HuangXingBo commented on a change in pull request #36: URL: https://github.com/apache/flink-ml/pull/36#discussion_r755741660 ## File path: flink-ml-python/apache_flink_ml/version.py ## @@ -0,0 +1,23 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +The version will be consistent with the flink ml version and follow the PEP440. +.. seealso:: https://www.python.org/dev/peps/pep-0440 +""" +__version__ = "0.1.dev0" Review comment: Makes sense.
[GitHub] [flink-ml] lindong28 commented on a change in pull request #36: [FLINK-24933][python] Support ML Python API to implement FLIP-173 and FLP-174
lindong28 commented on a change in pull request #36: URL: https://github.com/apache/flink-ml/pull/36#discussion_r755732498 ## File path: flink-ml-python/apache_flink_ml/version.py ## @@ -0,0 +1,23 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +The version will be consistent with the flink ml version and follow the PEP440. +.. seealso:: https://www.python.org/dev/peps/pep-0440 +""" +__version__ = "0.1.dev0" Review comment: Could we rename the directory from `apache_flink_ml` to `pyflink`? This would make the flink-ml directory structure more consistent with the core Flink repo, and it would let the Python packages of flink-ml and core Flink be installed in the same directory under `site-packages`.
[jira] [Assigned] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system
[ https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-11868: Assignee: Yun Tang > [filesystems] Introduce listStatusIterator API to file system > - > > Key: FLINK-11868 > URL: https://issues.apache.org/jira/browse/FLINK-11868 > Project: Flink > Issue Type: New Feature > Components: FileSystems >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > Fix For: 1.15.0 > > > From past experience, we know {{listStatus}} is expensive for many > distributed file systems, especially when the folder contains too many files. > This method not only blocks the thread until the result is returned but > can also cause an OOM because the returned array of {{FileStatus}} is very large. > I think we should already have learned this from FLINK-7266 and FLINK-8540. > However, listing file statuses under a path is really helpful in many situations. > Thankfully, many distributed file systems noticed that and provide an API such as > {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}} > to call the file system on demand. > > We should also introduce this API and use it to replace the current implementation, which > uses {{listStatus}}.
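The difference between returning a full array and iterating on demand can be shown with a small standard-library analogy. The sketch below deliberately uses `java.nio.file.DirectoryStream` on a local directory instead of the Hadoop or Flink file system classes, so it is not the API proposed in the issue; the class name `LazyListingSketch` and the method `countWithSuffix` are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: visit directory entries one at a time instead of materializing an array. */
final class LazyListingSketch {

    /** Counts entries whose file name ends with the given suffix. */
    static long countWithSuffix(Path dir, String suffix) {
        long count = 0;
        // The stream hands out entries as the loop advances, so memory use
        // does not grow with the number of files in the directory.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (entry.getFileName().toString().endsWith(suffix)) {
                    count++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        Path dir = Path.of(args.length > 0 ? args[0] : ".");
        System.out.println(countWithSuffix(dir, ".java"));
    }
}
```

The same shape applies to an iterator-style listing on a distributed file system: the caller processes each status as it arrives and never holds the whole listing at once.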
[jira] [Commented] (FLINK-25029) Hadoop Caller Context Setting In Flink
[ https://issues.apache.org/jira/browse/FLINK-25029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448392#comment-17448392 ] 刘方奇 commented on FLINK-25029: - [~lzljs3620320] please help review this comment. > Hadoop Caller Context Setting In Flink > -- > > Key: FLINK-25029 > URL: https://issues.apache.org/jira/browse/FLINK-25029 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: 刘方奇 >Priority: Major > > For a given HDFS operation (e.g. delete file), it's very helpful to track > which upper-level job issued it. The upper-level callers may be specific > Oozie tasks, MR jobs, and Hive queries. One scenario is that when the NameNode > (NN) is abused/spammed, the operator may want to know immediately which MR > job is to blame so that she can kill it. To this end, the caller context > contains at least the application-dependent "tracking id". > The above is the main purpose of the Caller Context: the HDFS client sets the Caller > Context, and the NameNode then writes it to the audit log, where it can be used for further work. > Spark and Hive already set the Caller Context to meet the HDFS job audit > requirement. > In my company, Flink jobs often cause problems for HDFS, so we implemented this > to prevent some of those cases. > If the feature is general enough, should we support it? If so, I can submit a > PR for this.
[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448391#comment-17448391 ] ZhuoYu Chen commented on FLINK-15571: - [~yunta] Thank you for your reply. I think you can assign this task to me. I've finished the Mongo connector submission, which is now being reviewed by the community, and I'm planning to work on the Redis connector next. > Create a Redis Streams Connector for Flink > -- > > Key: FLINK-15571 > URL: https://issues.apache.org/jira/browse/FLINK-15571 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Tugdual Grall >Priority: Minor > Labels: pull-request-available > > Redis has a "log data structure" called Redis Streams. It would be nice to > integrate Redis Streams and Apache Flink as: > * Source > * Sink > See Redis Streams introduction: [https://redis.io/topics/streams-intro] >
[jira] [Created] (FLINK-25029) Hadoop Caller Context Setting In Flink
刘方奇 created FLINK-25029: --- Summary: Hadoop Caller Context Setting In Flink Key: FLINK-25029 URL: https://issues.apache.org/jira/browse/FLINK-25029 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: 刘方奇 For a given HDFS operation (e.g. delete file), it's very helpful to track which upper-level job issued it. The upper-level callers may be specific Oozie tasks, MR jobs, and Hive queries. One scenario is that when the NameNode (NN) is abused/spammed, the operator may want to know immediately which MR job is to blame so that she can kill it. To this end, the caller context contains at least the application-dependent "tracking id". The above is the main purpose of the Caller Context: the HDFS client sets the Caller Context, and the NameNode then writes it to the audit log, where it can be used for further work. Spark and Hive already set the Caller Context to meet the HDFS job audit requirement. In my company, Flink jobs often cause problems for HDFS, so we implemented this to prevent some of those cases. If the feature is general enough, should we support it? If so, I can submit a PR for this.
[jira] [Comment Edited] (FLINK-15571) Create a Redis Streams Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382 ] Yun Tang edited comment on FLINK-15571 at 11/24/21, 6:21 AM: - [~monster#12] Currently, the Flink community encourages contributors to add new connectors to [flink packages|https://flink-packages.org/] first; they can then be contributed to the official [flink-connector repo|https://github.com/apache/flink-connectors] (it has not been set up completely). was (Author: yunta): [~monster#12] Currently, the Flink community encourages contributors to add new connectors to [flink packages|https://flink-packages.org/] first; they can then be contributed to the official [flink-connector repo|https://github.com/apache/flink-connectors] (it has not been set up completely). > Create a Redis Streams Connector for Flink > -- > > Key: FLINK-15571 > URL: https://issues.apache.org/jira/browse/FLINK-15571 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Tugdual Grall >Priority: Minor > Labels: pull-request-available > > Redis has a "log data structure" called Redis Streams. It would be nice to > integrate Redis Streams and Apache Flink as: > * Source > * Sink > See Redis Streams introduction: [https://redis.io/topics/streams-intro] >
[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382 ] Yun Tang commented on FLINK-15571: -- [~monster#12] Currently, the Flink community encourages contributors to add new connectors to [flink packages|https://flink-packages.org/] first; they can then be contributed to the official [flink-connector repo|https://github.com/apache/flink-connectors] (it has not been set up completely). > Create a Redis Streams Connector for Flink > -- > > Key: FLINK-15571 > URL: https://issues.apache.org/jira/browse/FLINK-15571 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Tugdual Grall >Priority: Minor > Labels: pull-request-available > > Redis has a "log data structure" called Redis Streams. It would be nice to > integrate Redis Streams and Apache Flink as: > * Source > * Sink > See Redis Streams introduction: [https://redis.io/topics/streams-intro] >
[GitHub] [flink] Aitozi commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
Aitozi commented on a change in pull request #17554: URL: https://github.com/apache/flink/pull/17554#discussion_r755729487 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java ## @@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio flinkConfig.get(JobManagerOptions.PORT)); } +final KubernetesJobManagerParameters kubernetesJobManagerParameters = +new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + +final FlinkPod podTemplate = +kubernetesJobManagerParameters +.getPodTemplateFilePath() +.map( +file -> + KubernetesUtils.loadPodFromTemplateFile( +client, file, Constants.MAIN_CONTAINER_NAME)) +.orElse(new FlinkPod.Builder().build()); +final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( +podTemplate, kubernetesJobManagerParameters); + +client.createJobManagerComponent(kubernetesJobManagerSpec); + +return createClusterClientProvider(clusterId); +} + +private ClusterClientProvider safelyDeployCluster( +SupplierWithException, Exception> supplier) +throws ClusterDeploymentException { try { -final KubernetesJobManagerParameters kubernetesJobManagerParameters = -new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); - -final FlinkPod podTemplate = -kubernetesJobManagerParameters -.getPodTemplateFilePath() -.map( -file -> - KubernetesUtils.loadPodFromTemplateFile( -client, file, Constants.MAIN_CONTAINER_NAME)) -.orElse(new FlinkPod.Builder().build()); -final KubernetesJobManagerSpecification kubernetesJobManagerSpec = - KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( -podTemplate, kubernetesJobManagerParameters); - -client.createJobManagerComponent(kubernetesJobManagerSpec); - -return createClusterClientProvider(clusterId); + +ClusterClientProvider clusterClientProvider = supplier.get(); + +try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) { Review comment: 1. I think it refers to a failure to get the cluster client during the cluster deployment phase. 2. Failing to deploy means killing the cluster. As I mentioned in the [issue](https://issues.apache.org/jira/browse/FLINK-24624?focusedCommentId=17434081=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17434081), this PR only solves the problem in case 1. In session/application mode, it seems we cannot fully ensure that no resources are left behind, because the deployment process is asynchronous; that may need to be handled by the client.
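The "deploy, and kill the cluster if deployment fails" pattern discussed in this review can be sketched independently of Flink. The diff above is truncated, so the following is not the PR's actual `safelyDeployCluster` body: the class `SafeDeploySketch`, the `Cleanup` interface, and the use of `IllegalStateException` are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch: run a deployment step and clean up residual resources on failure. */
final class SafeDeploySketch {

    /** Stand-in for whatever removes the partially created cluster resources. */
    interface Cleanup {
        void killCluster(String clusterId) throws Exception;
    }

    static <T> T safelyDeploy(String clusterId, Callable<T> deploy, Cleanup cleanup) {
        try {
            return deploy.call();
        } catch (Exception deployFailure) {
            try {
                // Best-effort cleanup so a failed start does not leave resources behind.
                cleanup.killCluster(clusterId);
            } catch (Exception cleanupFailure) {
                // Keep the original failure as the primary error.
                deployFailure.addSuppressed(cleanupFailure);
            }
            throw new IllegalStateException(
                    "Could not deploy cluster " + clusterId, deployFailure);
        }
    }

    public static void main(String[] args) {
        String client = safelyDeploy(
                "demo-cluster",
                () -> "cluster-client",
                id -> System.out.println("killing " + id));
        System.out.println(client);
    }
}
```

As the review comment notes, a wrapper of this kind only covers failures that surface synchronously on the client side. Resources created by an asynchronous deployment after the call returns are outside its reach.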
[GitHub] [flink] flinkbot edited a comment on pull request #17598: [WIP][FLINK-24703][connectors][formats] Add FileSource support for reading CSV files.
flinkbot edited a comment on pull request #17598: URL: https://github.com/apache/flink/pull/17598#issuecomment-954282485 ## CI report: * 077423c8c94f59aade26c2c57001a4551a1b28af Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26971) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755727278 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayes.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * An Estimator which implements the naive bayes classification algorithm. + * + * See https://en.wikipedia.org/wiki/Naive_Bayes_classifier. + */ +public class NaiveBayes +implements Estimator, NaiveBayesParams { +private final Map, Object> paramMap = new HashMap<>(); + +public NaiveBayes() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public NaiveBayesModel fit(Table... 
inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String featuresCol = getFeaturesCol(); +final String labelCol = getLabelCol(); +final double smoothing = getSmoothing(); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream> input = +tEnv.toDataStream(inputs[0]) +.map( +new MapFunction>() { +@Override +public Tuple2 map(Row row) throws Exception { +return new Tuple2<>( +(Vector) row.getField(featuresCol), +(Double) row.getField(labelCol)); +} +}); + +DataStream naiveBayesModel = Review comment: Yes, `modeData` should be more clear. I'll make this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755726294 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.classification.naivebayes.NaiveBayes; +import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel; +import org.apache.flink.ml.classification.naivebayes.NaiveBayesModelData; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.ml.util.StageTestUtils; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.IteratorUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests {@link NaiveBayes} and {@link NaiveBayesModel}. */ +public class NaiveBayesTest { +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; +private Schema schema; +private List trainData; +private List predictData; +private List expectedOutput; +private boolean isSaveLoad; Review comment: Yes you are right. I will make this change in `NaiveBayesTest` like what we have done in `KmeansTest`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] gaoyunhaii closed pull request #34: [hotfix] Fix nullpointer exception when broadcast variables are cleaned
gaoyunhaii closed pull request #34: URL: https://github.com/apache/flink-ml/pull/34
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r755724605 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.classification.naivebayes.NaiveBayes; +import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel; +import org.apache.flink.ml.classification.naivebayes.NaiveBayesModelData; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.ml.util.StageTestUtils; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.IteratorUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests {@link NaiveBayes} and {@link NaiveBayesModel}. 
*/ +public class NaiveBayesTest { +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; +private Schema schema; +private List trainData; +private List predictData; +private List expectedOutput; +private boolean isSaveLoad; + +@Before +public void before() { +Configuration config = new Configuration(); + config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); +env = StreamExecutionEnvironment.getExecutionEnvironment(config); +env.setParallelism(4); +env.enableCheckpointing(100); +env.setRestartStrategy(RestartStrategies.noRestart()); +tEnv = StreamTableEnvironment.create(env); + +schema = +Schema.newBuilder() +.column("f0", DataTypes.DOUBLE()) +.column("f1", DataTypes.of(DenseVector.class)) +.column("f2", DataTypes.DOUBLE()) +.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") +.watermark("rowtime", "SOURCE_WATERMARK()") +.build(); + +trainData = +Arrays.asList( +Row.of(1., Vectors.dense(1, 1., 1., 1., 2.), 11.0), +Row.of(1., Vectors.dense(1, 1., 0., 1., 2.), 11.0), +Row.of(1., Vectors.dense(2, 0., 1., 1., 3.), 11.0), +Row.of(1., Vectors.dense(2, 0., 1., 1.5, 2.), 11.0), +Row.of(2., Vectors.dense(3, 1.5, 1., 0.5, 3.), 10.0), +Row.of(1., Vectors.dense(1, 1., 1.5, 0., 1.), 10.0), +Row.of(2., Vectors.dense(4, 1., 1., 0., 1.), 10.0)); + +predictData = trainData; + +expectedOutput = +Arrays.asList( +Row.of(1., Vectors.dense(1, 1., 1., 1., 2.), 11.0, 11.0), +Row.of(1., Vectors.dense(1, 1., 0., 1., 2.), 11.0, 11.0), +Row.of(1., Vectors.dense(2, 0., 1., 1., 3.), 11.0, 11.0), +Row.of(1., Vectors.dense(2, 0., 1., 1.5, 2.), 11.0, 11.0), +Row.of(2., Vectors.dense(3, 1.5, 1., 0.5, 3.), 10.0, 10.0), +Row.of(1., Vectors.dense(1, 1., 1.5, 0., 1.), 10.0, 10.0), +
[jira] [Commented] (FLINK-24396) Add @Public annotations to Table & SQL API classes
[ https://issues.apache.org/jira/browse/FLINK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448377#comment-17448377 ]

ZhuoYu Chen commented on FLINK-24396:
-------------------------------------

Hi [~twalthr], I am very interested in this and would like to contribute to Flink. Can I help with it? Thank you.

> Add @Public annotations to Table & SQL API classes
> --------------------------------------------------
>
> Key: FLINK-24396
> URL: https://issues.apache.org/jira/browse/FLINK-24396
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: Timo Walther
> Priority: Major
>
> Many parts of the Table & SQL API have stabilized and we can mark them as
> {{@Public}} which gives both users and downstream projects more confidence
> when using Flink.
> A concrete list of classes and methods needs to be compiled. Some parts of
> the API might stay {{@PublicEvolving}} for now.

--
This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-ml] gaoyunhaii commented on pull request #34: [hotfix] Fix nullpointer exception when broadcast variables are cleaned
gaoyunhaii commented on pull request #34:
URL: https://github.com/apache/flink-ml/pull/34#issuecomment-977559676

Thanks a lot @zhipeng93 for the fix! LGTM, merging...

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17813: [FLINK-24802][Table SQL/Planner] Improve cast ROW to STRING
flinkbot edited a comment on pull request #17813:
URL: https://github.com/apache/flink/pull/17813#issuecomment-970956527

## CI report:

* d4a10bc3d83bb43c839ae7b54e8bd41a65b94925 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26877)
* e04cc8d0d537afadf1b195b068939fa8e0a1f994 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26981)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-24429) Port FileSystemTableSink to new Unified Sink API (FLIP-143)
[ https://issues.apache.org/jira/browse/FLINK-24429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448376#comment-17448376 ]

ZhuoYu Chen commented on FLINK-24429:
-------------------------------------

Hi [~alexanderpreuss], I am very interested in this and would like to contribute to Flink. Can I help with it? Thank you.

> Port FileSystemTableSink to new Unified Sink API (FLIP-143)
> -----------------------------------------------------------
>
> Key: FLINK-24429
> URL: https://issues.apache.org/jira/browse/FLINK-24429
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.15.0
> Reporter: Alexander Preuss
> Assignee: Alexander Preuss
> Priority: Major
>
> We want to port the FileSystemTableSink to the new Sink API as was done with
> the Kafka Sink.
[GitHub] [flink] flinkbot edited a comment on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…
flinkbot edited a comment on pull request #17891:
URL: https://github.com/apache/flink/pull/17891#issuecomment-977556918

## CI report:

* 9f73f2713085e557107f1da2b68e77721721eb4d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26980)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-24959) Add a BitMap function to FlinkSQL
[ https://issues.apache.org/jira/browse/FLINK-24959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448374#comment-17448374 ]

ZhuoYu Chen commented on FLINK-24959:
-------------------------------------

Hi [~jark], I am very interested in this and would like to contribute to Flink. Can I help with it? Thank you.

> Add a BitMap function to FlinkSQL
> ---------------------------------
>
> Key: FLINK-24959
> URL: https://issues.apache.org/jira/browse/FLINK-24959
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.15.0
> Reporter: ZhuoYu Chen
> Priority: Minor
>
> bitmap_and: Computes the intersection of two input bitmaps and returns the new bitmap.
> bitmap_andnot: Computes the set (difference set) that is in A but not in B.
> Bitmap functions related to join operations, etc.
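The set semantics of the two functions named in the issue can be illustrated with plain `java.util.BitSet`. This is only a sketch under the assumption that bit positions stand for element ids; the class `BitmapOps` and its method names are hypothetical, and nothing here reflects how the proposed SQL functions would be implemented or which bitmap representation they would use.

```java
import java.util.BitSet;

/** Hypothetical helper illustrating bitmap_and / bitmap_andnot semantics with java.util.BitSet. */
public class BitmapOps {

    /** Returns a new bitmap holding the intersection of a and b; the inputs are not modified. */
    public static BitSet bitmapAnd(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.and(b);
        return result;
    }

    /** Returns a new bitmap holding the elements that are in a but not in b. */
    public static BitSet bitmapAndNot(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.andNot(b);
        return result;
    }

    /** Builds a bitmap from the given non-negative element ids. */
    public static BitSet of(int... ids) {
        BitSet bits = new BitSet();
        for (int id : ids) {
            bits.set(id);
        }
        return bits;
    }

    public static void main(String[] args) {
        BitSet a = of(1, 2, 3, 5);
        BitSet b = of(2, 3, 4);
        System.out.println(bitmapAnd(a, b));    // prints {2, 3}
        System.out.println(bitmapAndNot(a, b)); // prints {1, 5}
    }
}
```

Cloning before `and`/`andNot` keeps both inputs intact, which matches the description "returns the new bitmap".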
[GitHub] [flink] flinkbot commented on pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…
flinkbot commented on pull request #17891: URL: https://github.com/apache/flink/pull/17891#issuecomment-977557402 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9f73f2713085e557107f1da2b68e77721721eb4d (Wed Nov 24 05:57:18 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-12941) Translate "Amazon AWS Kinesis Streams Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12941:
-----------------------------------
    Labels: auto-unassigned pull-request-available (was: auto-unassigned)

> Translate "Amazon AWS Kinesis Streams Connector" page into Chinese
> ------------------------------------------------------------------
>
> Key: FLINK-12941
> URL: https://issues.apache.org/jira/browse/FLINK-12941
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Reporter: Jark Wu
> Assignee: ZhuoYu Chen
> Priority: Minor
> Labels: auto-unassigned, pull-request-available
>
> Translate the internal page
> "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kinesis.html"
> into Chinese.
>
> The doc is located in "flink/docs/dev/connectors/kinesis.zh.md".
[GitHub] [flink] MonsterChenzhuo opened a new pull request #17891: [FLINK-12941][chinese-translation, Documentation ]Translate "Amazon A…
MonsterChenzhuo opened a new pull request #17891:
URL: https://github.com/apache/flink/pull/17891

…WS Kinesis Streams Connector" page into Chinese
[GitHub] [flink] cc13ny commented on a change in pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
cc13ny commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r755719881

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java ##
@@ -256,36 +244,51 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
                     flinkConfig.get(JobManagerOptions.PORT));
         }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private ClusterClientProvider safelyDeployCluster(
+            SupplierWithException, Exception> supplier)
+            throws ClusterDeploymentException {
         try {
-            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
-                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
-
-            final FlinkPod podTemplate =
-                    kubernetesJobManagerParameters
-                            .getPodTemplateFilePath()
-                            .map(
-                                    file ->
-                                            KubernetesUtils.loadPodFromTemplateFile(
-                                                    client, file, Constants.MAIN_CONTAINER_NAME))
-                            .orElse(new FlinkPod.Builder().build());
-            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                            podTemplate, kubernetesJobManagerParameters);
-
-            client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-            return createClusterClientProvider(clusterId);
+
+            ClusterClientProvider clusterClientProvider = supplier.get();
+
+            try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) {

Review comment:
@Aitozi I think this is the key part of the change, apart from the code reorganization. But I don't understand how it relates to the PR title:
- Is failing to get the cluster client **the same as** failing to start the k8s cluster?
- Which code kills the cluster, **or** does a failed deployment by itself mean the cluster is killed?
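The questions above concern a deploy-then-verify pattern: run the deployment, try to obtain a client, and tear the cluster down if either step fails. A minimal, self-contained sketch of that pattern follows. All names in it (`SafeDeploy`, `ThrowingSupplier`, `DeploymentException`, `safelyDeploy`, the `killCluster` callback) are hypothetical stand-ins rather than the Flink classes touched by the PR, and the sketch assumes that "killing" means invoking a caller-supplied cleanup action.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "deploy, verify, and kill the cluster on failure". */
public class SafeDeploy {

    /** Supplier that may throw a checked exception (a stand-in, not Flink's own interface). */
    @FunctionalInterface
    public interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Checked exception reported to the caller when deployment or verification fails. */
    public static class DeploymentException extends Exception {
        public DeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs deployAndVerify. If it throws, killCluster is invoked once and the original failure
     * is rethrown wrapped in a DeploymentException; a failing cleanup is attached as suppressed.
     */
    public static <T> T safelyDeploy(ThrowingSupplier<T> deployAndVerify, Runnable killCluster)
            throws DeploymentException {
        try {
            return deployAndVerify.get();
        } catch (Exception e) {
            try {
                killCluster.run();
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw new DeploymentException("Could not deploy cluster; cleanup was attempted", e);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean killed = new AtomicBoolean(false);
        try {
            safelyDeploy(
                    () -> {
                        // Simulates a deployment whose client cannot be obtained afterwards.
                        throw new IllegalStateException("cannot reach cluster");
                    },
                    () -> killed.set(true));
        } catch (DeploymentException e) {
            System.out.println("deployment failed, cluster killed: " + killed.get());
        }
    }
}
```

In this sketch a failure to obtain the client is treated the same as a failed deployment, because both happen inside `deployAndVerify`; whether the PR intends that equivalence is exactly what the review comment asks.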
[jira] [Assigned] (FLINK-25015) job name should not always be `collect` submitted by sql client
[ https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-25015:
-------------------------------

    Assignee: KevinyhZou

> job name should not always be `collect` submitted by sql client
> ---------------------------------------------------------------
>
> Key: FLINK-25015
> URL: https://issues.apache.org/jira/browse/FLINK-25015
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Client
> Affects Versions: 1.14.0
> Reporter: KevinyhZou
> Assignee: KevinyhZou
> Priority: Major
> Attachments: image-2021-11-23-20-15-32-459.png, image-2021-11-23-20-16-21-932.png
>
> I use the Flink SQL client to submit different SQL queries to a Flink session
> cluster, and the SQL job name is always `collect`, as shown below:
> !image-2021-11-23-20-16-21-932.png!
> This makes no sense to users.
[jira] [Commented] (FLINK-25015) job name should not always be `collect` submitted by sql client
[ https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448370#comment-17448370 ]

KevinyhZou commented on FLINK-25015:
------------------------------------

[~jark] I agree with you. Can you assign this issue to me? I will change the job name.

> job name should not always be `collect` submitted by sql client
> ---------------------------------------------------------------
>
> Key: FLINK-25015
> URL: https://issues.apache.org/jira/browse/FLINK-25015
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Client
> Affects Versions: 1.14.0
> Reporter: KevinyhZou
> Priority: Major
> Attachments: image-2021-11-23-20-15-32-459.png, image-2021-11-23-20-16-21-932.png
>
> I use the Flink SQL client to submit different SQL queries to a Flink session
> cluster, and the SQL job name is always `collect`, as shown below:
> !image-2021-11-23-20-16-21-932.png!
> This makes no sense to users.
[GitHub] [flink] flinkbot edited a comment on pull request #17874: [FLINK-24046] Refactor the EmbeddedRocksDBStateBackend configuration logic
flinkbot edited a comment on pull request #17874:
URL: https://github.com/apache/flink/pull/17874#issuecomment-976109395

## CI report:

* ce248c90c07d8df8d1d79651248cc8a7ad6e0e81 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26974)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #17554: [FLINK-24624][Kubernetes]Kill cluster when starting kubernetes session or application cluster failed
flinkbot edited a comment on pull request #17554:
URL: https://github.com/apache/flink/pull/17554#issuecomment-950550174

## CI report:

* ae6c6bc42146b7a72215b4ceda8108fdc21b41f6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26972)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Comment Edited] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached
[ https://issues.apache.org/jira/browse/FLINK-25027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448365#comment-17448365 ]

FYung edited comment on FLINK-25027 at 11/24/21, 5:23 AM:
----------------------------------------------------------

Hi [~nkruber],
Thank you for creating this issue. I'm glad to see it because we have the same problem with the Akka thread pool. We submit batch jobs to a Flink session cluster and find that too many tasks accumulate in the Akka thread pool, which causes GC problems. Besides PhysicalSlotRequestBulkCheckerImpl, we also find `heartbeat` tasks, the timeout checkers for `pending request`/`checkIdleSlotTimeout`/`checkBatchSlotTimeout` in DeclarativeSlotPoolBridge, the timeout checker in `DefaultScheduler.registerProducedPartitions`, and more. The Akka thread pool holds these instances even after the jobs have finished.
Using a dedicated thread pool per JM to manage these tasks is a good idea. Maybe we should first create a thread pool in the JM, then create subtasks under this issue to move the tasks above from the Akka thread pool to the thread pool in the JM. What do you think?
Hope to hear from you [~nkruber] [~trohrmann], thanks :)

> Allow GC of a finished job's JobMaster before the slot timeout is reached
> -------------------------------------------------------------------------
>
> Key: FLINK-25027
> URL: https://issues.apache.org/jira/browse/FLINK-25027
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.12.5, 1.13.3
> Reporter: Nico Kruber
> Priority: Major
> Attachments: image-2021-11-23-20-32-20-479.png
>
> In a session cluster, after a (batch) job is finished, the JobMaster seems to
> stick around for another couple of minutes before being eligible for garbage
> collection.
> Looking into a heap dump, it seems to be tied to a
> {{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying
> Akka executor (and keeps the JM from being GC’d). Per default the action is
> scheduled for {{slot.request.timeout}} that defaults to 5 min (thanks
> [~trohrmann] for helping out here)
> !image-2021-11-23-20-32-20-479.png!
> With this setting, you will have to account for enough metaspace to cover 5
> minutes of time which may span a couple of jobs, needlessly!
> The problem seems to be that Flink is using the main thread executor for the
> scheduling that uses the {{ActorSystem}}'s scheduler and the future task
> scheduled with Akka can (probably) not be easily cancelled.
> One idea could be to use a dedicated thread pool per JM, that we shut down
> when the JM terminates. That way we would not keep the JM from being GC’d.
> (The concrete example we investigated was a DataSet job)
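The idea in the issue, a dedicated scheduler per JobMaster that is shut down when the JobMaster terminates, can be sketched with the JDK's `ScheduledExecutorService`. This is a hedged illustration only: the class `OwnedTimeoutScheduler`, its method names, and the thread name are hypothetical, it does not use any Flink or Akka API, and it assumes that pending timeout checks no longer need to fire once the owner has terminated.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a component that owns its timeout scheduler and shuts it down on close. */
public class OwnedTimeoutScheduler implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "owned-timeout-checker");
                        thread.setDaemon(true);
                        return thread;
                    });

    /**
     * Schedules a delayed check. Until it runs or is dropped, the executor's queue keeps a
     * reference to the task and to everything the task captures.
     */
    public ScheduledFuture<?> scheduleTimeoutCheck(Runnable check, long delay, TimeUnit unit) {
        return executor.schedule(check, delay, unit);
    }

    /** Drops all pending checks right away and returns how many were discarded. */
    public int shutdownNow() {
        return executor.shutdownNow().size();
    }

    public boolean isShutdown() {
        return executor.isShutdown();
    }

    @Override
    public void close() {
        shutdownNow();
    }

    public static void main(String[] args) {
        OwnedTimeoutScheduler scheduler = new OwnedTimeoutScheduler();
        // A check that would otherwise stay queued for five minutes.
        scheduler.scheduleTimeoutCheck(
                () -> System.out.println("timeout check"), 5, TimeUnit.MINUTES);
        int dropped = scheduler.shutdownNow();
        System.out.println("dropped pending checks: " + dropped);
    }
}
```

Because the queue belongs to the owner, shutting the executor down releases the queued tasks together with the owner, instead of leaving them in a shared scheduler until the delay expires.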
[jira] [Resolved] (FLINK-24958) correct the example and link for temporal table function documentation
[ https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-24958. Resolution: Fixed > correct the example and link for temporal table function documentation > --- > > Key: FLINK-24958 > URL: https://issues.apache.org/jira/browse/FLINK-24958 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.0 >Reporter: zoucao >Assignee: zoucao >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.14.1 > > > correct the example and link for temporal table function documentation -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24958) correct the example and link for temporal table function documentation
[ https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-24958: --- Fix Version/s: 1.14.1 > correct the example and link for temporal table function documentation > --- > > Key: FLINK-24958 > URL: https://issues.apache.org/jira/browse/FLINK-24958 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.0 >Reporter: zoucao >Assignee: zoucao >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.14.1 > > > correct the example and link for temporal table function documentation -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755690633 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/EuclideanDistance.java ## @@ -0,0 +1,272 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.ml.classification.knn.KnnUtil.appendVectorToMatrix; + +/** + * Euclidean distance is the "ordinary" straight-line distance between two points in Euclidean + * space. + * + * https://en.wikipedia.org/wiki/Euclidean_distance + * + * Given two vectors a and b, Euclidean Distance = ||a - b||, where ||*|| means the L2 norm of + * the vector. + */ +public class EuclideanDistance implements Serializable { Review comment: OK. KNN's EuclideanDistance is not a general-purpose distance; it is a fast distance. I will rename KNN's distance class from EuclideanDistance to FastDistance.
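The Javadoc quoted above defines the distance as ||a - b||. The comment does not say what makes the KNN variant "fast"; one common trick, assumed here purely for illustration, is to cache the squared norms and use ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a·b, so that only a dot product is computed per pair. `DistanceSketch` and its method names are hypothetical and do not come from the pull request.

```java
/** Minimal sketch of two ways to compute the Euclidean distance (hypothetical helper class). */
class DistanceSketch {

    /** Direct definition: the L2 norm of (a - b). */
    static double euclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    /** Squared L2 norm, which can be computed once per vector and cached. */
    static double squaredNorm(double[] v) {
        double sum = 0.0;
        for (double x : v) {
            sum += x * x;
        }
        return sum;
    }

    /** Same distance from cached squared norms: sqrt(||a||^2 + ||b||^2 - 2 * a.b). */
    static double euclideanFromNorms(double[] a, double normA2, double[] b, double normB2) {
        double dot = 0.0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
        }
        // Clamp at zero because rounding can make the expression slightly negative.
        return Math.sqrt(Math.max(0.0, normA2 + normB2 - 2.0 * dot));
    }

    public static void main(String[] args) {
        double[] a = {1, 2, 3};
        double[] b = {4, 6, 3};
        System.out.println(euclidean(a, b));
        System.out.println(euclideanFromNorms(a, squaredNorm(a), b, squaredNorm(b)));
    }
}
```

The expanded form pays off when one side (the training vectors) is fixed and its norms are reused across many queries; it can lose precision when the two vectors are nearly identical.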
[jira] [Commented] (FLINK-24958) correct the example and link for temporal table function documentation
[ https://issues.apache.org/jira/browse/FLINK-24958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448356#comment-17448356 ] Leonard Xu commented on FLINK-24958: Fixed via master (1.15): ae813a7397955d7694109c3455b5cca14ac37938 release-1.14: 1592494d4260563af4f170fb1cd45ccec9e5f0cb > correct the example and link for temporal table function documentation > --- > > Key: FLINK-24958 > URL: https://issues.apache.org/jira/browse/FLINK-24958 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.0 >Reporter: zoucao >Assignee: zoucao >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > correct the example and link for temporal table function documentation -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug
flinkbot edited a comment on pull request #17822: URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959 ## CI report: * ca42ca0e3df01ed1ddbd020245528ecb7b4257eb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26919) * 1592494d4260563af4f170fb1cd45ccec9e5f0cb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26978) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] wuchong commented on pull request #17842: [FLINK-24966] [docs] Fix spelling errors in the project
wuchong commented on pull request #17842: URL: https://github.com/apache/flink/pull/17842#issuecomment-977518179 The compilation failed. Please take a look, @jackwener.
[GitHub] [flink] flinkbot edited a comment on pull request #17822: Release 1.14 kafka3.0 bug
flinkbot edited a comment on pull request #17822: URL: https://github.com/apache/flink/pull/17822#issuecomment-971696959 ## CI report: * ca42ca0e3df01ed1ddbd020245528ecb7b4257eb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26919) * 1592494d4260563af4f170fb1cd45ccec9e5f0cb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755688272 ## File path: flink-ml-api/src/main/java/org/apache/flink/ml/param/Param.java ## @@ -75,7 +75,7 @@ public String jsonEncode(T value) throws IOException { */ @SuppressWarnings("unchecked") public T jsonDecode(String json) throws IOException { -return ReadWriteUtils.OBJECT_MAPPER.readValue(json, clazz); +return ReadWriteUtils.OBJECT_MAPPER.fromJson(json, clazz); Review comment: done ## File path: flink-ml-api/src/main/java/org/apache/flink/ml/param/Param.java ## @@ -64,7 +64,7 @@ public Param( * @return A json-formatted string. */ public String jsonEncode(T value) throws IOException { -return ReadWriteUtils.OBJECT_MAPPER.writeValueAsString(value); +return ReadWriteUtils.OBJECT_MAPPER.toJson(value); Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755688178 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/Knn.java ## @@ -0,0 +1,255 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.common.MapPartitionFunctionWrapper; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * KNN is to classify unlabeled observations by assigning them to the class of the most similar + * labeled examples. + */ +public class Knn implements Estimator, KnnParams { + +private static final long serialVersionUID = 5292477422193301398L; +private static final int ROW_SIZE = 2; +private static final int FASTDISTANCE_TYPE_INDEX = 0; +private static final int DATA_INDEX = 1; + +protected Map, Object> params = new HashMap<>(); + +/** constructor. 
*/ +public Knn() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * constructor. + * + * @param params parameters for algorithm. + */ +public Knn(Map, Object> params) { +this.params = params; +} + +/** + * @param inputs a list of tables + * @return knn classification model. + */ +@Override +public KnnModel fit(Table... inputs) { +Preconditions.checkArgument(inputs.length == 1); +ResolvedSchema schema = inputs[0].getResolvedSchema(); +String[] colNames = schema.getColumnNames().toArray(new String[0]); +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +String[] targetCols = getFeatureCols(); +final int[] featureIndices; +if (targetCols == null) { +featureIndices = new int[colNames.length]; +for (int i = 0; i < colNames.length; i++) { +featureIndices[i] = i; +} +} else { +featureIndices = new int[targetCols.length]; +for (int i = 0; i < featureIndices.length; i++) { +featureIndices[i] = findColIndex(colNames, targetCols[i]); +} +} +String labelCol = getLabelCol(); +final int labelIdx = findColIndex(colNames, labelCol); +final int vecIdx = +getVectorCol() != null +? findColIndex( +inputs[0] +.getResolvedSchema() +.getColumnNames() +.toArray(new String[0]), +getVectorCol()) +: -1; + +DataStream trainData = +input.map( +(MapFunction) +value -> { +Object label = value.getField(labelIdx); Review comment: done ## File path: flink-ml-api/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java ## @@ -43,8 +46,12 @@ /** Utility methods for reading and writing stages. */ public class ReadWriteUtils { -public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - +public static Gson OBJECT_MAPPER = Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
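The Javadoc in the diff above describes KNN as assigning an unlabeled observation to the class of its most similar labeled examples. A brute-force, single-machine version of that idea can be sketched as follows. It is not the distributed implementation from the pull request: `KnnSketch`, `classify`, and the tie-breaking rule (the label that reaches the top count first among the closest neighbors wins) are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

/** Brute-force k-nearest-neighbors classifier on plain arrays (hypothetical sketch). */
class KnnSketch {

    /** Squared Euclidean distance; sufficient for ranking neighbors. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Returns the majority label among the k training points closest to the query. */
    static String classify(double[][] features, String[] labels, double[] query, int k) {
        if (k <= 0) {
            throw new IllegalArgumentException("k must be greater than 0");
        }
        Integer[] order = new Integer[features.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // Sort training indices by distance to the query, closest first.
        Arrays.sort(order, Comparator.comparingDouble((Integer i) -> squaredDistance(features[i], query)));

        Map<String, Integer> votes = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (int j = 0; j < Math.min(k, order.length); j++) {
            String label = labels[order[j]];
            int count = votes.merge(label, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                best = label;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[][] features = {{0, 0}, {0, 1}, {5, 5}, {6, 5}, {5, 6}};
        String[] labels = {"a", "a", "b", "b", "b"};
        System.out.println(classify(features, labels, new double[] {0.2, 0.1}, 3));
    }
}
```

Sorting all training points costs O(n log n) per query; a bounded priority queue of size k would reduce that, at the cost of a slightly longer example.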
[GitHub] [flink] leonardBang merged pull request #17835: [FLINK-24958][docs]correct the example and link for temporal table function documentation
leonardBang merged pull request #17835: URL: https://github.com/apache/flink/pull/17835
[jira] [Commented] (FLINK-25015) job name should not always be `collect` submitted by sql client
[ https://issues.apache.org/jira/browse/FLINK-25015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448354#comment-17448354 ] Jark Wu commented on FLINK-25015: - Yes. I think it makes sense to display the query as the job name. > job name should not always be `collect` submitted by sql client > --- > > Key: FLINK-25015 > URL: https://issues.apache.org/jira/browse/FLINK-25015 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Affects Versions: 1.14.0 >Reporter: KevinyhZou >Priority: Major > Attachments: image-2021-11-23-20-15-32-459.png, > image-2021-11-23-20-16-21-932.png > > > I use the Flink SQL client to submit different SQL queries to a Flink session > cluster, and the SQL job name is always `collect`, as below > !image-2021-11-23-20-16-21-932.png! > which makes no sense to users. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] leonardBang merged pull request #17832: [FLINK-24958][docs]correct the example and link for temporal table fu…
leonardBang merged pull request #17832: URL: https://github.com/apache/flink/pull/17832
[jira] [Closed] (FLINK-21415) JDBC connector should support to disable caching missing key
[ https://issues.apache.org/jira/browse/FLINK-21415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-21415. --- Assignee: (was: liwei li) Resolution: Duplicate > JDBC connector should support to disable caching missing key > > > Key: FLINK-21415 > URL: https://issues.apache.org/jira/browse/FLINK-21415 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC, Table SQL / Ecosystem >Affects Versions: 1.12.1 >Reporter: Shuai Xia >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > When a JDBC lookup returns no data, the cache stores an empty ArrayList. > We should check the result size before caching. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-24861) Support to disable caching missing key for lookup cache
[ https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-24861. --- Fix Version/s: 1.15.0 Resolution: Fixed Fixed in master: f8fdb6713a9264691a10afe3e6e9ff6bcdc86ae1 > Support to disable caching missing key for lookup cache > > > Key: FLINK-24861 > URL: https://issues.apache.org/jira/browse/FLINK-24861 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Affects Versions: 1.14.0 >Reporter: Gaurav Miglani >Assignee: Gaurav Miglani >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Ideally, in case of cache miss for a key, or with null value fetch for key, > key shouldn't be cached -- This message was sent by Atlassian Jira (v8.20.1#820001)
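The behavior requested in FLINK-24861 (a key whose lookup returns nothing should not be cached) can be sketched as below. This is not the JDBC connector's code: `LookupCacheSketch`, the `cacheMissingKey` flag, and the `loader` function are hypothetical names, and a plain `HashMap` stands in for whatever cache the connector actually uses.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Lookup cache that can skip caching keys with no matching rows (hypothetical sketch). */
class LookupCacheSketch<K, V> {

    private final Map<K, List<V>> cache = new HashMap<>();
    private final Function<K, List<V>> loader;
    private final boolean cacheMissingKey;

    LookupCacheSketch(Function<K, List<V>> loader, boolean cacheMissingKey) {
        this.loader = loader;
        this.cacheMissingKey = cacheMissingKey;
    }

    /** Returns the rows for a key, consulting the external system only on a cache miss. */
    List<V> lookup(K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<V> rows = loader.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Only remember an empty result when caching of missing keys is enabled.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }

    boolean isCached(K key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        LookupCacheSketch<String, String> cache =
                new LookupCacheSketch<String, String>(k -> Collections.<String>emptyList(), false);
        cache.lookup("missing");
        System.out.println("cached after empty lookup: " + cache.isCached("missing"));
    }
}
```

Skipping empty results means a row inserted later becomes visible on the next lookup, at the price of one external query per lookup for keys that stay missing.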
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755684260 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java ## @@ -0,0 +1,32 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasPredictionCol; +import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WithParams; + +/** knn fit parameters. */ +public interface KnnParams +extends WithParams, +HasVectorColDefaultAsNull, +HasLabelCol, +HasFeatureColsDefaultAsNull, +HasPredictionCol { +/** + * @cn-name topK + * @cn topK + */ +Param K = new IntParam("k", "k", 10, ParamValidators.gt(0)); Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755684156 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java ## @@ -0,0 +1,32 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasPredictionCol; +import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WithParams; + +/** knn fit parameters. */ +public interface KnnParams +extends WithParams, +HasVectorColDefaultAsNull, +HasLabelCol, +HasFeatureColsDefaultAsNull, +HasPredictionCol { +/** + * @cn-name topK + * @cn topK + */ +Param K = new IntParam("k", "k", 10, ParamValidators.gt(0)); + +default Integer getK() { Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-24861) Support to disable caching missing key for lookup cache
[ https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-24861: Summary: Support to disable caching missing key for lookup cache (was: Flink MySQL Look cache update for empty hit) > Support to disable caching missing key for lookup cache > > > Key: FLINK-24861 > URL: https://issues.apache.org/jira/browse/FLINK-24861 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Affects Versions: 1.14.0 >Reporter: Gaurav Miglani >Assignee: Gaurav Miglani >Priority: Major > Labels: pull-request-available > > Ideally, in case of cache miss for a key, or with null value fetch for key, > key shouldn't be cached -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] wuchong merged pull request #17754: [FLINK-24861][connector][jdbc] Fix false cache lookup for empty data
wuchong merged pull request #17754: URL: https://github.com/apache/flink/pull/17754
[jira] [Commented] (FLINK-25028) java.lang.OutOfMemoryError: Java heap space
[ https://issues.apache.org/jira/browse/FLINK-25028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448347#comment-17448347 ] Zhilong Hong commented on FLINK-25028: -- It seems that an OOM happens on the TaskExecutor. There are many reasons that could cause an OOM. You could run the {{jmap}} command in the pod/container to get a snapshot of the heap memory of that TaskExecutor. Then you could use the {{jhat}} command to analyze the snapshot and find out which part occupies the most heap memory. For more information, please see [the official doc of jmap|https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr014.html]. > java.lang.OutOfMemoryError: Java heap space > --- > > Key: FLINK-25028 > URL: https://issues.apache.org/jira/browse/FLINK-25028 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.14.0 >Reporter: wangbaohua >Priority: Blocker > Attachments: error.txt > > > java.lang.OutOfMemoryError: Java heap space > at java.util.HashMap.resize(HashMap.java:703) ~[?:1.8.0_131] > at java.util.HashMap.putVal(HashMap.java:628) ~[?:1.8.0_131] > at java.util.HashMap.put(HashMap.java:611) ~[?:1.8.0_131] > at java.util.HashSet.add(HashSet.java:219) ~[?:1.8.0_131] > at > java.io.ObjectStreamClass$FieldReflector.<init>(ObjectStreamClass.java:1945) > ~[?:1.8.0_131] > at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2193) > ~[?:1.8.0_131] > at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:521) > ~[?:1.8.0_131] > at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) > ~[?:1.8.0_131] > at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468) > ~[?:1.8.0_131] > at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:369) > ~[?:1.8.0_131] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) > ~[?:1.8.0_131] > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > ~[?:1.8.0_131] > at > 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > ~[?:1.8.0_131] > at java.io.ObjectOutputStream.access$300(ObjectOutputStream.java:162) > ~[?:1.8.0_131] > at > java.io.ObjectOutputStream$PutFieldImpl.writeFields(ObjectOutputStream.java:1707) > ~[?:1.8.0_131] > at java.io.ObjectOutputStream.writeFields(ObjectOutputStream.java:482) > ~[?:1.8.0_131] > at > java.util.concurrent.ConcurrentHashMap.writeObject(ConcurrentHashMap.java:1406) > ~[?:1.8.0_131] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_131] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_131] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_131] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131] > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > ~[?:1.8.0_131] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > ~[?:1.8.0_131] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > ~[?:1.8.0_131] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > ~[?:1.8.0_131] > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > ~[?:1.8.0_131] > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:51) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:54) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$retrievePayload$3(TaskExecutor.java:2425) > 
~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener$$Lambda$1020/78782846.apply(Unknown > Source) ~[?:?] -- This message was sent by Atlassian Jira (v8.20.1#820001)
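The comment on FLINK-25028 suggests taking a heap snapshot with {{jmap}}. When attaching an external tool to the container is inconvenient, a snapshot in the same HPROF format can also be written from inside the JVM. The sketch below is that alternative, not what the comment proposes: it assumes a HotSpot-based JVM that exposes `com.sun.management.HotSpotDiagnosticMXBean`, and `HeapDumpSketch` is a hypothetical helper name.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Writes an HPROF heap dump of the current JVM (hypothetical helper, HotSpot JVMs only). */
class HeapDumpSketch {

    /**
     * Dumps the heap to the given file.
     *
     * @param target output file; newer JDKs require the name to end with ".hprof"
     * @param liveOnly if true, only objects that are still reachable are written
     */
    static Path dumpHeap(Path target, boolean liveOnly) throws IOException {
        // dumpHeap refuses to overwrite an existing file, so remove any stale dump first.
        Files.deleteIfExists(target);
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), liveOnly);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path out = dumpHeap(Paths.get("taskexecutor-heap.hprof"), true);
        System.out.println("heap dump written to " + out.toAbsolutePath());
    }
}
```

The resulting file can be opened with any HPROF-capable analyzer to see which objects dominate the heap. Note that writing the dump pauses the JVM and needs disk space on the order of the used heap.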
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755682429 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; +private static final int FASTDISTANCE_TYPE_INDEX = 0; +private static final int DATA_INDEX = 1; + +protected Map, Object> params = new HashMap<>(); + +private Table[] modelData; + +/** constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * constructor. + * + * @param params parameters for algorithm. + */ +public KnnModel(Map, Object> params) { +this.params = params; +} + +/** + * Set model data for knn prediction. + * + * @param modelData knn model. + * @return knn classification model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * get model data. + * + * @return list of tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * @param inputs a list of tables. + * @return result. 
+ */ +@Override +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); + +Map> broadcastMap = new HashMap<>(1); +broadcastMap.put(BROADCAST_STR, model); +ResolvedSchema modelSchema = modelData[0].getResolvedSchema(); +DataType idType = + modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1); + +ResolvedSchema outputSchema = +getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), idType); + +DataType[] dataTypes =
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755682171 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; Review comment: done ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import 
org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678624 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; Review comment: OK, I will refine it later.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678460 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; +private static final int FASTDISTANCE_TYPE_INDEX = 0; +private static final int DATA_INDEX = 1; + +protected Map, Object> params = new HashMap<>(); + +private Table[] modelData; + +/** constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * constructor. + * + * @param params parameters for algorithm. + */ +public KnnModel(Map, Object> params) { +this.params = params; +} + +/** + * Set model data for knn prediction. + * + * @param modelData knn model. + * @return knn classification model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * get model data. + * + * @return list of tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * @param inputs a list of tables. + * @return result. 
+ */ +@Override +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); + +Map> broadcastMap = new HashMap<>(1); +broadcastMap.put(BROADCAST_STR, model); +ResolvedSchema modelSchema = modelData[0].getResolvedSchema(); +DataType idType = + modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1); + +ResolvedSchema outputSchema = +getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), idType); + +DataType[] dataTypes =
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755678350 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; +private static final int FASTDISTANCE_TYPE_INDEX = 0; +private static final int DATA_INDEX = 1; + +protected Map, Object> params = new HashMap<>(); + +private Table[] modelData; + +/** constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * constructor. + * + * @param params parameters for algorithm. + */ +public KnnModel(Map, Object> params) { +this.params = params; +} + +/** + * Set model data for knn prediction. + * + * @param modelData knn model. + * @return knn classification model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * get model data. + * + * @return list of tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * @param inputs a list of tables. + * @return result. 
+ */ +@Override +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); + +Map> broadcastMap = new HashMap<>(1); +broadcastMap.put(BROADCAST_STR, model); +ResolvedSchema modelSchema = modelData[0].getResolvedSchema(); +DataType idType = + modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1); + +ResolvedSchema outputSchema = +getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), idType); + +DataType[] dataTypes =
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755677909 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnModel.java ## @@ -0,0 +1,594 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.core.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.linalg.DenseMatrix; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.shaded.curator4.com.google.common.base.Preconditions; +import org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.function.Function; + +/** Knn classification model fitted by estimator. */ +public class KnnModel implements Model, KnnParams { + +private static final long serialVersionUID = 1303892137143865652L; + +private static final String BROADCAST_STR = "broadcastModelKey"; +private static final int FASTDISTANCE_TYPE_INDEX = 0; +private static final int DATA_INDEX = 1; + +protected Map, Object> params = new HashMap<>(); + +private Table[] modelData; + +/** constructor. */ +public KnnModel() { +ParamUtils.initializeMapWithDefaultValues(params, this); +} + +/** + * constructor. + * + * @param params parameters for algorithm. + */ +public KnnModel(Map, Object> params) { +this.params = params; +} + +/** + * Set model data for knn prediction. + * + * @param modelData knn model. + * @return knn classification model. + */ +@Override +public KnnModel setModelData(Table... modelData) { +this.modelData = modelData; +return this; +} + +/** + * get model data. + * + * @return list of tables. + */ +@Override +public Table[] getModelData() { +return modelData; +} + +/** + * @param inputs a list of tables. + * @return result. 
+ */ +@Override +public Table[] transform(Table... inputs) { +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); +DataStream input = tEnv.toDataStream(inputs[0]); +DataStream model = tEnv.toDataStream(modelData[0]); + +Map> broadcastMap = new HashMap<>(1); +broadcastMap.put(BROADCAST_STR, model); +ResolvedSchema modelSchema = modelData[0].getResolvedSchema(); +DataType idType = + modelSchema.getColumnDataTypes().get(modelSchema.getColumnNames().size() - 1); + +ResolvedSchema outputSchema = +getOutputSchema(inputs[0].getResolvedSchema(), getParamMap(), idType); + +DataType[] dataTypes =
[jira] [Commented] (FLINK-24763) ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448343#comment-17448343 ] Jingsong Lee commented on FLINK-24763: -- Fixed in master: a5477938952863fb516a1a14d67808ecda047134 [~trohrmann] I have merged the fix, but I am not fully certain that it is complete, so we will keep watching. If the problem recurs, I will disable the test for the time being and create a follow-up blocker ticket to re-enable it. > ParquetFileSystemITCase.testLimitableBulkFormat failed on Azure > --- > > Key: FLINK-24763 > URL: https://issues.apache.org/jira/browse/FLINK-24763 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, > ORC, SequenceFile) >Affects Versions: 1.15.0 >Reporter: Till Rohrmann >Assignee: Jingsong Lee >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.15.0 > > > The test {{ParquetFileSystemITCase.testLimitableBulkFormat}} fails with > {code} > 2021-11-03T22:10:11.5106075Z Nov 03 22:10:11 [ERROR] > testLimitableBulkFormat[false] Time elapsed: 9.177 s <<< ERROR!
> 2021-11-03T22:10:11.5106643Z Nov 03 22:10:11 java.lang.RuntimeException: > Failed to fetch next result > 2021-11-03T22:10:11.5107213Z Nov 03 22:10:11 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) > 2021-11-03T22:10:11.5111034Z Nov 03 22:10:11 at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > 2021-11-03T22:10:11.5112190Z Nov 03 22:10:11 at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:188) > 2021-11-03T22:10:11.5112892Z Nov 03 22:10:11 at > java.util.Iterator.forEachRemaining(Iterator.java:115) > 2021-11-03T22:10:11.5113393Z Nov 03 22:10:11 at > org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109) > 2021-11-03T22:10:11.5114157Z Nov 03 22:10:11 at > org.apache.flink.formats.parquet.ParquetFileSystemITCase.testLimitableBulkFormat(ParquetFileSystemITCase.java:128) > 2021-11-03T22:10:11.5114951Z Nov 03 22:10:11 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2021-11-03T22:10:11.5115568Z Nov 03 22:10:11 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2021-11-03T22:10:11.5116115Z Nov 03 22:10:11 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-11-03T22:10:11.5116591Z Nov 03 22:10:11 at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-11-03T22:10:11.5117088Z Nov 03 22:10:11 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2021-11-03T22:10:11.5117807Z Nov 03 22:10:11 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2021-11-03T22:10:11.5118821Z Nov 03 22:10:11 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2021-11-03T22:10:11.5119417Z Nov 03 22:10:11 at > 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2021-11-03T22:10:11.5119944Z Nov 03 22:10:11 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2021-11-03T22:10:11.5120427Z Nov 03 22:10:11 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2021-11-03T22:10:11.5120919Z Nov 03 22:10:11 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2021-11-03T22:10:11.5121571Z Nov 03 22:10:11 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2021-11-03T22:10:11.5122526Z Nov 03 22:10:11 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2021-11-03T22:10:11.5123245Z Nov 03 22:10:11 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2021-11-03T22:10:11.5123804Z Nov 03 22:10:11 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2021-11-03T22:10:11.5124314Z Nov 03 22:10:11 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2021-11-03T22:10:11.5124806Z Nov 03 22:10:11 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2021-11-03T22:10:11.5125313Z Nov 03 22:10:11 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2021-11-03T22:10:11.5125810Z Nov 03 22:10:11 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2021-11-03T22:10:11.5126281Z Nov 03 22:10:11 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) >
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755677169 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasFeatureColsDefaultAsNull.java ## @@ -0,0 +1,28 @@ +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WithParams; + +/** Params of the names of the feature columns used for training in the input table. */ +public interface HasFeatureColsDefaultAsNull extends WithParams { +/** + * @cn-name Feature column name array + * @cn Feature column name array; all columns are selected by default + */ +Param FEATURE_COLS = Review comment: OK, I will refine it later.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755676992 ## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/knn/KnnTest.java ## @@ -0,0 +1,172 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.api.core.Pipeline; +import org.apache.flink.ml.api.core.Stage; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** knn algorithm test. */ +public class KnnTest { +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; +private Table trainData; + +List trainArray = +new ArrayList<>( +Arrays.asList( +Row.of("f", "2.0 3.0"), +Row.of("f", "2.1 3.1"), +Row.of("m", "200.1 300.1"), +Row.of("m", "200.2 300.2"), +Row.of("m", "200.3 300.3"), +Row.of("m", "200.4 300.4"), +Row.of("m", "200.4 300.4"), +Row.of("m", "200.6 300.6"), +Row.of("f", "2.1 3.1"), +Row.of("f", "2.1 3.1"), +Row.of("f", "2.1 3.1"), +Row.of("f", "2.1 3.1"), +Row.of("f", "2.3 3.2"), +Row.of("f", "2.3 3.2"), +Row.of("c", "2.8 3.2"), +Row.of("d", "300. 
3.2"), +Row.of("f", "2.2 3.2"), +Row.of("e", "2.4 3.2"), +Row.of("e", "2.5 3.2"), +Row.of("e", "2.5 3.2"), +Row.of("f", "2.1 3.1"))); + +List testArray = +new ArrayList<>(Arrays.asList(Row.of("e", "4.0 4.1"), Row.of("m", "300 42"))); + +private Table testData; + Review comment: OK, I will do it later.
[GitHub] [flink-ml] weibozhao commented on a change in pull request #24: [Flink 24557] - Add knn algorithm to flink-ml
weibozhao commented on a change in pull request #24: URL: https://github.com/apache/flink-ml/pull/24#discussion_r755676909 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/knn/KnnParams.java ## @@ -0,0 +1,32 @@ +package org.apache.flink.ml.classification.knn; + +import org.apache.flink.ml.common.param.HasFeatureColsDefaultAsNull; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasPredictionCol; +import org.apache.flink.ml.common.param.HasVectorColDefaultAsNull; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WithParams; + +/** knn fit parameters. */ +public interface KnnParams +extends WithParams, +HasVectorColDefaultAsNull, +HasLabelCol, +HasFeatureColsDefaultAsNull, +HasPredictionCol { +/** + * @cn-name topK + * @cn topK + */ +Param K = new IntParam("k", "k", 10, ParamValidators.gt(0)); + +default Integer getK() { Review comment: K can be obtained in two ways: getK() or params.get(KnnParams.K). Either way, the algorithm's implementation is unaffected. I currently use the second pattern; I may need to change the pattern later.
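The two access patterns mentioned in the comment above can be illustrated with a minimal, self-contained sketch. The `Param`, `WithParams`, `KnnParams` and `Model` types below are simplified, hypothetical stand-ins written for this illustration; they are not the flink-ml classes, and the generic `get`/`set` helpers are assumptions about how such a holder might look.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins; these are NOT the flink-ml classes. */
final class KParamSketch {

    /** Minimal parameter descriptor: a name plus a default value. */
    static final class Param<T> {
        final String name;
        final T defaultValue;

        Param(String name, T defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /** Minimal parameter holder offering generic, map-style access (assumed API). */
    interface WithParams {
        Map<Param<?>, Object> getParamMap();

        @SuppressWarnings("unchecked")
        default <T> T get(Param<T> param) {
            Object value = getParamMap().get(param);
            // Fall back to the default when the parameter was never set.
            return value == null ? param.defaultValue : (T) value;
        }

        default <T> void set(Param<T> param, T value) {
            getParamMap().put(param, value);
        }
    }

    /** Parameter interface that adds a typed convenience getter for K. */
    interface KnnParams extends WithParams {
        Param<Integer> K = new Param<>("k", 10);

        default Integer getK() {
            return get(K); // pattern 1 simply delegates to pattern 2
        }
    }

    /** Toy model that only stores parameters. */
    static final class Model implements KnnParams {
        private final Map<Param<?>, Object> params = new HashMap<>();

        @Override
        public Map<Param<?>, Object> getParamMap() {
            return params;
        }
    }

    public static void main(String[] args) {
        Model model = new Model();
        model.set(KnnParams.K, 5);
        System.out.println(model.getK());           // pattern 1: typed getter
        System.out.println(model.get(KnnParams.K)); // pattern 2: generic lookup
    }
}
```

In this sketch the typed getter is a thin wrapper over the generic lookup, so both patterns read the same stored value; which one the real code should prefer is a style question for the reviewers.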
[GitHub] [flink] JingsongLi merged pull request #17792: [FLINK-24763][fs-connector] LimitableReader should swallow exception when reached limit
JingsongLi merged pull request #17792: URL: https://github.com/apache/flink/pull/17792
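The idea named in the pull request title (a limit-aware reader that swallows an exception once the limit has been reached) might be sketched as follows. This is only an illustration under stated assumptions: `RecordReader`, `LimitedReader` and the shared `AtomicLong` counter are hypothetical names invented here, and the code is not the change merged in the pull request.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; names are illustrative and not taken from Flink. */
final class LimitedReaderSketch {

    /** Assumed reader abstraction: returns null once the input is exhausted. */
    interface RecordReader<T> {
        T read();
    }

    /** Wraps a reader and stops emitting once a (possibly shared) limit is reached. */
    static final class LimitedReader<T> implements RecordReader<T> {
        private final RecordReader<T> delegate;
        private final long limit;
        private final AtomicLong emitted; // may be shared by several readers

        LimitedReader(RecordReader<T> delegate, long limit, AtomicLong emitted) {
            this.delegate = delegate;
            this.limit = limit;
            this.emitted = emitted;
        }

        @Override
        public T read() {
            if (emitted.get() >= limit) {
                return null; // limit already reached: behave like end of input
            }
            T record;
            try {
                record = delegate.read();
            } catch (RuntimeException e) {
                // If the limit was reached while this read was in flight, treat the
                // failure as a side effect of shutting down and swallow it.
                if (emitted.get() >= limit) {
                    return null;
                }
                throw e; // a failure before the limit is a real error
            }
            if (record != null) {
                emitted.incrementAndGet();
            }
            return record;
        }
    }

    public static void main(String[] args) {
        AtomicLong shared = new AtomicLong();
        RecordReader<String> failing = () -> {
            shared.set(1); // simulate another reader reaching the limit meanwhile
            throw new IllegalStateException("reader closed");
        };
        System.out.println(new LimitedReader<>(failing, 1, shared).read());
    }
}
```

The design choice in this sketch is to re-check the counter inside the catch block, so that only failures observed after the limit was hit are suppressed, while earlier failures still propagate.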