[jira] [Resolved] (FLINK-32810) Improve managed memory usage in ListStateWithCache
[ https://issues.apache.org/jira/browse/FLINK-32810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang resolved FLINK-32810.
---
Fix Version/s: ml-2.4.0
Assignee: Fan Hong
Resolution: Fixed

> Improve managed memory usage in ListStateWithCache
> --------------------------------------------------
>
> Key: FLINK-32810
> URL: https://issues.apache.org/jira/browse/FLINK-32810
> Project: Flink
> Issue Type: Improvement
> Components: Library / Machine Learning
> Reporter: Fan Hong
> Assignee: Fan Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.4.0
>
> Right now, by default, an instance of `ListStateWithCache` uses up all of the
> managed memory allocated for `ManagedMemoryUseCase.OPERATOR`.
> This can cause bugs in some situations, for example when a single operator
> contains more than one `ListStateWithCache`, or when other parts of the job
> also use managed memory of `ManagedMemoryUseCase.OPERATOR`.
>
> One way to resolve such cases is to make the developer aware of how much
> managed memory of `ManagedMemoryUseCase.OPERATOR` is used, instead of
> implicitly using all of it. Therefore, it would be better to add a parameter
> `fraction` that specifies how much of the managed memory is used by the
> `ListStateWithCache`.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
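The proposal above can be sketched as follows. This is an illustrative sketch only: the class name `FractionedCache` and its constructor are hypothetical, not the actual `ListStateWithCache` API; it just shows how an explicit `fraction` parameter lets several caches in one operator split the operator-scoped managed memory instead of each implicitly claiming all of it.

```java
/**
 * Hypothetical sketch of the `fraction` parameter proposed in FLINK-32810:
 * each cache takes an explicit share of the operator's managed memory.
 */
public final class FractionedCache {
    private final long cacheMemoryBytes;

    /**
     * @param operatorManagedMemoryBytes managed memory granted to the operator
     *     for ManagedMemoryUseCase.OPERATOR
     * @param fraction share of that memory this cache may use, in (0, 1]
     */
    public FractionedCache(long operatorManagedMemoryBytes, double fraction) {
        if (fraction <= 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in (0, 1]");
        }
        this.cacheMemoryBytes = (long) (operatorManagedMemoryBytes * fraction);
    }

    public long getCacheMemoryBytes() {
        return cacheMemoryBytes;
    }

    public static void main(String[] args) {
        // Two caches in one operator can now split the memory explicitly
        // instead of each claiming all of it.
        FractionedCache a = new FractionedCache(64 << 20, 0.5);
        FractionedCache b = new FractionedCache(64 << 20, 0.5);
        System.out.println(a.getCacheMemoryBytes() + b.getCacheMemoryBytes()); // 67108864
    }
}
```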
[jira] [Resolved] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value
[ https://issues.apache.org/jira/browse/FLINK-32889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang resolved FLINK-32889.
---
Fix Version/s: ml-2.4.0
Assignee: Fan Hong
Resolution: Fixed

> BinaryClassificationEvaluator gives wrong weighted AUC value
> ------------------------------------------------------------
>
> Key: FLINK-32889
> URL: https://issues.apache.org/jira/browse/FLINK-32889
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Fan Hong
> Assignee: Fan Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.4.0
>
> BinaryClassificationEvaluator gives a wrong AUC value when a weight column is
> provided.
> Here is a case from the unit test. The (score, label, weight) tuples of the
> data are:
> {code:java}
> (0.9, 1.0, 0.8),
> (0.9, 1.0, 0.7),
> (0.9, 1.0, 0.5),
> (0.75, 0.0, 1.2),
> (0.6, 0.0, 1.3),
> (0.9, 1.0, 1.5),
> (0.9, 1.0, 1.4),
> (0.4, 0.0, 0.3),
> (0.3, 0.0, 0.5),
> (0.9, 1.0, 1.9),
> (0.2, 0.0, 1.2),
> (0.1, 1.0, 1.0)
> {code}
> PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML
> implementation gives 0.891168.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
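The 0.87179 reference value can be reproduced with a small pairwise computation. This is an illustrative sketch, not the Flink ML implementation: sample-weighted AUC sums, over every (positive, negative) pair, the product of the two sample weights when the positive is ranked above the negative (half credit for score ties), divided by the total pair weight — which is how scikit-learn's `sample_weight` semantics work out for `roc_auc_score`.

```java
/**
 * Reference computation of sample-weighted AUC via weighted pairwise
 * comparison, matching the 0.87179 value reported by PySpark/scikit-learn
 * for the data in this issue.
 */
public final class WeightedAucReference {

    /** rows: each entry is {score, label, weight}, with label 1.0 or 0.0. */
    public static double weightedAuc(double[][] rows) {
        double num = 0.0;
        double den = 0.0;
        for (double[] pos : rows) {
            if (pos[1] != 1.0) continue;
            for (double[] neg : rows) {
                if (neg[1] != 0.0) continue;
                double pairWeight = pos[2] * neg[2];
                den += pairWeight;
                if (pos[0] > neg[0]) {
                    num += pairWeight;       // correctly ordered pair
                } else if (pos[0] == neg[0]) {
                    num += 0.5 * pairWeight; // score tie gets half credit
                }
            }
        }
        return num / den;
    }

    public static void main(String[] args) {
        double[][] rows = {
            {0.9, 1.0, 0.8}, {0.9, 1.0, 0.7}, {0.9, 1.0, 0.5},
            {0.75, 0.0, 1.2}, {0.6, 0.0, 1.3}, {0.9, 1.0, 1.5},
            {0.9, 1.0, 1.4}, {0.4, 0.0, 0.3}, {0.3, 0.0, 0.5},
            {0.9, 1.0, 1.9}, {0.2, 0.0, 1.2}, {0.1, 1.0, 1.0}
        };
        System.out.println(weightedAuc(rows)); // ~0.87179
    }
}
```

For this data the denominator is 7.8 * 4.5 = 35.1 (sum of positive weights times sum of negative weights) and the numerator is 30.6, giving 30.6 / 35.1 = 0.87179.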
[jira] [Commented] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value
[ https://issues.apache.org/jira/browse/FLINK-32889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758393#comment-17758393 ]

Zhipeng Zhang commented on FLINK-32889:
---
Solved on master via 5619c3b8591b220e78a0a792c1f940e06149c8f0.

> BinaryClassificationEvaluator gives wrong weighted AUC value
> ------------------------------------------------------------
>
> Key: FLINK-32889
> URL: https://issues.apache.org/jira/browse/FLINK-32889
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Fan Hong
> Priority: Major
> Labels: pull-request-available
>
> BinaryClassificationEvaluator gives a wrong AUC value when a weight column is
> provided.
> Here is a case from the unit test. The (score, label, weight) tuples of the
> data are:
> {code:java}
> (0.9, 1.0, 0.8),
> (0.9, 1.0, 0.7),
> (0.9, 1.0, 0.5),
> (0.75, 0.0, 1.2),
> (0.6, 0.0, 1.3),
> (0.9, 1.0, 1.5),
> (0.9, 1.0, 1.4),
> (0.4, 0.0, 0.3),
> (0.3, 0.0, 0.5),
> (0.9, 1.0, 1.9),
> (0.2, 0.0, 1.2),
> (0.1, 1.0, 1.0)
> {code}
> PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML
> implementation gives 0.891168.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32598) Spill data from feedback edge to disk to avoid possible OOM
[ https://issues.apache.org/jira/browse/FLINK-32598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-32598:
---
Description:
In Flink ML, we use a feedback edge to implement the iteration module. Suppose the job topology is `OpA -> HeadOperator -> OpB -> TailOperator`; then the basic process of each iteration is as follows:
* In the first iteration, HeadOperator takes the input from OpA and forwards it to OpB.
* Later, OpB consumes the input from HeadOperator and forwards its output to TailOperator.
* Finally, TailOperator puts the records into an in-memory message queue, which HeadOperator consumes.

When the output of OpB contains many records and these records cannot be consumed quickly, the message queue can grow large and eventually lead to an OOM.

> Spill data from feedback edge to disk to avoid possible OOM
> -----------------------------------------------------------
>
> Key: FLINK-32598
> URL: https://issues.apache.org/jira/browse/FLINK-32598
> Project: Flink
> Issue Type: Improvement
> Components: Library / Machine Learning
> Reporter: Zhipeng Zhang
> Priority: Major
> Fix For: ml-2.4.0
>
> In Flink ML, we use a feedback edge to implement the iteration module.
> Suppose the job topology is `OpA -> HeadOperator -> OpB -> TailOperator`;
> then the basic process of each iteration is as follows:
> * In the first iteration, HeadOperator takes the input from OpA and forwards
> it to OpB.
> * Later, OpB consumes the input from HeadOperator and forwards its output to
> TailOperator.
> * Finally, TailOperator puts the records into an in-memory message queue,
> which HeadOperator consumes.
> When the output of OpB contains many records and these records cannot be
> consumed quickly, the message queue can grow large and eventually lead to an
> OOM.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
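The spilling idea proposed above can be sketched with a small, self-contained class. The name `SpillableFeedbackQueue` and the whole design below are hypothetical, not the actual Flink ML implementation: it keeps at most a fixed number of records on the heap and writes the overflow to a temporary file, so the feedback queue's heap footprint stays bounded no matter how many records OpB produces.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a feedback queue that spills past a memory budget. */
public final class SpillableFeedbackQueue<T extends Serializable> {
    private final ArrayDeque<T> memory = new ArrayDeque<>();
    private final int maxInMemory;
    private final Path spillFile;
    private ObjectOutputStream spillOut;
    private long spilledCount = 0;

    public SpillableFeedbackQueue(int maxInMemory) {
        this.maxInMemory = maxInMemory;
        try {
            this.spillFile = Files.createTempFile("feedback-spill", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Adds a record, spilling to disk once the in-memory budget is used up. */
    public void put(T record) {
        if (memory.size() < maxInMemory) {
            memory.addLast(record);
            return;
        }
        try {
            if (spillOut == null) {
                spillOut = new ObjectOutputStream(Files.newOutputStream(spillFile));
            }
            spillOut.writeObject(record);
            spilledCount++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Drains all records: the in-memory ones first, then the spilled ones. */
    @SuppressWarnings("unchecked")
    public List<T> drain() {
        List<T> out = new ArrayList<>(memory);
        memory.clear();
        try {
            if (spillOut != null) {
                spillOut.close();
                try (ObjectInputStream in =
                        new ObjectInputStream(Files.newInputStream(spillFile))) {
                    for (long i = 0; i < spilledCount; i++) {
                        out.add((T) in.readObject());
                    }
                }
                spillOut = null;
                spilledCount = 0;
            }
            Files.deleteIfExists(spillFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        SpillableFeedbackQueue<Integer> queue = new SpillableFeedbackQueue<>(2);
        for (int i = 0; i < 5; i++) {
            queue.put(i); // records 2, 3, 4 go to disk
        }
        System.out.println(queue.drain()); // [0, 1, 2, 3, 4]
    }
}
```

A production version would of course use Flink's serializers and managed memory accounting rather than Java serialization and a fixed record count, but the structure — a bounded in-memory buffer backed by a disk spill — is the core of the proposal.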
[jira] [Created] (FLINK-32598) Spill data from feedback edge to disk to avoid possible OOM
Zhipeng Zhang created FLINK-32598:
---
Summary: Spill data from feedback edge to disk to avoid possible OOM
Key: FLINK-32598
URL: https://issues.apache.org/jira/browse/FLINK-32598
Project: Flink
Issue Type: Improvement
Components: Library / Machine Learning
Reporter: Zhipeng Zhang
Fix For: ml-2.4.0

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
[ https://issues.apache.org/jira/browse/FLINK-31910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang closed FLINK-31910.
---
Resolution: Invalid

> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> -----------------------------------------------------------------------
>
> Key: FLINK-31910
> URL: https://issues.apache.org/jira/browse/FLINK-31910
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Zhipeng Zhang
> Priority: Major
>
> Using BroadcastUtils#withBroadcast in the iteration per-round mode gets
> stuck. From the thread dump, it seems that the head operator and the
> criteria node are stuck waiting for a mail.
>
> {code:java}
> "output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition [0x700013db6000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x000747a83270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown Source)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> The demo for this bug could be found here:
> https://github.com/zhipeng93/flink-ml/tree/FLINK-31910-demo-case

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32293) Support vector with long index
[ https://issues.apache.org/jira/browse/FLINK-32293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-32293:
---
Component/s: Library / Machine Learning

> Support vector with long index
> ------------------------------
>
> Key: FLINK-32293
> URL: https://issues.apache.org/jira/browse/FLINK-32293
> Project: Flink
> Issue Type: New Feature
> Components: Library / Machine Learning
> Reporter: Zhipeng Zhang
> Priority: Major
>
> Currently, Flink ML only supports sparse and dense vectors with `int`
> indices and `double` values.
>
> However, there are real-world cases in which the index of a vector can
> exceed the range of `Integer.MAX_VALUE`. Thus we need to support vectors
> with `long` indices.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
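The requested feature can be sketched as below. The class name `LongSparseVector` and its API are hypothetical illustrations (Flink ML's actual vector interfaces may differ); the point is simply that `long` indices allow addressing dimensions beyond `Integer.MAX_VALUE`.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of a sparse vector with `long` indices, so that
 * dimensions larger than Integer.MAX_VALUE (2147483647) can be addressed.
 */
public final class LongSparseVector {
    public final long size;       // logical dimension, may exceed 2^31 - 1
    public final long[] indices;  // sorted, distinct
    public final double[] values; // values[i] belongs to indices[i]

    public LongSparseVector(long size, long[] indices, double[] values) {
        if (indices.length != values.length) {
            throw new IllegalArgumentException("indices/values length mismatch");
        }
        this.size = size;
        this.indices = indices;
        this.values = values;
    }

    /** Returns the value at the given index, or 0.0 for absent entries. */
    public double get(long index) {
        int pos = Arrays.binarySearch(indices, index);
        return pos >= 0 ? values[pos] : 0.0;
    }

    public static void main(String[] args) {
        // A vector whose dimension exceeds the int range.
        LongSparseVector v = new LongSparseVector(
                5_000_000_000L,
                new long[] {3L, 4_000_000_000L},
                new double[] {1.5, 2.5});
        System.out.println(v.get(4_000_000_000L)); // 2.5
        System.out.println(v.get(7L));             // 0.0
    }
}
```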
[jira] [Created] (FLINK-32293) Support vector with long index
Zhipeng Zhang created FLINK-32293:
---
Summary: Support vector with long index
Key: FLINK-32293
URL: https://issues.apache.org/jira/browse/FLINK-32293
Project: Flink
Issue Type: New Feature
Reporter: Zhipeng Zhang

Currently, Flink ML only supports sparse and dense vectors with `int` indices and `double` values.

However, there are real-world cases in which the index of a vector can exceed the range of `Integer.MAX_VALUE`. Thus we need to support vectors with `long` indices.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple
Zhipeng Zhang created FLINK-32292:
---
Summary: TableUtils.getRowTypeInfo fails to get type information of Tuple
Key: FLINK-32292
URL: https://issues.apache.org/jira/browse/FLINK-32292
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Reporter: Zhipeng Zhang

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-31901:
---
Description:
Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`.

However, processing the cached elements may take a long time since there may be many cached records, which can block the checkpoint barrier. If we run the code snippet here [1], we get logs as follows.

{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007
{code}

We can see that from line#2 to line#11 there are no checkpoints: the barriers are blocked until all cached elements are processed, which takes ~600ms, much longer than the checkpoint interval (i.e., 100ms).

[1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

was:
Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`.

However, processing the cached elements may take a long time since there may be many cached records, which can block the checkpoint barrier. If we run the code snippet here [1], we get logs as follows.

{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007
{code}

We can see that from line#3 to line#12 there are no checkpoints: the barriers are blocked until all cached elements are processed, which takes ~600ms, much longer than the checkpoint interval (i.e., 100ms).

[1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when
> processing cached records
> --------------------------------------------------------------------------
>
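The direction suggested by the issue title — not blocking barriers while replaying cached records — can be illustrated with a small, self-contained sketch. The names `replayInChunks` and the `Runnable` yield point are hypothetical; in a real Flink operator the yield would go through Flink's `MailboxExecutor` so that pending mails (e.g. checkpoint barriers) get a chance to run between chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Sketch: replay cached records in bounded chunks, yielding between chunks
 * so a checkpoint barrier is not blocked for the whole replay.
 */
public final class ChunkedReplay {

    /**
     * Replays the cache in chunks of chunkSize, invoking yieldPoint between
     * chunks. Returns the number of yield points hit.
     */
    static <T> int replayInChunks(List<T> cache, int chunkSize,
                                  Consumer<T> process, Runnable yieldPoint) {
        int yields = 0;
        for (int i = 0; i < cache.size(); i++) {
            process.accept(cache.get(i));
            boolean chunkDone = (i + 1) % chunkSize == 0;
            if (chunkDone && i + 1 < cache.size()) {
                yieldPoint.run(); // a checkpoint barrier could be handled here
                yields++;
            }
        }
        return yields;
    }

    public static void main(String[] args) {
        List<Integer> cache = new ArrayList<>();
        for (int i = 0; i < 10; i++) cache.add(i);
        List<Integer> processed = new ArrayList<>();
        int yields = replayInChunks(cache, 3, processed::add, () -> {});
        System.out.println(processed.size() + " records, " + yields + " yield points");
        // 10 records, 3 yield points
    }
}
```

With a chunk size tuned well below the checkpoint interval's worth of work, the barrier delay is bounded by one chunk instead of the whole cache.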
[jira] [Closed] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream
[ https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang closed FLINK-31903.
---
Resolution: Not A Bug

> Caching records fails in BroadcastUtils#withBroadcastStream
> -----------------------------------------------------------
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Zhipeng Zhang
> Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast,
> it throws an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It seems that the bug comes from caching too many records when calling
> AbstractBroadcastWrapperOperator#snapshot.
>
> The failed case could be found here:
> [https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream
[ https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721753#comment-17721753 ]

Zhipeng Zhang commented on FLINK-31903:
---
The root cause is that the memory state backend has a limited state size, so this is not a bug:
"java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5365814, maxSize=5242880. Consider using a different checkpoint storage, like the FileSystemCheckpointStorage."

> Caching records fails in BroadcastUtils#withBroadcastStream
> -----------------------------------------------------------
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Zhipeng Zhang
> Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast,
> it throws an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It seems that the bug comes from caching too many records when calling
> AbstractBroadcastWrapperOperator#snapshot.
>
> The failed case could be found here:
> [https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
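The workaround suggested by the error message can be applied with standard Flink configuration. The fragment below is a sketch (it assumes a Flink streaming job with the Flink runtime on the classpath; the checkpoint path is a placeholder): it switches from the default JobManager-memory checkpoint storage, whose per-state cap is 5242880 bytes (5 MB, matching the maxSize in the error), to file-system checkpoint storage.

```java
// Sketch: configure file-system checkpoint storage instead of the
// memory-backed default, which rejects state larger than ~5 MB.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100);
// Store checkpoint state on a file system instead of JobManager memory.
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
```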
[jira] [Resolved] (FLINK-31909) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
[ https://issues.apache.org/jira/browse/FLINK-31909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang resolved FLINK-31909.
---
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/FLINK-31910.

> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> -----------------------------------------------------------------------
>
> Key: FLINK-31909
> URL: https://issues.apache.org/jira/browse/FLINK-31909
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Zhipeng Zhang
> Priority: Major
> Fix For: ml-2.3.0
>
> Using BroadcastUtils#withBroadcastStream in iterations in per-round mode can
> get stuck.
>
> It seems that there is a task waiting for a mail from the mailbox.
>
> {code:java}
> "tail-map-head-Parallel Collection Source (1/1)#0" #200 prio=5 os_prio=31 tid=0x7faabb571800 nid=0x18c03 waiting on condition [0x700013aae000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x000747805568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task$$Lambda$1430/1226027100.run(Unknown Source)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
[ https://issues.apache.org/jira/browse/FLINK-31910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-31910:
---
Description:
Using BroadcastUtils#withBroadcast in the iteration per-round mode gets stuck.

From the thread dump, it seems that the head operator and the criteria node are stuck waiting for a mail.

{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition [0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000747a83270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
{code}

The demo for this bug could be found here:
https://github.com/zhipeng93/flink-ml/tree/FLINK-31910-demo-case

was:
Using BroadcastUtils#withBroadcast in the iteration per-round mode gets stuck.

From the thread dump, it seems that the head operator and the criteria node are stuck waiting for a mail.

{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition [0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000747a83270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
{code}

The demo for this bug could be found here:

> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> -----------------------------------------------------------------------
>
> Key: FLINK-31910
> URL: https://issues.apache.org/jira/browse/FLINK-31910
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.3.0
> Reporter: Zhipeng Zhang
> Priority: Major
>
> Using BroadcastUtils#withBroadcast in the iteration per-round mode gets stuck.
> From the thread dump, it seems that the head operator and the criteria node
> are stuck waiting for a mail.
>
> {code:java}
> "output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on
[jira] [Created] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
Zhipeng Zhang created FLINK-31910:
---
Summary: Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
Key: FLINK-31910
URL: https://issues.apache.org/jira/browse/FLINK-31910
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Affects Versions: ml-2.3.0
Reporter: Zhipeng Zhang

Using BroadcastUtils#withBroadcast in the iteration per-round mode gets stuck.

From the thread dump, it seems that the head operator and the criteria node are stuck waiting for a mail.

{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition [0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000747a83270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
{code}

The demo for this bug could be found here:

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31909) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
Zhipeng Zhang created FLINK-31909:
---
Summary: Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
Key: FLINK-31909
URL: https://issues.apache.org/jira/browse/FLINK-31909
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Reporter: Zhipeng Zhang
Fix For: ml-2.3.0

Using BroadcastUtils#withBroadcastStream in iterations in per-round mode can get stuck.

It seems that there is a task waiting for a mail from the mailbox.

{code:java}
"tail-map-head-Parallel Collection Source (1/1)#0" #200 prio=5 os_prio=31 tid=0x7faabb571800 nid=0x18c03 waiting on condition [0x700013aae000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000747805568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$1430/1226027100.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream
[ https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31903: -- Description: When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it throws an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that the bug comes from caching too many records when calling AbstractBroadcastWrapperOperator#snapshot. The failing case can be found here: [https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case] was: When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it leads to an exception as follows: {code:java} Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that the bug comes from caching too many records when calling AbstractBroadcastWrapperOperator#snapshot. The failing case can be found here: https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case > Caching records fails in BroadcastUtils#withBroadcastStream > --- > > Key: FLINK-31903 > URL: https://issues.apache.org/jira/browse/FLINK-31903 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.3.0 >Reporter: Zhipeng Zhang >Priority: Major > > When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, > it throws an exception as follows: > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint > tolerable failure threshold. 
> at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191) > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) > at >
[jira] [Updated] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream
[ https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31903: -- Description: When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that the bug comes from caching too many records when calling AbstractBroadcastWrapperOperator#snapshot. The failing case can be found here: https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case was: When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it leads to an exception as follows: {code:java} Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that the bug comes from caching too many records when calling AbstractBroadcastWrapperOperator#snapshot. > Caching records fails in BroadcastUtils#withBroadcastStream > --- > > Key: FLINK-31903 > URL: https://issues.apache.org/jira/browse/FLINK-31903 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.3.0 >Reporter: Zhipeng Zhang >Priority: Major > > When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, > it leads to an exception as follows: > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint > tolerable failure threshold. 
> at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191) > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) > at >
[jira] [Created] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream
Zhipeng Zhang created FLINK-31903: - Summary: Caching records fails in BroadcastUtils#withBroadcastStream Key: FLINK-31903 URL: https://issues.apache.org/jira/browse/FLINK-31903 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.3.0 Reporter: Zhipeng Zhang When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that the bug comes from caching too many records when calling AbstractBroadcastWrapperOperator#snapshot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
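For context on the snapshot-size problem above, one conceivable mitigation (a sketch under assumed names, not the actual Flink ML fix) is to spill cached records to a local temp file in small flushed batches, so a snapshot only has to persist a file path and byte offset instead of serializing millions of cached records into checkpoint state in one call:

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Sketch of one possible mitigation (an assumption for illustration, not the
 * actual fix for FLINK-31903): spill cached broadcast-side records to a local
 * temp file in small flushed batches, so a snapshot needs only a byte offset.
 */
public class SpillingCache implements AutoCloseable {
    private static final int FLUSH_EVERY = 1024;

    private final File file;
    private final DataOutputStream out;
    private long count = 0;

    public SpillingCache() {
        try {
            file = File.createTempFile("broadcast-cache", ".bin");
            file.deleteOnExit();
            out = new DataOutputStream(new FileOutputStream(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Append one serialized record: 4-byte length prefix plus payload. */
    public void add(byte[] record) {
        try {
            out.writeInt(record.length);
            out.write(record);
            if (++count % FLUSH_EVERY == 0) {
                out.flush(); // bound the amount of unflushed, in-memory data
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** What a snapshot would persist: the current byte offset, not the records. */
    public long snapshotOffset() {
        try {
            out.flush();
            return file.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public long size() {
        return count;
    }

    @Override
    public void close() {
        try {
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the sketch is only that checkpoint state stays O(1) in the number of cached records; how the actual patch bounds the snapshot is not shown in this thread.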
[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31901: -- Description: Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached elements may take a long time since there may be many cached records, which could potentially block the checkpoint barrier. If we run the code snippet here[1], we are supposed to get logs as follows.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007
{code}
We can find that from line#2 to line#11, there are no checkpoints and the barriers are blocked until all cached elements are processed, which takes ~600 ms, much longer than the checkpoint interval (i.e., 100 ms). [1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case was: Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached elements may take a long time since there may be many cached records, which could potentially block the checkpoint barrier. If we run the code snippet here[1], we are supposed to get logs as follows.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7
{code}
We can find that from line#3 to line#12, there are no checkpoints and the barriers are blocked until all cached elements are processed.
[1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case > AbstractBroadcastWrapperOperator should not block checkpoint barriers when > processing cached records > > > Key: FLINK-31901 > URL: https://issues.apache.org/jira/browse/FLINK-31901 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.3.0 > > > Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast > input until the broadcast inputs are all processed. After the broadcast > variables are ready, we first process the cached records and then continue to > process the newly arrived
[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31901: -- Description: Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached elements may take a long time since there may be many cached records, which could potentially block the checkpoint barrier. If we run the code snippet here[1], we are supposed to get logs as follows.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7
{code}
We can find that from line#3 to line#12, there are no checkpoints and the barriers are blocked until all cached elements are processed. [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case was: Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast input until the broadcast inputs are all processed.
After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached elements may take a long time since there may be many cached records, which could potentially block the checkpoint barrier. > AbstractBroadcastWrapperOperator should not block checkpoint barriers when > processing cached records > > > Key: FLINK-31901 > URL: https://issues.apache.org/jira/browse/FLINK-31901 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.3.0 > > > Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast > input until the broadcast inputs are all processed. After the broadcast > variables are ready, we first process the cached records and then continue to > process the newly arrived records. > > Processing cached elements is invoked via `Input#processElement` and > `Input#processWatermark`. However, processing cached > elements may take a long > time since there may be many cached records, which could potentially block > the checkpoint barrier. > > If we run the code snippet here[1], we are supposed to get logs as follows. 
> {code:java} > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 > processed cached records, cnt: 1 > processed cached records, cnt: 2 > processed cached records, cnt: 3 > processed cached records, cnt: 4 > processed cached records, cnt: 5 > processed cached records, cnt: 6 > processed cached records, cnt: 7 > processed cached records, cnt: 8 > processed cached records, cnt: 9 > processed cached records, cnt: 10 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 > OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code} > > We can find that from line#3 to line#12, there are no checkpoints and the > barriers are blocked until all cached elements are processed. > > [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
Zhipeng Zhang created FLINK-31901: - Summary: AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records Key: FLINK-31901 URL: https://issues.apache.org/jira/browse/FLINK-31901 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Zhipeng Zhang Fix For: ml-2.3.0 Currently `BroadcastUtils#withBroadcast` tries to caches the non-broadcast input until the broadcast inputs are all processed. After the broadcast variables are ready, we first process the cached records and then continue to process the newly arrived records. Processing cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, processing cached element may take a long time since there may be many cached records, which could potentially block the checkpoint barrier. -- This message was sent by Atlassian Jira (v8.20.10#820010)
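A direction that avoids blocking the barrier, sketched here in plain Java under assumed names (it is not the actual operator code), is to drain the cache in bounded chunks and yield between chunks so that other mailbox actions, such as checkpoint barriers, can interleave:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Minimal sketch (not Flink's actual API) of draining a record cache in
 * bounded chunks. After each chunk the drainer yields, which in a real
 * operator would mean re-enqueueing a "continue draining" mail so that
 * checkpoint barriers queued in the mailbox are not starved.
 */
public class ChunkedDrain {
    static final int CHUNK_SIZE = 3;

    public static int drainInChunks(Queue<Integer> cached,
                                    Consumer<Integer> process,
                                    Runnable betweenChunks) {
        int processed = 0;
        while (!cached.isEmpty()) {
            // Process at most CHUNK_SIZE records, then yield.
            for (int i = 0; i < CHUNK_SIZE && !cached.isEmpty(); i++) {
                process.accept(cached.poll());
                processed++;
            }
            // Stand-in for "let other mailbox work (e.g. a barrier) run".
            betweenChunks.run();
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<Integer> cache = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            cache.add(i);
        }
        int[] yields = {0};
        int n = drainInChunks(cache, r -> {}, () -> yields[0]++);
        System.out.println(n + " records, " + yields[0] + " yields");
    }
}
```

With 10 cached records and a chunk size of 3, the drain yields four times (after chunks of 3, 3, 3 and 1), so a barrier waits at most one chunk rather than the whole cache.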
[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module
[ https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31173: -- Description: In flink-ml-iteration, there are several bugs as follows: # TailOperator should have one input operator. We have added a Tail operator to increment the epoch watermark at each iteration. We have made an assumption that each Tail operator has only one input, and did not align the epoch watermarks from different inputs. This assumption might not be true if the input is a `union`. # ProxyOperatorStateBackend does not correctly initialize the state descriptor. was: In flink-ml-iteration, there are several bugs as follows: # TailOperator should have one input operator. We have added a Tail operator to increment the epoch watermark at each iteration. We have made an assumption that each Tail operator has only one input, and did not align the epoch watermarks from different inputs. This assumption might not be true if the input is a `union`. # ReplayOperator should replay the records when it receives the max epoch watermark. Currently ReplayOperator does not replay the records when it receives the max epoch watermark. However, this is inconsistent with the HeadOperator, as the HeadOperator always forwards the record downstream. # ProxyOperatorStateBackend does not correctly initialize the state descriptor. > Fix several bugs in flink-ml-iteration module > - > > Key: FLINK-31173 > URL: https://issues.apache.org/jira/browse/FLINK-31173 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.3.0 > > > In flink-ml-iteration, there are several bugs as follows: > # TailOperator should have one input operator. We have added a Tail operator > to increment the epoch watermark at each iteration. 
We have made an > assumption that each Tail operator has only one input and did not align the > epoch watermarks from different inputs. This assumption might not be true if > the input is a `union`. > # ProxyOperatorStateBackend does not correctly initialize the state > descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
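The alignment that bug #1 asks for can be sketched in a few lines; the class below is a hypothetical illustration of the idea (min-alignment across inputs), not the actual TailOperator patch:

```java
import java.util.Arrays;

/**
 * Sketch of aligning epoch watermarks across multiple inputs (e.g. a union):
 * the combined epoch watermark only advances to the minimum epoch seen across
 * all inputs, so a fast input cannot prematurely increment the epoch while a
 * slower input is still in an earlier round. Assumes epochs are non-negative,
 * so -1 can serve as a "no new aligned watermark" sentinel.
 */
public class EpochWatermarkAligner {
    private final long[] perInput;
    private long lastEmitted = Long.MIN_VALUE;

    public EpochWatermarkAligner(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Returns the newly aligned epoch watermark, or -1 if it did not advance. */
    public long onEpochWatermark(int input, long epoch) {
        perInput[input] = Math.max(perInput[input], epoch);
        long min = Arrays.stream(perInput).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return min; // safe to forward downstream
        }
        return -1; // still waiting on a slower input
    }
}
```

With two inputs, an epoch watermark from input 0 alone does not advance anything; only once input 1 reaches the same epoch does the aligned watermark move forward.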
[jira] [Assigned] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
[ https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31374: - Assignee: Jiang Xin (was: jiangxin li) > ProxyStreamPartitioner should implement ConfigurableStreamPartitioner > - > > Key: FLINK-31374 > URL: https://issues.apache.org/jira/browse/FLINK-31374 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Jiang Xin >Priority: Major > Labels: pull-request-available > > In the flink-ml-iterations module, we use ProxyStreamPartitioner to wrap > StreamPartitioner to deal with records in iterations. > > However, it did not implement the ConfigurableStreamPartitioner interface. Thus > the maxParallelism information is lost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
[ https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31374: - Assignee: jiangxin li > ProxyStreamPartitioner should implement ConfigurableStreamPartitioner > - > > Key: FLINK-31374 > URL: https://issues.apache.org/jira/browse/FLINK-31374 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: jiangxin li >Priority: Major > Labels: pull-request-available > > In the flink-ml-iterations module, we use ProxyStreamPartitioner to wrap > StreamPartitioner to deal with records in iterations. > > However, it did not implement the ConfigurableStreamPartitioner interface. Thus > the maxParallelism information is lost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
[ https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-31374. --- Resolution: Fixed Resolved on master via 990337bf5a0a23b08be85c475043a047703772c8 > ProxyStreamPartitioner should implement ConfigurableStreamPartitioner > - > > Key: FLINK-31374 > URL: https://issues.apache.org/jira/browse/FLINK-31374 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > In the flink-ml-iterations module, we use ProxyStreamPartitioner to wrap > StreamPartitioner to deal with records in iterations. > > However, it did not implement the ConfigurableStreamPartitioner interface. Thus > the maxParallelism information is lost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
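The delegation pattern behind this fix can be shown with tiny stand-in interfaces (the real ones live in flink-streaming-java; the types below are simplified placeholders): a wrapping partitioner must itself implement the configurable interface and forward the configuration call, otherwise the setting is silently dropped.

```java
/**
 * Stand-in types illustrating the FLINK-31374 fix pattern. These are NOT the
 * real Flink interfaces; they only show why a proxy partitioner must forward
 * configure(maxParallelism) to the wrapped partitioner.
 */
public class ProxyPartitionerSketch {
    interface StreamPartitioner {
        int selectChannel(int key, int numChannels);
    }

    interface ConfigurableStreamPartitioner {
        void configure(int maxParallelism);
    }

    /** A wrapped partitioner that needs maxParallelism, e.g. key-group based. */
    static class KeyGroupPartitioner
            implements StreamPartitioner, ConfigurableStreamPartitioner {
        int maxParallelism = -1;

        @Override
        public void configure(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int selectChannel(int key, int numChannels) {
            int keyGroup = Math.floorMod(key, maxParallelism);
            return keyGroup * numChannels / maxParallelism;
        }
    }

    /** The proxy: forwards both partitioning and, crucially, configuration. */
    static class ProxyStreamPartitioner
            implements StreamPartitioner, ConfigurableStreamPartitioner {
        final StreamPartitioner wrapped;

        ProxyStreamPartitioner(StreamPartitioner wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int selectChannel(int key, int numChannels) {
            return wrapped.selectChannel(key, numChannels);
        }

        @Override
        public void configure(int maxParallelism) {
            if (wrapped instanceof ConfigurableStreamPartitioner) {
                ((ConfigurableStreamPartitioner) wrapped).configure(maxParallelism);
            }
        }
    }
}
```

Without the `configure` forwarding, `KeyGroupPartitioner.maxParallelism` stays at its uninitialized value, which is exactly the "maxParallelism information is lost" symptom described above.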
[jira] [Updated] (FLINK-31732) flink-ml-uber module should include statefun as a dependency
[ https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31732: -- Component/s: Library / Machine Learning > flink-ml-uber module should include statefun as a dependency > > > Key: FLINK-31732 > URL: https://issues.apache.org/jira/browse/FLINK-31732 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31732) flink-ml-uber module should include statefun as a dependency
[ https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang closed FLINK-31732. - Resolution: Won't Fix We leave statefun as a third-party dependency for now. > flink-ml-uber module should include statefun as a dependency > > > Key: FLINK-31732 > URL: https://issues.apache.org/jira/browse/FLINK-31732 > Project: Flink > Issue Type: Improvement >Reporter: Zhipeng Zhang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31732) flink-ml-uber module should include statefun as a dependency
Zhipeng Zhang created FLINK-31732: - Summary: flink-ml-uber module should include statefun as a dependency Key: FLINK-31732 URL: https://issues.apache.org/jira/browse/FLINK-31732 Project: Flink Issue Type: Improvement Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31623) Fix DataStreamUtils#sample with approximate uniform sampling
[ https://issues.apache.org/jira/browse/FLINK-31623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31623: -- Summary: Fix DataStreamUtils#sample with approximate uniform sampling (was: Change to uniform sampling in DataStreamUtils#sample method) > Fix DataStreamUtils#sample with approximate uniform sampling > > > Key: FLINK-31623 > URL: https://issues.apache.org/jira/browse/FLINK-31623 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Fan Hong >Priority: Major > Labels: pull-request-available > > The current implementation employs a two-level sampling method. > However, when data instances are unevenly distributed among partitions > (subtasks), the probability of an instance being sampled differs across > partitions (subtasks), i.e., the sampling is not uniform. > > In addition, one side effect of the current implementation is: one subtask has a > memory footprint of `2 * numSamples * sizeof(element)`, which could cause > unexpected OOM in some situations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31623) Fix DataStreamUtils#sample with approximate uniform sampling
[ https://issues.apache.org/jira/browse/FLINK-31623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-31623. --- Resolution: Fixed Resolved on master via fe338b194b73fd51218f4d842fa7b0065fb76c56 > Fix DataStreamUtils#sample with approximate uniform sampling > > > Key: FLINK-31623 > URL: https://issues.apache.org/jira/browse/FLINK-31623 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Fan Hong >Priority: Major > Labels: pull-request-available > > The current implementation employs a two-level sampling method. > However, when data instances are unevenly distributed among partitions > (subtasks), the probability of an instance being sampled differs across > partitions (subtasks), i.e., the sampling is not uniform. > > In addition, one side effect of the current implementation is: one subtask has a > memory footprint of `2 * numSamples * sizeof(element)`, which could cause > unexpected OOM in some situations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
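A standard way to get partition-independent uniform sampling (shown here as an illustrative sketch; the merged patch may differ in detail) is to tag every element with a random score, keep the numSamples smallest scores within each partition, and then merge the per-partition candidates by the same rule. Each element then has the same chance of surviving regardless of how elements are spread across partitions, and each subtask keeps at most numSamples elements instead of 2 * numSamples:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/**
 * Sketch of uniform distributed sampling via random tags: keep the k elements
 * with the smallest random scores per partition, then merge. Illustrative
 * only; not the actual DataStreamUtils#sample implementation.
 */
public class DistributedSample {
    static final class Tagged<T> {
        final T value;
        final double score;

        Tagged(T value, double score) {
            this.value = value;
            this.score = score;
        }
    }

    /** Keep the k elements with the smallest random scores. */
    static <T> List<Tagged<T>> topK(Iterable<Tagged<T>> in, int k) {
        // Max-heap on score: the root is the worst survivor, evicted first.
        PriorityQueue<Tagged<T>> heap =
                new PriorityQueue<>(Comparator.comparingDouble((Tagged<T> t) -> -t.score));
        for (Tagged<T> t : in) {
            heap.add(t);
            if (heap.size() > k) {
                heap.poll();
            }
        }
        return new ArrayList<>(heap);
    }

    public static <T> List<T> sample(List<List<T>> partitions, int k, long seed) {
        Random rnd = new Random(seed);
        List<Tagged<T>> merged = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<Tagged<T>> tagged = new ArrayList<>();
            for (T v : partition) {
                tagged.add(new Tagged<>(v, rnd.nextDouble()));
            }
            merged.addAll(topK(tagged, k)); // per-partition pre-selection
        }
        List<T> out = new ArrayList<>();
        for (Tagged<T> t : topK(merged, k)) {
            out.add(t.value);
        }
        return out;
    }
}
```

Because the per-partition step and the merge step both select by the globally comparable random score, a heavily loaded subtask gets no sampling advantage over a lightly loaded one.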
[jira] [Resolved] (FLINK-31189) Allow special handle of less frequent values in StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-31189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-31189. --- Resolution: Fixed Resolved via 560532d99c4330949d4da2c94d0e5228bdfbc9cd on master > Allow special handle of less frequent values in StringIndexer > - > > Key: FLINK-31189 > URL: https://issues.apache.org/jira/browse/FLINK-31189 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Fan Hong >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > Real-world datasets often contain categorical features with millions of > distinct values, some of which may only appear a few times. To maximize the > performance of certain algorithms, it is important to treat these less > frequent values properly. A popular approach is to map them to a special > index, as is done in sklearn's OneHotEncoder [1]. > > [1] > https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31189) Allow special handle of less frequent values in StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-31189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31189: - Assignee: Zhipeng Zhang > Allow special handle of less frequent values in StringIndexer > - > > Key: FLINK-31189 > URL: https://issues.apache.org/jira/browse/FLINK-31189 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Fan Hong >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > Real-world datasets often contain categorical features with millions of > distinct values, some of which may only appear a few times. To maximize the > performance of certain algorithms, it is important to treat these less > frequent values properly. A popular approach is to map them to a special > index, as is done in sklearn's OneHotEncoder [1]. > > [1] > https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
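The proposed handling can be sketched as a fit step that reserves one shared index for every value whose count falls below a frequency threshold, similar in spirit to the infrequent-category option of sklearn's OneHotEncoder. The `minFrequency` parameter and the reserved-index-0 layout below are assumptions for illustration, not the eventual StringIndexer API:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: build a string-to-index map where all categories
// with count < minFrequency share the reserved index 0, and frequent
// categories get indices 1, 2, ... in first-seen order.
class FrequencyIndexer {
    static Map<String, Integer> fit(String[] values, long minFrequency) {
        // Count occurrences, preserving first-seen order for stable indices.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String v : values) counts.merge(v, 1L, Long::sum);

        Map<String, Integer> index = new HashMap<>();
        int next = 1; // index 0 is reserved for all infrequent values
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            index.put(e.getKey(), e.getValue() >= minFrequency ? next++ : 0);
        }
        return index;
    }
}
```

With this layout, downstream one-hot encoding produces a single "infrequent" column instead of millions of near-empty ones.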
[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701452#comment-17701452 ] Zhipeng Zhang commented on FLINK-31486: --- Is this similar to this one [1]? [1] https://issues.apache.org/jira/browse/FLINK-31255 > Using KeySelector in IterationBody causes ClassNotFoundException > > > Key: FLINK-31486 > URL: https://issues.apache.org/jira/browse/FLINK-31486 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > When we use CoGroup along with KeySelector in an IterationBody, the following > exception occurs. > {code:java} > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Could not instantiate state partitioner. at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662) > at > org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146) > at > org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at > java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.ClassCastException: cannot assign instance of > java.lang.invoke.SerializedLambda to field > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 > of type org.apache.flink.api.java.functions.KeySelector in instance of > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302) > at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at > 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659) > ... 17 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31410) ListStateWithCache Should support incremental snapshot
[ https://issues.apache.org/jira/browse/FLINK-31410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31410: -- Description: In Flink ML, we use ListStateWithCache [2] to enable caching data in memory and filesystem. However, it does not support incremental snapshot now — It writes all the data to checkpoint stream whenever calling snapshot [1], which could be inefficient. [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116] [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java] was: In Flink ML, we used ListStateWithCache [2] to enable caching data in memory and filesystem. However, it does not support incremental snapshot now — It writes all the data to checkpoint stream when calling snapshot [1], which could be inefficient. [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116] [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java] > ListStateWithCache Should support incremental snapshot > -- > > Key: FLINK-31410 > URL: https://issues.apache.org/jira/browse/FLINK-31410 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > In Flink ML, we use ListStateWithCache [2] to enable caching data in memory > and filesystem. However, it does not support incremental snapshot now — It > writes all the data to checkpoint stream whenever calling snapshot [1], which > could be inefficient. 
> > > [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116] > > [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31410) ListStateWithCache Should support incremental snapshot
Zhipeng Zhang created FLINK-31410: - Summary: ListStateWithCache Should support incremental snapshot Key: FLINK-31410 URL: https://issues.apache.org/jira/browse/FLINK-31410 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang In Flink ML, we used ListStateWithCache [2] to enable caching data in memory and filesystem. However, it does not support incremental snapshot now — It writes all the data to checkpoint stream when calling snapshot [1], which could be inefficient. [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116] [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31410) ListStateWithCache Should support incremental snapshot
[ https://issues.apache.org/jira/browse/FLINK-31410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31410: -- Issue Type: Improvement (was: Bug) > ListStateWithCache Should support incremental snapshot > -- > > Key: FLINK-31410 > URL: https://issues.apache.org/jira/browse/FLINK-31410 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > In Flink ML, we used ListStateWithCache [2] to enable caching data in memory > and filesystem. However, it does not support incremental snapshot now — It > writes all the data to checkpoint stream when calling snapshot [1], which > could be inefficient. > > > [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116] > > [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
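The improvement requested for ListStateWithCache amounts to tracking which prefix of the cached list earlier checkpoints already cover, and writing only the new suffix on each snapshot. A hypothetical sketch of that bookkeeping (illustrative only; ListStateWithCache's real API and segment-based storage differ):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the incremental-snapshot idea: the cache
// remembers how many records were already persisted, and each snapshot
// returns only the delta added since the previous checkpoint instead of
// rewriting the full list.
class IncrementalCache<T> {
    private final List<T> records = new ArrayList<>();
    private int persisted = 0; // records covered by earlier snapshots

    void add(T record) {
        records.add(record);
    }

    // Returns only the delta to write for this checkpoint, then marks
    // everything as persisted.
    List<T> snapshotDelta() {
        List<T> delta = new ArrayList<>(records.subList(persisted, records.size()));
        persisted = records.size();
        return delta;
    }
}
```

On restore, the state backend would concatenate the deltas from all retained checkpoints to rebuild the full list; the saving is that an unchanged prefix is never rewritten.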
[jira] [Commented] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark
[ https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698766#comment-17698766 ] Zhipeng Zhang commented on FLINK-31373: --- As discussed with [~gaoyunhaii] offline, we agree that the watermark is not correctly processed in the iteration module. To avoid the above cases for now, we plan to add a Javadoc note explaining that the `flink-ml-iteration` module cannot deal with watermarks correctly. We will leave it as a TODO here. > PerRoundWrapperOperator should carry epoch information in watermark > --- > > Key: FLINK-31373 > URL: https://issues.apache.org/jira/browse/FLINK-31373 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > Currently we use PerRoundWrapperOperator to wrap the normal Flink operators > such that they can be used in iterations. > We already contain the epoch information in each record so that we know > which iteration each record belongs to. > However, there is no epoch information when the stream element is a > watermark. This works in most cases, but fails to address the following use > case: > - In DataStreamUtils#withBroadcast, we will cache the elements (including > watermarks) from non-broadcast inputs until the broadcast variables are > ready. When the broadcast variables are ready, once we receive a stream > element we will process the cached elements first. If the received element is > a watermark, the current implementation of the iteration module fails > (ProxyOutput#collect throws NPE) since there is no epoch information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31029) KBinsDiscretizer gives wrong bin edges in 'quantile' strategy when input data contains only 2 distinct values
[ https://issues.apache.org/jira/browse/FLINK-31029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31029: - Assignee: Zhipeng Zhang > KBinsDiscretizer gives wrong bin edges in 'quantile' strategy when input data > contains only 2 distinct values > - > > Key: FLINK-31029 > URL: https://issues.apache.org/jira/browse/FLINK-31029 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Fan Hong >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > When one input column contains only 2 distinct values and their counts are > the same, KBinsDiscretizer transforms this column to all 0s using the `quantile` > strategy. An example of such a column is `[0, 0, 0, 1, 1, 1]`. > When the 2 distinct values have different counts, the transformed values are > also all 0s, so the two values cannot be distinguished. -- This message was sent by Atlassian Jira (v8.20.10#820010)
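The failure mode here is that quantile edges computed on such a column repeat, the duplicates collapse into a single bin, and every value maps to bin 0. A sketch of one way to keep two distinct values separable (illustrative only; the index arithmetic and the midpoint fallback are assumptions, not the actual Flink ML fix):

```java
import java.util.Arrays;

// Illustrative sketch: compute quantile bin edges over a sorted column,
// collapse duplicated edges produced by heavily repeated values, and when
// only two distinct values remain, split halfway between them so the bins
// can still tell them apart.
class QuantileEdges {
    static double[] edges(double[] sorted, int numBins) {
        double[] raw = new double[numBins + 1];
        for (int i = 0; i <= numBins; i++) {
            int idx = (int) Math.round((double) i / numBins * (sorted.length - 1));
            raw[i] = sorted[idx];
        }
        // Collapse duplicated edges.
        double[] distinct = Arrays.stream(raw).distinct().toArray();
        if (distinct.length < 3 && sorted[0] != sorted[sorted.length - 1]) {
            // Exactly two distinct values: use their midpoint as the edge.
            double lo = sorted[0];
            double hi = sorted[sorted.length - 1];
            return new double[] {lo, (lo + hi) / 2, hi};
        }
        return distinct;
    }

    // Maps a value to its bin given edges [e0, e1, ..., ek].
    static int findBin(double[] edges, double v) {
        int bin = 0;
        for (int i = 1; i < edges.length - 1; i++) {
            if (v >= edges[i]) bin = i;
        }
        return bin;
    }
}
```

For `[0, 0, 0, 1, 1, 1]` with two bins this yields edges `[0, 0.5, 1]`, mapping the 0s to bin 0 and the 1s to bin 1 instead of collapsing everything into one bin.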
[jira] [Created] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
Zhipeng Zhang created FLINK-31374: - Summary: ProxyStreamPartitioner should implement ConfigurableStreamPartitioner Key: FLINK-31374 URL: https://issues.apache.org/jira/browse/FLINK-31374 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang In the flink-ml-iteration module, we use ProxyStreamPartitioner to wrap StreamPartitioner so it can deal with records in iterations. However, it does not implement the ConfigurableStreamPartitioner interface, so the maxParallelism information is lost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark
[ https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31373: -- Description: Currently we use PerRoundWrapperOperator to wrap the normal flink operators such that they can be used in iterations. We already contained the epoch information in each record so that we know which iteration each record belongs to. However, there is no epoch information when the stream element is a watermark. This works in most cases, but fail to address the following use case: - In DataStreamUtils#withBroadcast, we will cache the elements (including watermarks) from non-broadcast inputs until the broadcast variables are ready. When the broadcast variables are ready, once we receive a stream element we will process the cached elements first. If the received element is a watermark, the current implementation of iteration module fails (ProxyOutput#collect throws NPE) since there is no epoch information. was: Currently we use `PerRoundWrapperOperator` to wrap the normal flink operators such that they can be used in iterations. We already contained the epoch information in each record so that we know which iteration each record belongs to. However, there is no epoch information when the stream element is a watermark. This works in most cases, but fail to address the following issue: - In DataStreamUtils#withBroadcast, we will cache the elements (including watermarks) from non-broadcast inputs until the broadcast variables are ready. When the broadcast variables are ready, once we receive a stream element we will process the cached elements first. If the received element is a watermark, the current implementation of iteration module fails (`ProxyOutput#collect` throws NPE) since there is no epoch information. 
> PerRoundWrapperOperator should carry epoch information in watermark > --- > > Key: FLINK-31373 > URL: https://issues.apache.org/jira/browse/FLINK-31373 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > Currently we use PerRoundWrapperOperator to wrap the normal flink operators > such that they can be used in iterations. > We already contained the epoch information in each record so that we know > which iteration each record belongs to. > However, there is no epoch information when the stream element is a > watermark. This works in most cases, but fail to address the following use > case: > - In DataStreamUtils#withBroadcast, we will cache the elements (including > watermarks) from non-broadcast inputs until the broadcast variables are > ready. When the broadcast variables are ready, once we receive a stream > element we will process the cached elements first. If the received element is > a watermark, the current implementation of iteration module fails > (ProxyOutput#collect throws NPE) since there is no epoch information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark
Zhipeng Zhang created FLINK-31373: - Summary: PerRoundWrapperOperator should carry epoch information in watermark Key: FLINK-31373 URL: https://issues.apache.org/jira/browse/FLINK-31373 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang Currently we use `PerRoundWrapperOperator` to wrap the normal Flink operators such that they can be used in iterations. We already contain the epoch information in each record so that we know which iteration each record belongs to. However, there is no epoch information when the stream element is a watermark. This works in most cases, but fails to address the following issue: - In DataStreamUtils#withBroadcast, we will cache the elements (including watermarks) from non-broadcast inputs until the broadcast variables are ready. When the broadcast variables are ready, once we receive a stream element we will process the cached elements first. If the received element is a watermark, the current implementation of the iteration module fails (`ProxyOutput#collect` throws NPE) since there is no epoch information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
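One direction the ticket points at is making watermarks travel in the same epoch-carrying envelope as records, so the iteration wrapper can always read an epoch. A hypothetical sketch of such an envelope (types and names are illustrative; the iteration module's actual IterationRecord differs):

```java
// Illustrative sketch: a single envelope type that carries the epoch for
// both records and watermarks. With this, ProxyOutput#collect would never
// see an element without epoch information.
class EpochElement<T> {
    enum Kind { RECORD, WATERMARK }

    final Kind kind;
    final T record;       // payload for records, null for watermarks
    final long timestamp; // watermark timestamp, unused for records
    final int epoch;      // iteration round this element belongs to

    private EpochElement(Kind kind, T record, long timestamp, int epoch) {
        this.kind = kind;
        this.record = record;
        this.timestamp = timestamp;
        this.epoch = epoch;
    }

    static <T> EpochElement<T> record(T value, int epoch) {
        return new EpochElement<>(Kind.RECORD, value, Long.MIN_VALUE, epoch);
    }

    static <T> EpochElement<T> watermark(long timestamp, int epoch) {
        return new EpochElement<>(Kind.WATERMARK, null, timestamp, epoch);
    }
}
```

Caching in DataStreamUtils#withBroadcast could then replay watermarks through the same path as records, since both carry an epoch.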
[jira] [Assigned] (FLINK-31191) VectorIndexer should check whether doublesByColumn is null before snapshot
[ https://issues.apache.org/jira/browse/FLINK-31191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31191: - Assignee: Zhipeng Zhang > VectorIndexer should check whether doublesByColumn is null before snapshot > -- > > Key: FLINK-31191 > URL: https://issues.apache.org/jira/browse/FLINK-31191 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > Currently, VectorIndexer can hit an NPE when taking a checkpoint. It should > check whether `doublesByColumn` is null before calling snapshot. > > logview: > [https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039] > details: > > > [735|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:736]Caused > by: java.lang.NullPointerException > [736|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:737] > at > org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.convertToListArray(VectorIndexer.java:232) > > [737|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:738] > at > org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.snapshotState(VectorIndexer.java:228) > > [738|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:739] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222) > > [739|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:740] > ... 33 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31276) VectorIndexerTest#testFitAndPredictWithHandleInvalid fails
Zhipeng Zhang created FLINK-31276: - Summary: VectorIndexerTest#testFitAndPredictWithHandleInvalid fails Key: FLINK-31276 URL: https://issues.apache.org/jira/browse/FLINK-31276 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang The unit test fails [1] and the error stack is: Error: Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 22.176 s <<< FAILURE! - in org.apache.flink.ml.feature.VectorIndexerTest [592|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:593]Error: testFitAndPredictWithHandleInvalid Time elapsed: 4.178 s <<< FAILURE! [593|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:594]java.lang.AssertionError: expected: but was: [594|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:595] at org.junit.Assert.fail(Assert.java:89) [595|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:596] at org.junit.Assert.failNotEquals(Assert.java:835) [596|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:597] at org.junit.Assert.assertEquals(Assert.java:120) [597|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:598] at org.junit.Assert.assertEquals(Assert.java:146) [598|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:599] at org.apache.flink.ml.feature.VectorIndexerTest.testFitAndPredictWithHandleInvalid(VectorIndexerTest.java:186) [1] [https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31276) VectorIndexerTest#testFitAndPredictWithHandleInvalid fails
[ https://issues.apache.org/jira/browse/FLINK-31276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694916#comment-17694916 ] Zhipeng Zhang commented on FLINK-31276: --- I ran it 1000 times and could not reproduce the bug. > VectorIndexerTest#testFitAndPredictWithHandleInvalid fails > -- > > Key: FLINK-31276 > URL: https://issues.apache.org/jira/browse/FLINK-31276 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > The unit test fails [1] and the error stack is: > > Error: Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 22.176 s <<< FAILURE! - in org.apache.flink.ml.feature.VectorIndexerTest > [592|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:593]Error: > testFitAndPredictWithHandleInvalid Time elapsed: 4.178 s <<< FAILURE! > [593|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:594]java.lang.AssertionError: > expected: for more options.> but was: > [594|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:595] > at org.junit.Assert.fail(Assert.java:89) > [595|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:596] > at org.junit.Assert.failNotEquals(Assert.java:835) > [596|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:597] > at org.junit.Assert.assertEquals(Assert.java:120) > [597|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:598] > at org.junit.Assert.assertEquals(Assert.java:146) > [598|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:599] > at > org.apache.flink.ml.feature.VectorIndexerTest.testFitAndPredictWithHandleInvalid(VectorIndexerTest.java:186) > > [1] > [https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426] -- This message was sent by Atlassian 
Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
[ https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694417#comment-17694417 ] Zhipeng Zhang edited comment on FLINK-31255 at 2/28/23 8:17 AM: It seems that the bug comes from not unwrapping all the StreamConfig of the wrapped operator in OperatorUtils#createWrappedOperatorConfig. was (Author: JIRAUSER284184): It seems that the bug comes from not unwrapping the StreamConfig of the wrapped operator in OperatorUtils#createWrappedOperatorConfig. > OperatorUtils#createWrappedOperatorConfig fails to wrap operator config > --- > > Key: FLINK-31255 > URL: https://issues.apache.org/jira/browse/FLINK-31255 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > Currently we use operator wrapper to enable using normal operators in > iterations. However, the operatorConfig is not correctly unwrapped. For > example, the following code fails because of a wrong type serializer. 
> 
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
> 
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> } {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
>     at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
[jira] [Commented] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
[ https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694417#comment-17694417 ] Zhipeng Zhang commented on FLINK-31255: --- It seems that the bug comes from not unwrapping the StreamConfig of the wrapped operator in OperatorUtils#createWrappedOperatorConfig. > OperatorUtils#createWrappedOperatorConfig fails to wrap operator config > --- > > Key: FLINK-31255 > URL: https://issues.apache.org/jira/browse/FLINK-31255 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > Currently we use operator wrapper to enable using normal operators in > iterations. However, the operatorConfig is not correctly unwrapped. For > example, the following code fails because of a wrong type serializer.
> 
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
> 
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> } {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
>     at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
[jira] [Created] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
Zhipeng Zhang created FLINK-31255: - Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config Key: FLINK-31255 URL: https://issues.apache.org/jira/browse/FLINK-31255 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0 Reporter: Zhipeng Zhang Currently we use an operator wrapper to enable using normal operators in iterations. However, the operatorConfig is not correctly unwrapped. For example, the following code fails because of a wrong type serializer. {code:java} @Test public void testIterationWithMapPartition() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream input = env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG); DataStreamList result = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input), ReplayableDataStreamList.notReplay(input), IterationConfig.newBuilder() .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND) .build(), new IterationBodyWithMapPartition()); List counts = IteratorUtils.toList(result.get(0).executeAndCollect()); System.out.println(counts.size()); } private static class IterationBodyWithMapPartition implements IterationBody { @Override public IterationBodyResult process( DataStreamList variableStreams, DataStreamList dataStreams) { DataStream input = variableStreams.get(0); DataStream mapPartitionResult = DataStreamUtils.mapPartition( input, new MapPartitionFunction () { @Override public void mapPartition(Iterable iterable, Collector collector) throws Exception { for (Long iter: iterable) { collector.collect(iter); } } }); DataStream terminationCriteria = mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT); return new IterationBodyResult( DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria); } } {code} The error stack is: Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
org.apache.flink.iteration.IterationRecord at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34) at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79) at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107) at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148) at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445) at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31191) VectorIndexer should check whether doublesByColumn is null before snapshot
Zhipeng Zhang created FLINK-31191: - Summary: VectorIndexer should check whether doublesByColumn is null before snapshot Key: FLINK-31191 URL: https://issues.apache.org/jira/browse/FLINK-31191 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang Currently VectorIndexer can lead to an NPE when taking a checkpoint. It should check whether `doublesByColumn` is null before calling snapshot. logview: [https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039] details: [735|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:736]Caused by: java.lang.NullPointerException [736|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:737] at org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.convertToListArray(VectorIndexer.java:232) [737|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:738] at org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.snapshotState(VectorIndexer.java:228) [738|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:739] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222) [739|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:740] ... 33 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
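A minimal plain-Java illustration of the proposed guard. The field name `doublesByColumn` and the method names mirror the stack trace, but the class below is a hypothetical sketch, not the actual `VectorIndexer` operator code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the proposed fix: snapshotState() must tolerate a
// state field that was never initialized before the first checkpoint fires.
public class SnapshotGuardSketch {
    // Mirrors the nullable field from the issue; may still be null at checkpoint time.
    List<Set<Double>> doublesByColumn;

    // Conversion helper analogous to convertToListArray: NPEs if the field is null.
    List<List<Double>> convertToListArray() {
        List<List<Double>> result = new ArrayList<>();
        for (Set<Double> column : doublesByColumn) {
            result.add(new ArrayList<>(column));
        }
        return result;
    }

    // The guarded snapshot: skip the conversion when there is nothing to snapshot.
    public List<List<Double>> snapshotState() {
        if (doublesByColumn == null) {
            return null; // no distinct values collected yet; avoids the NPE
        }
        return convertToListArray();
    }
}
```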
[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module
[ https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31173: -- Description: In flink-ml-iteration, there are several bugs as follows: # TailOperator should have only one input. We added a Tail operator to increment the epoch watermark at each iteration under the assumption that each Tail operator has only one input, and did not align the epoch watermarks from different inputs. This assumption might not hold if the input is a `union`. # ReplayOperator should replay the records when it receives the max epoch watermark. Currently ReplayOperator does not replay the records when it receives the max epoch watermark, which is inconsistent with the HeadOperator, as the HeadOperator always forwards records downstream. # ProxyOperatorStateBackend does not correctly initialize the state descriptor. was: In flink-ml-iteration, there are several bugs as follows: # TailOperator should have only one input. We added a Tail operator to increment the epoch watermark at each iteration under the assumption that each Tail operator has only one input, and did not align the epoch watermarks from different inputs. This assumption might not hold if the input is a `union`. # ReplayOperator should replay the records when it receives the max epoch watermark. Currently ReplayOperator does not replay the records when it receives the max epoch watermark, which is inconsistent with the HeadOperator, as the HeadOperator always forwards records downstream. # ProxyOperatorStateBackend does not correctly initialize the state descriptor. 
> Fix several bugs in flink-ml-iteration module > - > > Key: FLINK-31173 > URL: https://issues.apache.org/jira/browse/FLINK-31173 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > In flink-ml-iteration, there are several bugs as follows: > # TailOperator should have only one input. We added a Tail operator > to increment the epoch watermark at each iteration under the assumption > that each Tail operator has only one input, and did not align the > epoch watermarks from different inputs. This assumption might not hold if > the input is a `union`. > # ReplayOperator should replay the records when it receives the max > epoch watermark. Currently ReplayOperator does not replay the records when it > receives the max epoch watermark, which is inconsistent with the > HeadOperator, as the HeadOperator always forwards records downstream. > # ProxyOperatorStateBackend does not correctly initialize the state > descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31173) Fix several bugs in flink-ml-iteration module
[ https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31173: - Assignee: Zhipeng Zhang > Fix several bugs in flink-ml-iteration module > - > > Key: FLINK-31173 > URL: https://issues.apache.org/jira/browse/FLINK-31173 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > > In flink-ml-iteration, there are several bugs as follows: > # TailOperator should have only one input. We added a Tail operator > to increment the epoch watermark at each iteration under the assumption > that each Tail operator has only one input, and did not align the > epoch watermarks from different inputs. This assumption might not hold if > the input is a `union`. > # ReplayOperator should replay the records when it receives the max > epoch watermark. Currently ReplayOperator does not replay the records when it > receives the max epoch watermark, which is inconsistent with the > HeadOperator, as the HeadOperator always forwards records downstream. > # ProxyOperatorStateBackend does not correctly initialize the state > descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module
[ https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-31173: -- Description: In flink-ml-iteration, there are several bugs as follows: # TailOperator should have only one input. We added a Tail operator to increment the epoch watermark at each iteration under the assumption that each Tail operator has only one input, and did not align the epoch watermarks from different inputs. This assumption might not hold if the input is a `union`. # ReplayOperator should replay the records when it receives the max epoch watermark. Currently ReplayOperator does not replay the records when it receives the max epoch watermark, which is inconsistent with the HeadOperator, as the HeadOperator always forwards records downstream. # ProxyOperatorStateBackend does not correctly initialize the state descriptor. was: In flink-ml-iteration, we added a Tail operator to increment the epoch watermark at each iteration. We made the assumption that each Tail operator has only one input and did not align the epoch watermarks from different inputs. This assumption might not hold if the input is a `union`. I propose to add an explicit check to prevent the TailOperator from having multiple inputs. If the input does contain multiple inputs, users can add a map operator after the union. Summary: Fix several bugs in flink-ml-iteration module (was: TailOperator should only have one input) > Fix several bugs in flink-ml-iteration module > - > > Key: FLINK-31173 > URL: https://issues.apache.org/jira/browse/FLINK-31173 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > > In flink-ml-iteration, there are several bugs as follows: > > # TailOperator should have only one input. We added a Tail operator > to increment the epoch watermark at each iteration under the assumption > that each Tail operator has only one input, and did not align the > epoch watermarks from different inputs. This assumption might not hold if > the input is a `union`. > # ReplayOperator should replay the records when it receives the max > epoch watermark. Currently ReplayOperator does not replay the records when it > receives the max epoch watermark, which is inconsistent with the > HeadOperator, as the HeadOperator always forwards records downstream. > # ProxyOperatorStateBackend does not correctly initialize the state > descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
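The first bug above calls for an explicit fail-fast check rather than silently mis-aligning epoch watermarks. A hypothetical sketch of such a validation, in plain Java (the helper name, the argument type, and the error message are illustrative assumptions, not the actual flink-ml-iteration code):

```java
import java.util.List;

// Hypothetical sketch of the explicit check proposed in the issue: the Tail
// operator must have exactly one input. When the feedback stream is a union
// of several streams, users can insert a pass-through map() after the union
// so that the Tail operator again sees a single input.
public final class TailInputCheck {
    private TailInputCheck() {}

    public static void requireSingleInput(List<String> upstreamOperatorNames) {
        if (upstreamOperatorNames.size() != 1) {
            throw new IllegalStateException(
                    "TailOperator expects exactly one input but got "
                            + upstreamOperatorNames.size()
                            + "; add a map() after the union to merge them.");
        }
    }
}
```

Failing at graph-construction time with an actionable message is preferable here to the subtler runtime symptom of un-aligned epoch watermarks.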
[jira] [Assigned] (FLINK-31160) Support join/cogroup in BroadcastUtils.withBroadcastStream
[ https://issues.apache.org/jira/browse/FLINK-31160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31160: - Assignee: Zhipeng Zhang > Support join/cogroup in BroadcastUtils.withBroadcastStream > -- > > Key: FLINK-31160 > URL: https://issues.apache.org/jira/browse/FLINK-31160 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Currently BroadcastUtils#withBroadcastStream does not support cogroup/join, > since we restrict users to introducing only one extra operator in the > specified lambda function. > > To support using join/cogroup in BroadcastUtils#withBroadcastStream, we would > like to relax the restriction such that users can introduce more than one > extra operator, but only the output operator can access the broadcast > variables. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31173) TailOperator should only have one input
Zhipeng Zhang created FLINK-31173: - Summary: TailOperator should only have one input Key: FLINK-31173 URL: https://issues.apache.org/jira/browse/FLINK-31173 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0 Reporter: Zhipeng Zhang In flink-ml-iteration, we have added a Tail operator to increment the epoch watermark at each iteration. We made the assumption that each Tail operator has only one input and did not align the epoch watermarks from different inputs. This assumption might not hold if the input is a `union`. I propose to add an explicit check to prevent the TailOperator from having multiple inputs. If the input does contain multiple inputs, users can add a map operator after the union. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31160) Support join/cogroup in BroadcastUtils.withBroadcastStream
Zhipeng Zhang created FLINK-31160: - Summary: Support join/cogroup in BroadcastUtils.withBroadcastStream Key: FLINK-31160 URL: https://issues.apache.org/jira/browse/FLINK-31160 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0 Reporter: Zhipeng Zhang Fix For: ml-2.2.0 Currently BroadcastUtils#withBroadcastStream does not support cogroup/join, since we restrict users to introducing only one extra operator in the specified lambda function. To support using join/cogroup in BroadcastUtils#withBroadcastStream, we would like to relax the restriction such that users can introduce more than one extra operator, but only the output operator can access the broadcast variables. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30688) Disable Kryo fallback for tests in flink-ml-lib
[ https://issues.apache.org/jira/browse/FLINK-30688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-30688. --- Resolution: Fixed Fixed on master via: 7e26b7e2904a9d742e0318035a19bf2687855bb0 19f0f31ca93f4967c99968b3609a11763bfc0afb 904aba25951b5df170b6979a8a98e9fbff00d539 > Disable Kryo fallback for tests in flink-ml-lib > --- > > Key: FLINK-30688 > URL: https://issues.apache.org/jira/browse/FLINK-30688 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Kryo serializer is typically inefficient and we expect all Flink ML > algorithms to work without using the Kryo serializer. It will be useful to > validate this by disabling Kryo fallback in all Flink ML tests. > This can be done by configuring StreamExecutionEnvironment using > env.getConfig().disableGenericTypes(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
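The issue text names the exact mechanism; as a sketch it is a single call on the execution environment (assumes a Flink dependency on the classpath; the call itself, `env.getConfig().disableGenericTypes()`, is the one quoted in the issue):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallbackExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Any type that would fall back to Kryo now fails fast at job
        // construction time instead of silently serializing inefficiently.
        env.getConfig().disableGenericTypes();
    }
}
```

Applying this in test setup turns an invisible performance problem (Kryo fallback) into an immediate, debuggable failure.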
[jira] [Assigned] (FLINK-31026) KBinsDiscretizer gives wrong bin edges when all values are same.
[ https://issues.apache.org/jira/browse/FLINK-31026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-31026: - Assignee: Fan Hong > KBinsDiscretizer gives wrong bin edges when all values are same. > > > Key: FLINK-31026 > URL: https://issues.apache.org/jira/browse/FLINK-31026 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Fan Hong >Assignee: Fan Hong >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > The current implementation gives bin edges of \{Double.MIN_VALUE, Double.MAX_VALUE} > when all values are the same. > However, this bin cannot cover negative values or 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31026) KBinsDiscretizer gives wrong bin edges when all values are same.
[ https://issues.apache.org/jira/browse/FLINK-31026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-31026. --- Fix Version/s: ml-2.2.0 Resolution: Fixed Fixed on master via 0af95f256d438a12946f9152c0a65ec916a61323 > KBinsDiscretizer gives wrong bin edges when all values are same. > > > Key: FLINK-31026 > URL: https://issues.apache.org/jira/browse/FLINK-31026 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Fan Hong >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > The current implementation gives bin edges of \{Double.MIN_VALUE, Double.MAX_VALUE} > when all values are the same. > However, this bin cannot cover negative values or 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
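The underlying pitfall is easy to miss: in Java, `Double.MIN_VALUE` is the smallest positive double (about 4.9e-324), not the most negative one, so a bin of \{Double.MIN_VALUE, Double.MAX_VALUE} excludes 0 and every negative value. A self-contained demonstration (using `-Double.MAX_VALUE` as the lower edge is one plausible repair, shown here as an assumption rather than the merged fix):

```java
public class MinValuePitfall {
    // Closed-interval membership test for a single bin.
    public static boolean inBin(double v, double lo, double hi) {
        return v >= lo && v <= hi;
    }

    public static void main(String[] args) {
        // Double.MIN_VALUE is the smallest POSITIVE double, not the lower
        // bound of the double range.
        System.out.println(Double.MIN_VALUE > 0);                            // true
        // The buggy bin misses zero and negative inputs entirely.
        System.out.println(inBin(0.0, Double.MIN_VALUE, Double.MAX_VALUE));  // false
        System.out.println(inBin(-1.0, Double.MIN_VALUE, Double.MAX_VALUE)); // false
        // A lower edge of -Double.MAX_VALUE covers them.
        System.out.println(inBin(0.0, -Double.MAX_VALUE, Double.MAX_VALUE)); // true
    }
}
```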
[jira] [Updated] (FLINK-30451) Add AlgoOperator for Swing
[ https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-30451: -- Fix Version/s: ml-2.2.0 Affects Version/s: ml-2.2.0 > Add AlgoOperator for Swing > -- > > Key: FLINK-30451 > URL: https://issues.apache.org/jira/browse/FLINK-30451 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30451) Add AlgoOperator for Swing
[ https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-30451. --- Resolution: Fixed Fixed on master via e85fcf7635154871a49f6feb53553b0e98b21d88 > Add AlgoOperator for Swing > -- > > Key: FLINK-30451 > URL: https://issues.apache.org/jira/browse/FLINK-30451 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30451) Add AlgoOperator for Swing
[ https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-30451: -- Summary: Add AlgoOperator for Swing (was: Add Estimator and Transformer for Swing) > Add AlgoOperator for Swing > -- > > Key: FLINK-30451 > URL: https://issues.apache.org/jira/browse/FLINK-30451 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30730) StringIndexer cannot handle null values correctly
[ https://issues.apache.org/jira/browse/FLINK-30730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-30730. --- Fix Version/s: ml-2.2.0 Resolution: Fixed Fixed on master via 2b83d247cb5e81fb04b31399d436dbb9e809a473 > StringIndexer cannot handle null values correctly > - > > Key: FLINK-30730 > URL: https://issues.apache.org/jira/browse/FLINK-30730 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Fan Hong >Assignee: Fan Hong >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > When the training data contains null values, StringIndexer throws an exception. > The reason is in this method [1]: null values are neither of String type nor Number > type. > In StringIndexerModel, null values are also not handled correctly when > performing the transformation. > > [1] > [https://github.com/apache/flink-ml/blob/966cedd7bbab4e12d8d8b37dbd582146714e68a6/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexer.java#L164] -- This message was sent by Atlassian Jira (v8.20.10#820010)
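A hedged sketch of the kind of type dispatch the report points at, with an explicit null branch added. The class, the `__NULL__` sentinel, and the treat-null-as-its-own-label policy are illustrative assumptions, not the actual StringIndexer fix:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value-to-key normalization with an explicit null branch.
// The linked code only dispatched on String and Number, so null fell through
// to an exception; mapping null to its own token is one possible repair.
public class NullSafeIndexer {
    private final Map<String, Integer> index = new HashMap<>();

    static String toKey(Object value) {
        if (value == null) {
            return "__NULL__"; // sentinel token for missing values (assumption)
        } else if (value instanceof String) {
            return (String) value;
        } else if (value instanceof Number) {
            return String.valueOf(((Number) value).doubleValue());
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    // Assigns dense indices in first-seen order; null gets an index like any label.
    public int indexOf(Object value) {
        return index.computeIfAbsent(toKey(value), k -> index.size());
    }
}
```

Whether nulls should become a distinct label, be dropped, or raise a configurable error is a policy decision; the point of the sketch is only that the null case must be dispatched explicitly.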
[jira] [Assigned] (FLINK-30730) StringIndexer cannot handle null values correctly
[ https://issues.apache.org/jira/browse/FLINK-30730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30730: - Assignee: Fan Hong > StringIndexer cannot handle null values correctly > - > > Key: FLINK-30730 > URL: https://issues.apache.org/jira/browse/FLINK-30730 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Fan Hong >Assignee: Fan Hong >Priority: Major > Labels: pull-request-available > > When the training data contains null values, StringIndexer throws an exception. > The reason is in this method [1]: null values are neither of String type nor Number > type. > In StringIndexerModel, null values are also not handled correctly when > performing the transformation. > > [1] > [https://github.com/apache/flink-ml/blob/966cedd7bbab4e12d8d8b37dbd582146714e68a6/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexer.java#L164] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30933) Result of join inside iterationBody loses max watermark
[ https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-30933: -- Description: Currently if we execute a join inside an iteration body, the following program produces empty output. (In which the right result should be a list with \{2}. {code:java} public class Test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream> input1 = env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); DataStream> input2 = env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); DataStream> iterationJoin = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input1), ReplayableDataStreamList.replay(input2), IterationConfig.newBuilder() .setOperatorLifeCycle( IterationConfig.OperatorLifeCycle.PER_ROUND) .build(), new MyIterationBody()) .get(0); DataStream left = iterationJoin.map(x -> x.f0); DataStream right = iterationJoin.map(x -> x.f1); DataStream result = left.join(right) .where(x -> x) .equalTo(x -> x) .window(EndOfStreamWindows.get()) .apply((JoinFunction) (l1, l2) -> l1); List collectedResult = IteratorUtils.toList(result.executeAndCollect()); List expectedResult = Arrays.asList(2L); compareResultCollections(expectedResult, collectedResult, Long::compareTo); } private static class MyIterationBody implements IterationBody { @Override public IterationBodyResult process( DataStreamList variableStreams, DataStreamList dataStreams) { DataStream> input1 = variableStreams.get(0); DataStream> input2 = dataStreams.get(0); DataStream terminationCriteria = input1.flatMap(new TerminateOnMaxIter(1)); DataStream> res = input1.join(input2) .where(x -> x.f0) .equalTo(x -> x.f0) .window(EndOfStreamWindows.get()) .apply( new JoinFunction< Tuple2, Tuple2, Tuple2>() { @Override public Tuple2 join( Tuple2 longIntegerTuple2, Tuple2 longLongTuple2) throws Exception { return 
longLongTuple2; } }); return new IterationBodyResult( DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria); } } } {code} There are two possible reasons: * The timer in `HeadOperator` is not a daemon process and it does not exit even flink job finishes. * The max watermark from the iteration body is missed. was: Currently if we execute a join inside an iteration body, the following program produces empty output. (In which the right result should be a list with \{1, 2}. {code:java} public class Test { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); DataStream> input1 = env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); DataStream> input2 = env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); DataStream> iterationJoin = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input1), ReplayableDataStreamList.replay(input2), IterationConfig.newBuilder() .setOperatorLifeCycle( IterationConfig.OperatorLifeCycle.PER_ROUND) .build(), new MyIterationBody())
[jira] [Assigned] (FLINK-30933) Result of join inside iterationBody loses max watermark
[ https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30933: - Assignee: Zhipeng Zhang > Result of join inside iterationBody loses max watermark > --- > > Key: FLINK-30933 > URL: https://issues.apache.org/jira/browse/FLINK-30933 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > Currently if we execute a join inside an iteration body, the following > program produces empty output. (In which the right result should be a list > with \{1, 2}. > {code:java} > public class Test { > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(config); > env.setParallelism(1); > DataStream> input1 = > env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); > DataStream> input2 = > env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); > DataStream> iterationJoin = > Iterations.iterateBoundedStreamsUntilTermination( > DataStreamList.of(input1), > ReplayableDataStreamList.replay(input2), > IterationConfig.newBuilder() > .setOperatorLifeCycle( > > IterationConfig.OperatorLifeCycle.PER_ROUND) > .build(), > new MyIterationBody()) > .get(0); > DataStream left = iterationJoin.map(x -> x.f0); > DataStream right = iterationJoin.map(x -> x.f0); > DataStream result = > left.join(right) > .where(x -> x) > .equalTo(x -> x) > .window(EndOfStreamWindows.get()) > .apply((JoinFunction) (l1, l2) -> > l1); > List collectedResult = > IteratorUtils.toList(result.executeAndCollect()); > List expectedResult = Arrays.asList(1L, 2L); > compareResultCollections(expectedResult, collectedResult, > Long::compareTo); > } > private static class 
MyIterationBody implements IterationBody { > @Override > public IterationBodyResult process( > DataStreamList variableStreams, DataStreamList dataStreams) { > DataStream> input1 = variableStreams.get(0); > DataStream> input2 = dataStreams.get(0); > DataStream terminationCriteria = input1.flatMap(new > TerminateOnMaxIter(1)); > DataStream> res = > input1.join(input2) > .where(x -> x.f0) > .equalTo(x -> x.f0) > .window(EndOfStreamWindows.get()) > .apply( > (JoinFunction< > Tuple2, > Tuple2, > Tuple2>) > (t1, t2) -> t2); > return new IterationBodyResult( > DataStreamList.of(input1), DataStreamList.of(res), > terminationCriteria); > } > } > } > {code} > > There are two possible reasons: > * The timer in `HeadOperator` is not a daemon process and it does not exit > even flink job finishes. > * The max watermark from the iteration body is missed. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30933) Result of join inside iterationBody loses max watermark
Zhipeng Zhang created FLINK-30933: - Summary: Result of join inside iterationBody loses max watermark Key: FLINK-30933 URL: https://issues.apache.org/jira/browse/FLINK-30933 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0 Reporter: Zhipeng Zhang Fix For: ml-2.2.0 Currently if we execute a join inside an iteration body, the following program produces empty output. (In which the right result should be a list with \{1, 2}. {code:java} public class Test { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); DataStream> input1 = env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); DataStream> input2 = env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); DataStream> iterationJoin = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input1), ReplayableDataStreamList.replay(input2), IterationConfig.newBuilder() .setOperatorLifeCycle( IterationConfig.OperatorLifeCycle.PER_ROUND) .build(), new MyIterationBody()) .get(0); DataStream left = iterationJoin.map(x -> x.f0); DataStream right = iterationJoin.map(x -> x.f0); DataStream result = left.join(right) .where(x -> x) .equalTo(x -> x) .window(EndOfStreamWindows.get()) .apply((JoinFunction) (l1, l2) -> l1); List collectedResult = IteratorUtils.toList(result.executeAndCollect()); List expectedResult = Arrays.asList(1L, 2L); compareResultCollections(expectedResult, collectedResult, Long::compareTo); } private static class MyIterationBody implements IterationBody { @Override public IterationBodyResult process( DataStreamList variableStreams, DataStreamList dataStreams) { DataStream> input1 = variableStreams.get(0); DataStream> input2 = dataStreams.get(0); DataStream terminationCriteria = input1.flatMap(new 
TerminateOnMaxIter(1)); DataStream> res = input1.join(input2) .where(x -> x.f0) .equalTo(x -> x.f0) .window(EndOfStreamWindows.get()) .apply( (JoinFunction< Tuple2, Tuple2, Tuple2>) (t1, t2) -> t2); return new IterationBodyResult( DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria); } } } {code} There are two possible reasons: * The timer in `HeadOperator` is not a daemon process and it does not exit even flink job finishes. * The max watermark from the iteration body is missed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30671) Add AlgoOperator for ClusteringEvaluator
Zhipeng Zhang created FLINK-30671: - Summary: Add AlgoOperator for ClusteringEvaluator Key: FLINK-30671 URL: https://issues.apache.org/jira/browse/FLINK-30671 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30401) Add Estimator and Transformer for MinHashLSH
[ https://issues.apache.org/jira/browse/FLINK-30401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-30401. --- Resolution: Fixed > Add Estimator and Transformer for MinHashLSH > > > Key: FLINK-30401 > URL: https://issues.apache.org/jira/browse/FLINK-30401 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Fan Hong >Assignee: Fan Hong >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Add Estimator and Transformer for MinHashLSH. > Its function would be at least equivalent to Spark's > org.apache.spark.ml.feature.MinHashLSH. The relevant PR should contain the > following components: > * Java implementation/test (Must include) > * Python implementation/test (Optional) > * Markdown document (Optional) -- This message was sent by Atlassian Jira (v8.20.10#820010)
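The ticket targets feature parity with Spark's org.apache.spark.ml.feature.MinHashLSH. For readers unfamiliar with the technique, below is a hypothetical, self-contained sketch of the MinHash signature it is built on (illustrative only; the class name, hashing scheme, and parameters are assumptions, not the proposed Flink ML API):

```java
import java.util.Arrays;
import java.util.Random;
import java.util.Set;

// Illustrative MinHash sketch (not Flink ML or Spark code). A set of
// non-negative integer items is reduced to a short signature; the fraction of
// matching signature slots between two sets estimates their Jaccard
// similarity, and LSH then buckets similar signatures together.
public class MinHashSketch {
    private static final long P = 2147483647L; // Mersenne prime, 2^31 - 1
    private final long[] a;
    private final long[] b;

    public MinHashSketch(int numHashes, long seed) {
        Random rnd = new Random(seed);
        a = new long[numHashes];
        b = new long[numHashes];
        for (int i = 0; i < numHashes; i++) {
            a[i] = 1 + rnd.nextInt((int) (P - 1)); // random hash coefficients
            b[i] = rnd.nextInt((int) P);
        }
    }

    // signature[i] = min over all items x of ((a_i * x + b_i) mod P)
    public long[] signature(Set<Integer> items) {
        long[] sig = new long[a.length];
        Arrays.fill(sig, Long.MAX_VALUE);
        for (int x : items) {
            for (int i = 0; i < a.length; i++) {
                sig[i] = Math.min(sig[i], (a[i] * x + b[i]) % P);
            }
        }
        return sig;
    }

    // Fraction of agreeing slots approximates the Jaccard similarity.
    public static double estimateJaccard(long[] s1, long[] s2) {
        int match = 0;
        for (int i = 0; i < s1.length; i++) {
            if (s1[i] == s2[i]) {
                match++;
            }
        }
        return (double) match / s1.length;
    }
}
```

With more hash functions the estimate concentrates around the true Jaccard similarity, which is the property the LSH variant exploits.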
[jira] [Assigned] (FLINK-30566) Add benchmark configurations for agglomerativeclustering, hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, regextokenizer, tok
[ https://issues.apache.org/jira/browse/FLINK-30566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30566: - Assignee: Zhipeng Zhang > Add benchmark configurations for agglomerativeclustering, hashingtf, idf, > kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, > regextokenizer, tokenizer and vectorindexer > > > Key: FLINK-30566 > URL: https://issues.apache.org/jira/browse/FLINK-30566 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Minor > Labels: pull-request-available > > Add benchmark configurations for the following algorithms: > * agglomerativeclustering > * hashingtf > * idf > * kbinsdiscretizer > * linearregression > * linearsvc > * logisticregression > * ngram > * regextokenizer > * tokenizer > * vectorindexer -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30566) Add benchmark configurations for agglomerativeclustering, hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, regextokenizer, toke
Zhipeng Zhang created FLINK-30566: - Summary: Add benchmark configurations for agglomerativeclustering, hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, regextokenizer, tokenizer and vectorindexer Key: FLINK-30566 URL: https://issues.apache.org/jira/browse/FLINK-30566 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang Add benchmark configurations for the following algorithms: * agglomerativeclustering * hashingtf * idf * kbinsdiscretizer * linearregression * linearsvc * logisticregression * ngram * regextokenizer * tokenizer * vectorindexer -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler
[ https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30541: - Assignee: Zhipeng Zhang > Add Transformer and Estimator for OnlineStandardScaler > -- > > Key: FLINK-30541 > URL: https://issues.apache.org/jira/browse/FLINK-30541 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler
Zhipeng Zhang created FLINK-30541: - Summary: Add Transformer and Estimator for OnlineStandardScaler Key: FLINK-30541 URL: https://issues.apache.org/jira/browse/FLINK-30541 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30451) Add Estimator and Transformer for Swing
Zhipeng Zhang created FLINK-30451: - Summary: Add Estimator and Transformer for Swing Key: FLINK-30451 URL: https://issues.apache.org/jira/browse/FLINK-30451 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-30249) TableUtils.getRowTypeInfo() creating wrong TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-30249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-30249: - Assignee: Zhipeng Zhang > TableUtils.getRowTypeInfo() creating wrong TypeInformation > -- > > Key: FLINK-30249 > URL: https://issues.apache.org/jira/browse/FLINK-30249 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0, ml-2.1.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30249) TableUtils.getRowTypeInfo() creating wrong TypeInformation
Zhipeng Zhang created FLINK-30249: - Summary: TableUtils.getRowTypeInfo() creating wrong TypeInformation Key: FLINK-30249 URL: https://issues.apache.org/jira/browse/FLINK-30249 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0, ml-2.0.0 Reporter: Zhipeng Zhang Fix For: ml-2.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29434) Add AlgoOperator for Splitter
[ https://issues.apache.org/jira/browse/FLINK-29434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29434. --- Assignee: weibo zhao Resolution: Fixed > Add AlgoOperator for Splitter > - > > Key: FLINK-29434 > URL: https://issues.apache.org/jira/browse/FLINK-29434 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: weibo zhao >Assignee: weibo zhao >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Add AlgoOperator for Splitter -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29434) Add AlgoOperator for Splitter
[ https://issues.apache.org/jira/browse/FLINK-29434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-29434: -- Fix Version/s: ml-2.2.0 > Add AlgoOperator for Splitter > - > > Key: FLINK-29434 > URL: https://issues.apache.org/jira/browse/FLINK-29434 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: weibo zhao >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Add AlgoOperator for Splitter -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29323) Add inputSizes parameter for VectorAssembler
[ https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29323. --- Assignee: weibo zhao Resolution: Fixed > Add inputSizes parameter for VectorAssembler > > > Key: FLINK-29323 > URL: https://issues.apache.org/jira/browse/FLINK-29323 > Project: Flink > Issue Type: New Feature >Reporter: weibo zhao >Assignee: weibo zhao >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Refine Transformer for VectorAssembler -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29323) Add inputSizes parameter for VectorAssembler
[ https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-29323: -- Summary: Add inputSizes parameter for VectorAssembler (was: Refine Transformer for VectorAssembler) > Add inputSizes parameter for VectorAssembler > > > Key: FLINK-29323 > URL: https://issues.apache.org/jira/browse/FLINK-29323 > Project: Flink > Issue Type: New Feature >Reporter: weibo zhao >Priority: Major > Labels: pull-request-available > > Refine Transformer for VectorAssembler -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29323) Add inputSizes parameter for VectorAssembler
[ https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-29323: -- Fix Version/s: ml-2.2.0 > Add inputSizes parameter for VectorAssembler > > > Key: FLINK-29323 > URL: https://issues.apache.org/jira/browse/FLINK-29323 > Project: Flink > Issue Type: New Feature >Reporter: weibo zhao >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Refine Transformer for VectorAssembler -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29911) Improve performance of AgglomerativeClustering
[ https://issues.apache.org/jira/browse/FLINK-29911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang updated FLINK-29911: -- Fix Version/s: ml-2.2.0 > Improve performance of AgglomerativeClustering > -- > > Key: FLINK-29911 > URL: https://issues.apache.org/jira/browse/FLINK-29911 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29911) Improve performance of AgglomerativeClustering
[ https://issues.apache.org/jira/browse/FLINK-29911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-29911: - Assignee: Zhipeng Zhang > Improve performance of AgglomerativeClustering > -- > > Key: FLINK-29911 > URL: https://issues.apache.org/jira/browse/FLINK-29911 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29911) Improve performance of AgglomerativeClustering
Zhipeng Zhang created FLINK-29911: - Summary: Improve performance of AgglomerativeClustering Key: FLINK-29911 URL: https://issues.apache.org/jira/browse/FLINK-29911 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Zhipeng Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large
[ https://issues.apache.org/jira/browse/FLINK-29824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29824. --- Resolution: Fixed > AgglomerativeClustering fails when the distanceThreshold is very large > -- > > Key: FLINK-29824 > URL: https://issues.apache.org/jira/browse/FLINK-29824 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > > The current implementation did not consider the following case: > when distanceThreshold is not null and set to a large value, all data points are > supposed to be assigned to a single cluster. > > In this case, we should stop training when the number of active clusters > is one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29843) Euclidean Distance Measure generates NAN distance values
[ https://issues.apache.org/jira/browse/FLINK-29843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29843. --- Resolution: Fixed > Euclidean Distance Measure generates NAN distance values > > > Key: FLINK-29843 > URL: https://issues.apache.org/jira/browse/FLINK-29843 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > Currently Flink ML's `EuclideanDistanceMeasure.distance(...)` method might > return a negative value as the distance between two vectors given the > calculation accuracy of java doubles. This bug should be fixed to guarantee > that the distance is a non-negative value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
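The NaN in the title follows from the negative value described in the report: if the squared distance is computed via the expansion ||x||^2 + ||y||^2 - 2*x.y (an assumption about the formulation; the actual Flink ML code may differ), rounding of Java doubles can push it slightly below zero for nearly identical vectors, and the square root of a negative value is NaN. A minimal sketch of a clamp-style guard:

```java
// Hypothetical sketch, not the Flink ML source. The expanded form of the
// squared Euclidean distance can come out slightly negative due to
// floating-point rounding; Math.sqrt of a negative double returns NaN.
public class DistanceSketch {
    public static double distance(double[] x, double[] y) {
        double xx = 0.0, yy = 0.0, xy = 0.0;
        for (int i = 0; i < x.length; i++) {
            xx += x[i] * x[i]; // ||x||^2
            yy += y[i] * y[i]; // ||y||^2
            xy += x[i] * y[i]; // x . y
        }
        double squared = xx + yy - 2 * xy;
        // Clamp at zero so rounding error can never produce sqrt(negative).
        return Math.sqrt(Math.max(squared, 0.0));
    }
}
```

The clamp guarantees the result is non-negative and never NaN, matching the fix the ticket asks for.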
[jira] [Assigned] (FLINK-29843) Euclidean Distance Measure generates NAN distance values
[ https://issues.apache.org/jira/browse/FLINK-29843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-29843: - Assignee: Yunfeng Zhou > Euclidean Distance Measure generates NAN distance values > > > Key: FLINK-29843 > URL: https://issues.apache.org/jira/browse/FLINK-29843 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > Currently Flink ML's `EuclideanDistanceMeasure.distance(...)` method might > return a negative value as the distance between two vectors given the > calculation accuracy of java doubles. This bug should be fixed to guarantee > that the distance is a non-negative value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large
[ https://issues.apache.org/jira/browse/FLINK-29824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reassigned FLINK-29824: - Assignee: Zhipeng Zhang > AgglomerativeClustering fails when the distanceThreshold is very large > -- > > Key: FLINK-29824 > URL: https://issues.apache.org/jira/browse/FLINK-29824 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > > The current implementation did not consider the following case: > when distanceThreshold is not null and set to a large value, all data points are > supposed to be assigned to a single cluster. > > In this case, we should stop training when the number of active clusters > is one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large
Zhipeng Zhang created FLINK-29824: - Summary: AgglomerativeClustering fails when the distanceThreshold is very large Key: FLINK-29824 URL: https://issues.apache.org/jira/browse/FLINK-29824 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Zhipeng Zhang The current implementation did not consider the following case: when distanceThreshold is not null and set to a large value, all data points are supposed to be assigned to a single cluster. In this case, we should stop training when the number of active clusters is one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
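The termination condition described above can be sketched as follows (a hypothetical, self-contained single-linkage example on 1-D points, not the Flink ML implementation; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative agglomerative clustering on 1-D points with single linkage.
// Merging stops either when the closest pair exceeds distanceThreshold, or,
// as the ticket describes, when only one active cluster remains, so a very
// large threshold no longer causes the loop to run past the end.
public class AgglomerativeSketch {
    public static List<List<Double>> cluster(double[] points, double distanceThreshold) {
        List<List<Double>> clusters = new ArrayList<>();
        for (double p : points) {
            List<Double> c = new ArrayList<>();
            c.add(p);
            clusters.add(c);
        }
        // Stop once a single active cluster remains, even if the threshold was never hit.
        while (clusters.size() > 1) {
            int bi = -1, bj = -1;
            double best = Double.POSITIVE_INFINITY;
            for (int i = 0; i < clusters.size(); i++) {
                for (int j = i + 1; j < clusters.size(); j++) {
                    double d = minDistance(clusters.get(i), clusters.get(j));
                    if (d < best) {
                        best = d;
                        bi = i;
                        bj = j;
                    }
                }
            }
            if (best > distanceThreshold) {
                break; // closest remaining pair is too far apart to merge
            }
            clusters.get(bi).addAll(clusters.remove(bj));
        }
        return clusters;
    }

    // Single-linkage distance: the smallest pairwise gap between two clusters.
    private static double minDistance(List<Double> a, List<Double> b) {
        double min = Double.POSITIVE_INFINITY;
        for (double x : a) {
            for (double y : b) {
                min = Math.min(min, Math.abs(x - y));
            }
        }
        return min;
    }
}
```

With a huge distanceThreshold every merge is permitted, so the `clusters.size() > 1` loop guard is what makes training terminate with a single cluster.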
[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290 ] Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 10:11 AM: -- Hi [~zxcoccer], sorry for the late reply. This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29604?filter=-1=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20%22Library%20%2F%20Machine%20Learning%22%20order%20by%20updated%20DESC was (Author: JIRAUSER284184): Hi [~zxcoccer], sorry for the late reply. This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28906) Add AlgoOperator for AgglomerativeClustering
[ https://issues.apache.org/jira/browse/FLINK-28906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-28906. --- Resolution: Fixed > Add AlgoOperator for AgglomerativeClustering > > > Key: FLINK-28906 > URL: https://issues.apache.org/jira/browse/FLINK-28906 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290 ] Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 9:07 AM: - Hi [~zxcoccer], sorry for the late reply. This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 was (Author: JIRAUSER284184): Hi [~zxcoccer], sorry for the late reply. This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571 and is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176 ] Zhipeng Zhang deleted comment on FLINK-29176: --- was (Author: JIRAUSER284184): Duplicate with https://issues.apache.org/jira/browse/FLINK-28571 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290 ] Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 9:06 AM: - Hi [~zxcoccer], sorry for the late reply. This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571 and is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 was (Author: JIRAUSER284184): Hi [~zxcoccer], sorry for the late reply. This Jira is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang closed FLINK-29176. - Resolution: Duplicate Duplicate with https://issues.apache.org/jira/browse/FLINK-28571 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang reopened FLINK-29176: --- > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29176. --- Resolution: Fixed > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29176) Add python support for ChiSqTest
[ https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290 ] Zhipeng Zhang commented on FLINK-29176: --- Hi [~zxcoccer], sorry for the late reply. This Jira is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master. For other possible Flink ML JIRAs, please refer to https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0 > Add python support for ChiSqTest > > > Key: FLINK-29176 > URL: https://issues.apache.org/jira/browse/FLINK-29176 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Priority: Major > Fix For: ml-2.2.0 > > > We already implemented ChiSqTest in java. In this pr, we aim to add python > source/test/example for ChiSqTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29044) Add Transformer for DCT
[ https://issues.apache.org/jira/browse/FLINK-29044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29044. --- Resolution: Fixed > Add Transformer for DCT > --- > > Key: FLINK-29044 > URL: https://issues.apache.org/jira/browse/FLINK-29044 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Reporter: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29171) Add documents for MaxAbs, FeatureHasher, Interaction, VectorSlicer, ElementwiseProduct and Binarizer
[ https://issues.apache.org/jira/browse/FLINK-29171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29171. --- Resolution: Fixed > Add documents for MaxAbs, FeatureHasher, Interaction, VectorSlicer, > ElementwiseProduct and Binarizer > > > Key: FLINK-29171 > URL: https://issues.apache.org/jira/browse/FLINK-29171 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: weibo zhao >Assignee: weibo zhao >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Add docs of MaxAbs, FeatureHasher, Interaction, VectorSlicer, > ElementwiseProduct and Binarizer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29175) Add documents for KBinsDiscretizer, VectorIndexer, Tokenizer, RegexTokenizer
[ https://issues.apache.org/jira/browse/FLINK-29175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhipeng Zhang resolved FLINK-29175. --- Resolution: Fixed > Add documents for KBinsDiscretizer, VectorIndexer, Tokenizer, RegexTokenizer > > > Key: FLINK-29175 > URL: https://issues.apache.org/jira/browse/FLINK-29175 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)