[jira] [Resolved] (FLINK-32810) Improve managed memory usage in ListStateWithCache

2023-09-04 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-32810.
---
Fix Version/s: ml-2.4.0
 Assignee: Fan Hong
   Resolution: Fixed

> Improve managed memory usage in ListStateWithCache
> --
>
> Key: FLINK-32810
> URL: https://issues.apache.org/jira/browse/FLINK-32810
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> Right now, by default, an instance of `ListStateWithCache` uses up all the 
> managed memory allocated for `ManagedMemoryUseCase.OPERATOR`.
> This can cause bugs in some situations, for example when a single operator 
> contains more than one `ListStateWithCache`, or when other places also use 
> the managed memory of `ManagedMemoryUseCase.OPERATOR`.
>  
> One way to resolve such cases is to make the developer explicitly aware of 
> how much managed memory of `ManagedMemoryUseCase.OPERATOR` is used, instead 
> of implicitly using all of it. Therefore, I think it is better to add a 
> `fraction` parameter that specifies how much of that memory each 
> `ListStateWithCache` may use.
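
A minimal sketch of how such a `fraction` parameter could translate into a byte 
budget. `MemoryManager#computeMemorySize(double)` is a real Flink runtime API; 
the surrounding class, the names, and the idea of multiplying the two fractions 
are illustrative assumptions, not the actual Flink ML implementation:
{code:java}
import org.apache.flink.runtime.memory.MemoryManager;

public final class ManagedMemoryShare {

    /**
     * Hypothetical helper: `operatorScopeFraction` is the slot fraction assigned
     * to ManagedMemoryUseCase.OPERATOR (obtained from the StreamConfig in a real
     * operator); `cacheFraction` is the proposed new parameter of
     * ListStateWithCache.
     */
    public static long budgetFor(
            MemoryManager memoryManager, double operatorScopeFraction, double cacheFraction) {
        if (cacheFraction <= 0.0 || cacheFraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in (0, 1]");
        }
        // Previously the cache implicitly behaved as if cacheFraction == 1.0; with
        // an explicit parameter, two caches in one operator can declare 0.5 each.
        return memoryManager.computeMemorySize(operatorScopeFraction * cacheFraction);
    }
}
{code}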





[jira] [Resolved] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value

2023-08-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-32889.
---
Fix Version/s: ml-2.4.0
 Assignee: Fan Hong
   Resolution: Fixed

> BinaryClassificationEvaluator gives wrong weighted AUC value
> 
>
> Key: FLINK-32889
> URL: https://issues.apache.org/jira/browse/FLINK-32889
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> BinaryClassificationEvaluator gives a wrong AUC value when a weight column is 
> provided.
> Here is a case from the unit test. The (score, label, weight) tuples of the 
> data are:
> {code:java}
> (0.9, 1.0,  0.8),
> (0.9, 1.0,  0.7),
> (0.9, 1.0,  0.5),
> (0.75, 0.0,  1.2),
> (0.6, 0.0,  1.3),
> (0.9, 1.0,  1.5),
> (0.9, 1.0,  1.4),
> (0.4, 0.0,  0.3),
> (0.3, 0.0,  0.5),
> (0.9, 1.0,  1.9),
> (0.2, 0.0,  1.2),
> (0.1, 1.0,  1.0)
> {code}
> PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML 
> implementation gives 0.891168.
>  
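
For reference, the 0.87179 value follows from the standard weighted pairwise 
definition of AUC: the weighted probability that a randomly chosen positive is 
ranked above a randomly chosen negative, counting ties as 0.5. A self-contained 
sketch (independent of both the Flink ML and scikit-learn implementations) that 
reproduces it on the data above:
{code:java}
public final class WeightedAucCheck {
    public static void main(String[] args) {
        // Each row is (score, label, weight), copied from the unit test above.
        double[][] data = {
            {0.9, 1.0, 0.8}, {0.9, 1.0, 0.7}, {0.9, 1.0, 0.5}, {0.75, 0.0, 1.2},
            {0.6, 0.0, 1.3}, {0.9, 1.0, 1.5}, {0.9, 1.0, 1.4}, {0.4, 0.0, 0.3},
            {0.3, 0.0, 0.5}, {0.9, 1.0, 1.9}, {0.2, 0.0, 1.2}, {0.1, 1.0, 1.0}
        };
        double num = 0.0, posWeight = 0.0, negWeight = 0.0;
        for (double[] row : data) {
            if (row[1] == 1.0) { posWeight += row[2]; } else { negWeight += row[2]; }
        }
        // Sum w_p * w_n over all (positive, negative) pairs; ties count as 0.5.
        for (double[] p : data) {
            if (p[1] != 1.0) { continue; }
            for (double[] n : data) {
                if (n[1] != 0.0) { continue; }
                if (p[0] > n[0]) { num += p[2] * n[2]; }
                else if (p[0] == n[0]) { num += 0.5 * p[2] * n[2]; }
            }
        }
        System.out.println(num / (posWeight * negWeight)); // prints 0.8717948717948718
    }
}
{code}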





[jira] [Commented] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value

2023-08-24 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758393#comment-17758393
 ] 

Zhipeng Zhang commented on FLINK-32889:
---

Solved on master via 5619c3b8591b220e78a0a792c1f940e06149c8f0.

> BinaryClassificationEvaluator gives wrong weighted AUC value
> 
>
> Key: FLINK-32889
> URL: https://issues.apache.org/jira/browse/FLINK-32889
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Fan Hong
>Priority: Major
>  Labels: pull-request-available
>
> BinaryClassificationEvaluator gives a wrong AUC value when a weight column is 
> provided.
> Here is a case from the unit test. The (score, label, weight) tuples of the 
> data are:
> {code:java}
> (0.9, 1.0,  0.8),
> (0.9, 1.0,  0.7),
> (0.9, 1.0,  0.5),
> (0.75, 0.0,  1.2),
> (0.6, 0.0,  1.3),
> (0.9, 1.0,  1.5),
> (0.9, 1.0,  1.4),
> (0.4, 0.0,  0.3),
> (0.3, 0.0,  0.5),
> (0.9, 1.0,  1.9),
> (0.2, 0.0,  1.2),
> (0.1, 1.0,  1.0)
> {code}
> PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML 
> implementation gives 0.891168.
>  





[jira] [Updated] (FLINK-32598) Spill data from feedback edge to disk to avoid possible OOM

2023-07-17 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-32598:
--
Description: 
In Flink ML, we use a feedback edge to implement the iteration module. Suppose 
the job topology is `OpA -> HeadOperator -> OpB -> TailOperator`; then the 
basic process of each iteration is as follows:
 * At the first iteration, HeadOperator takes the input from OpA and forwards 
it to OpB.
 * Later, OpB consumes the input from HeadOperator and forwards the output to 
TailOperator.
 * Finally, TailOperator puts the records into an in-memory message queue, and 
HeadOperator consumes that queue.

When the output of OpB contains many records and these records cannot be 
consumed quickly, the message queue can grow large and eventually lead to an 
OOM. A sketch of the proposed remedy follows below.
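
An illustrative-only sketch of the proposed behavior: bound the in-memory part 
of the feedback queue and spill the overflow to disk. All class and method 
names here are hypothetical, not the Flink ML iteration API:
{code:java}
import java.io.*;
import java.util.ArrayDeque;
import java.util.function.Consumer;

public class SpillableFeedbackQueue<T extends Serializable> {
    private final int inMemoryCapacity;
    private final ArrayDeque<T> memory = new ArrayDeque<>();
    private final File spillFile;
    private ObjectOutputStream spillOut; // lazily opened on first spill
    private long spilledCount = 0;

    public SpillableFeedbackQueue(int inMemoryCapacity) throws IOException {
        this.inMemoryCapacity = inMemoryCapacity;
        this.spillFile = File.createTempFile("feedback-spill", ".bin");
        this.spillFile.deleteOnExit();
    }

    public void put(T record) throws IOException {
        if (memory.size() < inMemoryCapacity) {
            memory.addLast(record); // fast path: keep the record on the heap
        } else {
            if (spillOut == null) {
                spillOut = new ObjectOutputStream(
                        new BufferedOutputStream(new FileOutputStream(spillFile)));
            }
            spillOut.writeObject(record); // overflow goes to disk, not the heap
            spilledCount++;
        }
    }

    /** Drains the in-memory records first, then replays the spilled ones. */
    @SuppressWarnings("unchecked")
    public void drainTo(Consumer<T> consumer) throws IOException, ClassNotFoundException {
        while (!memory.isEmpty()) {
            consumer.accept(memory.pollFirst());
        }
        if (spillOut != null) {
            spillOut.close();
            try (ObjectInputStream in = new ObjectInputStream(
                    new BufferedInputStream(new FileInputStream(spillFile)))) {
                for (long i = 0; i < spilledCount; i++) {
                    consumer.accept((T) in.readObject());
                }
            }
        }
    }
}
{code}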

> Spill data from feedback edge to disk to avoid possible OOM
> ---
>
> Key: FLINK-32598
> URL: https://issues.apache.org/jira/browse/FLINK-32598
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.4.0
>
>
> In Flink ML, we use a feedback edge to implement the iteration module. Suppose 
> the job topology is `OpA -> HeadOperator -> OpB -> TailOperator`; then the 
> basic process of each iteration is as follows:
>  * At the first iteration, HeadOperator takes the input from OpA and forwards 
> it to OpB.
>  * Later, OpB consumes the input from HeadOperator and forwards the output to 
> TailOperator.
>  * Finally, TailOperator puts the records into an in-memory message queue, and 
> HeadOperator consumes that queue.
> When the output of OpB contains many records and these records cannot be 
> consumed quickly, the message queue can grow large and eventually lead to an 
> OOM.





[jira] [Created] (FLINK-32598) Spill data from feedback edge to disk to avoid possible OOM

2023-07-17 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32598:
-

 Summary: Spill data from feedback edge to disk to avoid possible 
OOM
 Key: FLINK-32598
 URL: https://issues.apache.org/jira/browse/FLINK-32598
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang
 Fix For: ml-2.4.0








[jira] [Closed] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck

2023-06-29 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang closed FLINK-31910.
-
Resolution: Invalid

> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> ---
>
> Key: FLINK-31910
> URL: https://issues.apache.org/jira/browse/FLINK-31910
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Using BroadcastUtils#withBroadcast in iteration per-round mode gets stuck. 
> From the thread dump, it seems that the head operator and the criteria node 
> are stuck waiting for a mail.
>  
> {code:java}
> "output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 
> os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition 
> [0x700013db6000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000747a83270> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown 
> Source)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748) {code}
>  
> The demo for this bug could be found here: 
> https://github.com/zhipeng93/flink-ml/tree/FLINK-31910-demo-case





[jira] [Updated] (FLINK-32293) Support vector with long index

2023-06-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-32293:
--
Component/s: Library / Machine Learning

> Support vector with long index
> --
>
> Key: FLINK-32293
> URL: https://issues.apache.org/jira/browse/FLINK-32293
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently, Flink ML only supports sparse and dense vectors with `int` indices 
> and `double` values.
>  
> However, there are real-world cases where the index of a vector can exceed 
> `Integer.MAX_VALUE`. Thus we need to support vectors with `long` indices.





[jira] [Created] (FLINK-32293) Support vector with long index

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32293:
-

 Summary: Support vector with long index
 Key: FLINK-32293
 URL: https://issues.apache.org/jira/browse/FLINK-32293
 Project: Flink
  Issue Type: New Feature
Reporter: Zhipeng Zhang


Currently, Flink ML only supports sparse and dense vectors with `int` indices 
and `double` values.

However, there are real-world cases where the index of a vector can exceed 
`Integer.MAX_VALUE`. Thus we need to support vectors with `long` indices.
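
An illustrative-only sketch of what a long-indexed sparse vector could look 
like (this is not the Flink ML API): indices are stored as `long[]`, so 
dimensions beyond `Integer.MAX_VALUE` become representable.
{code:java}
import java.util.Arrays;

public class LongIndexSparseVector {
    private final long size;       // total dimension; may exceed Integer.MAX_VALUE
    private final long[] indices;  // sorted positions of the non-zero entries
    private final double[] values; // values at those positions

    public LongIndexSparseVector(long size, long[] indices, double[] values) {
        this.size = size;
        this.indices = indices;
        this.values = values;
    }

    public long size() {
        return size;
    }

    public double get(long index) {
        int pos = Arrays.binarySearch(indices, index);
        return pos >= 0 ? values[pos] : 0.0;
    }
}
{code}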





[jira] [Created] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32292:
-

 Summary: TableUtils.getRowTypeInfo fails to get type information 
of Tuple
 Key: FLINK-32292
 URL: https://issues.apache.org/jira/browse/FLINK-32292
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang








[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-05-26 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.

Running the code snippet at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at 
time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at 
time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at 
time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at 
time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at 
time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at 
time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at 
time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at 
time: 1682319151007{code}
 

From line #2 to line #11 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600 ms, much longer than 
the checkpoint interval (100 ms).

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case
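
One possible direction (a sketch only, not necessarily the fix that was 
merged): process the cached records in bounded chunks and yield to the 
operator's mailbox between chunks, so that pending mails such as checkpoint 
handling are not starved. `MailboxExecutor#tryYield()`, `Input#processElement`, 
and `StreamRecord` are real Flink APIs; the surrounding class, the chunk size, 
and the parameter names are illustrative.
{code:java}
import java.util.Iterator;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

final class CacheDrainer {
    private static final int CHUNK_SIZE = 100;

    /** Drains cached records without monopolizing the task's mailbox thread. */
    static <IN> void drain(
            Iterator<StreamRecord<IN>> cachedRecords,
            Input<IN> input,
            MailboxExecutor mailboxExecutor) throws Exception {
        while (cachedRecords.hasNext()) {
            for (int i = 0; i < CHUNK_SIZE && cachedRecords.hasNext(); i++) {
                input.processElement(cachedRecords.next());
            }
            // Let queued mails (e.g., checkpoint triggers) run before continuing;
            // tryYield() executes one pending mail per call.
            while (mailboxExecutor.tryYield()) {
            }
        }
    }
}
{code}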

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.

Running the code snippet at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at 
time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at 
time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at 
time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at 
time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at 
time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at 
time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at 
time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at 
time: 1682319151007{code}
 

From line #3 to line #12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600 ms, much longer than 
the checkpoint interval (100 ms).

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> 

[jira] [Closed] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream

2023-05-11 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang closed FLINK-31903.
-
Resolution: Not A Bug

> Caching records fails in BroadcastUtils#withBroadcastStream
> ---
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, 
> it throws an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
> It seems that the bug comes from caching too many records when calling 
> AbstractBroadcastWrapperOperator#snapshot. 
>  
> The failed case could be found here: 
> [https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case]





[jira] [Commented] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream

2023-05-11 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721753#comment-17721753
 ] 

Zhipeng Zhang commented on FLINK-31903:
---

The root cause is that the memory state backend has a limited state size, so 
this is not a bug:

" java.io.IOException: Size of the state is larger than the maximum permitted 
memory-backed state. Size=5365814, maxSize=5242880. Consider using a different 
checkpoint storage, like the FileSystemCheckpointStorage.".
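
A minimal sketch of the remedy the error message suggests: switch from the 
default JobManager-memory checkpoint storage to file-based checkpoint storage. 
`CheckpointConfig#setCheckpointStorage(String)` is the real Flink API (1.13+); 
the interval and path below are just example values.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class FileCheckpointStorageExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100); // checkpoint every 100 ms (example value)
        // State is written to files instead of being kept in the JobManager heap,
        // so its size is no longer capped by the memory-backed storage limit.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
    }
}
{code}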

> Caching records fails in BroadcastUtils#withBroadcastStream
> ---
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, 
> it throws an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
> It seems that the bug comes from caching too many records when calling 
> AbstractBroadcastWrapperOperator#snapshot. 
>  
> The failed case could be found here: 
> [https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case]





[jira] [Resolved] (FLINK-31909) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-31909.
---
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/FLINK-31910.

> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> ---
>
> Key: FLINK-31909
> URL: https://issues.apache.org/jira/browse/FLINK-31909
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.3.0
>
>
> Using BroadcastUtils#withBroadcastStream in iterations in per-round mode can 
> get stuck.
>  
> It seems that there is a task waiting for a mail from the mailbox.
>  
> {code:java}
>  793    "tail-map-head-Parallel Collection Source (1/1)#0" #200 prio=5 
> os_prio=31 tid=0x7faabb571800 nid=0x18c03 waiting on condition 
> [0x700013aae000]
>  793    java.lang.Thread.State: TIMED_WAITING (parking)
>  794     at sun.misc.Unsafe.park(Native Method)
>  795     - parking to wait for  <0x000747805568> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>  796     at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>  797     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
>  798     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>  799     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
>  800     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>  801     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>  802     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  803     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  804     at 
> org.apache.flink.runtime.taskmanager.Task$$Lambda$1430/1226027100.run(Unknown 
> Source)
>  805     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  806     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>  807     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>  808     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>  809     at java.lang.Thread.run(Thread.java:748) {code}





[jira] [Updated] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31910:
--
Description: 
Using BroadcastUtils#withBroadcast in iteration per-round mode gets stuck. From 
the thread dump, it seems that the head operator and the criteria node are 
stuck waiting for a mail.

 
{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 
os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition 
[0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000747a83270> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown 
Source)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748) {code}
 

The demo for this bug could be found here: 
https://github.com/zhipeng93/flink-ml/tree/FLINK-31910-demo-case

  was:
Using BroadcastUtils#withBroadcast in iteration per-round mode gets stuck. From 
the thread dump, it seems that the head operator and the criteria node are 
stuck waiting for a mail.

 
{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 
os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition 
[0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000747a83270> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown 
Source)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748) {code}
 

The demo for this bug could be found here: 


> Using BroadcastUtils#withBroadcast in iteration perround mode got stuck
> ---
>
> Key: FLINK-31910
> URL: https://issues.apache.org/jira/browse/FLINK-31910
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Using BroadcastUtils#withBroadcast in iteration per-round mode gets stuck. 
> From the thread dump, it seems that the head operator and the criteria node 
> are stuck waiting for a mail.
>  
> {code:java}
> "output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 
> os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on 

[jira] [Created] (FLINK-31910) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck

2023-04-24 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31910:
-

 Summary: Using BroadcastUtils#withBroadcast in iteration perround 
mode got stuck
 Key: FLINK-31910
 URL: https://issues.apache.org/jira/browse/FLINK-31910
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.3.0
Reporter: Zhipeng Zhang


Using BroadcastUtils#withBroadcast in iteration per-round mode gets stuck. From 
the thread dump, it seems that the head operator and the criteria node are 
stuck waiting for a mail.

 
{code:java}
"output-head-Parallel Collection Source -> Sink: Unnamed (4/4)#0" #228 prio=5 
os_prio=31 tid=0x7f9e1d083800 nid=0x19b07 waiting on condition 
[0x700013db6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000747a83270> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1383/280145505.run(Unknown 
Source)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748) {code}
 

The demo for this bug could be found here: 





[jira] [Created] (FLINK-31909) Using BroadcastUtils#withBroadcast in iteration perround mode got stuck

2023-04-24 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31909:
-

 Summary: Using BroadcastUtils#withBroadcast in iteration perround 
mode got stuck
 Key: FLINK-31909
 URL: https://issues.apache.org/jira/browse/FLINK-31909
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang
 Fix For: ml-2.3.0


Using BroadcastUtils#withBroadcastStream in iterations in per-round mode can 
get stuck.

It seems that there is a task waiting for a mail from the mailbox.

 
{code:java}
 793    "tail-map-head-Parallel Collection Source (1/1)#0" #200 prio=5 
os_prio=31 tid=0x7faabb571800 nid=0x18c03 waiting on condition 
[0x700013aae000]
 793    java.lang.Thread.State: TIMED_WAITING (parking)
 794     at sun.misc.Unsafe.park(Native Method)
 795     - parking to wait for  <0x000747805568> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 796     at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 797     at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
 798     at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
 799     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
 800     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
 801     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
 802     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 803     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
 804     at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1430/1226027100.run(Unknown 
Source)
 805     at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 806     at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
 807     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
 808     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
 809     at java.lang.Thread.run(Thread.java:748) {code}





[jira] [Updated] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31903:
--
Description: 
When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it 
throws an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold.
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 {code}
It seems that the bug comes from caching too many records when calling 
AbstractBroadcastWrapperOperator#snapshot.

 

The failed case could be found here: 
[https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case]

  was:
When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it 
leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold.
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 {code}
It seems that the bug comes from caching too many records when calling 
AbstractBroadcastWrapperOperator#snapshot.

 

The failed case could be found here: 
https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case


> Caching records fails in BroadcastUtils#withBroadcastStream
> ---
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, 
> it throws an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at 
> 

[jira] [Updated] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31903:
--
Description: 
When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it 
leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold.
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 {code}
It seems that the bug comes from caching too many records when calling 
AbstractBroadcastWrapperOperator#snapshot.

 

The failed case could be found here: 
https://github.com/zhipeng93/flink-ml/tree/FLINK-31903-fail-case

  was:
When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it 
leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold.
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 {code}
It seems that the bug comes from caching too many records when calling 
AbstractBroadcastWrapperOperator#snapshot.


> Caching records fails in BroadcastUtils#withBroadcastStream
> ---
>
> Key: FLINK-31903
> URL: https://issues.apache.org/jira/browse/FLINK-31903
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.3.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, 
> it leads to an exception as follows:
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at 
> 

[jira] [Created] (FLINK-31903) Caching records fails in BroadcastUtils#withBroadcastStream

2023-04-24 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31903:
-

 Summary: Caching records fails in 
BroadcastUtils#withBroadcastStream
 Key: FLINK-31903
 URL: https://issues.apache.org/jira/browse/FLINK-31903
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.3.0
Reporter: Zhipeng Zhang


When caching more than 1,000,000 records using BroadcastUtils#withBroadcast, it 
leads to an exception as follows:
{code:java}
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold.
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191)
    at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2078)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 {code}
It seems that the bug comes from caching too many records when calling 
AbstractBroadcastWrapperOperator#snapshot.





[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.

Running the code snippet at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at 
time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at 
time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at 
time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at 
time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at 
time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at 
time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at 
time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at 
time: 1682319151007{code}
 

From line #3 to line #12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600 ms, much longer than 
the checkpoint interval (100 ms).

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.

Running the code snippet at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
 

From line #3 to line #12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed.

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.3.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast 
> input until the broadcast inputs are all processed. Once the broadcast 
> variables are ready, we first process the cached records and then continue to 
> process the newly arrived 

[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.

Running the code snippet at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
 

From line #3 to line #12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed.

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.3.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
> the broadcast inputs are all processed. Once the broadcast variables are 
> ready, we first process the cached records and then continue to process the 
> newly arrived records.
>  
> Processing cached elements is invoked via `Input#processElement` and 
> `Input#processWatermark`. However, processing the cached elements may take a 
> long time since there may be many cached records, which can block the 
> checkpoint barriers.
>  
> Running the code snippet at [1] produces logs like the following.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
> processed cached records, cnt: 1
> processed cached records, cnt: 2
> processed cached records, cnt: 3
> processed cached records, cnt: 4
> processed cached records, cnt: 5
> processed cached records, cnt: 6
> processed cached records, cnt: 7
> processed cached records, cnt: 8
> processed cached records, cnt: 9
> processed cached records, cnt: 10
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
>  
> From line #3 to line #12 there are no checkpoints: the barriers are blocked 
> until all cached elements are processed.
>  
>   [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case





[jira] [Created] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-04-24 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31901:
-

 Summary: AbstractBroadcastWrapperOperator should not block 
checkpoint barriers when processing cached records
 Key: FLINK-31901
 URL: https://issues.apache.org/jira/browse/FLINK-31901
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang
 Fix For: ml-2.3.0


Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until 
the broadcast inputs are all processed. Once the broadcast variables are ready, 
we first process the cached records and then continue to process the newly 
arrived records.

Processing cached elements is invoked via `Input#processElement` and 
`Input#processWatermark`. However, processing the cached elements may take a 
long time since there may be many cached records, which can block the 
checkpoint barriers.





[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module

2023-04-20 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31173:
--
Description: 
In flink-ml-iteration, there are several bugs as follows:
 # TailOperator assumes a single input. We added a Tail operator to increment 
the epoch watermark at each iteration, under the assumption that each Tail 
operator has only one input, and did not align the epoch watermarks from 
different inputs. This assumption does not hold if the input is a `union`.
 # ProxyOperatorStateBackend does not correctly initialize the state descriptor.

  was:
In flink-ml-iteration, there are several bugs as follows:
 # TailOperator should have only one input operator. We added a Tail operator 
to increment the epoch watermark at each iteration, assuming that each Tail 
operator has only one input, and did not align the epoch watermarks from 
different inputs. This assumption might not hold if the input is a `union`.
 # ReplayOperator should replay the records when it receives the max epoch 
watermark. Currently it does not, which is inconsistent with the HeadOperator, 
as the HeadOperator always forwards records downstream.
 # ProxyOperatorStateBackend does not correctly initialize the state descriptor.


> Fix several bugs in flink-ml-iteration module
> -
>
> Key: FLINK-31173
> URL: https://issues.apache.org/jira/browse/FLINK-31173
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.3.0
>
>
> In flink-ml-iteration, there are several bugs as follows:
>  # TailOperator should have only one input operator. We added a Tail operator 
> to increment the epoch watermark at each iteration, assuming that each Tail 
> operator has only one input, and did not align the epoch watermarks from 
> different inputs. This assumption might not hold if the input is a `union`.
>  # ProxyOperatorStateBackend does not correctly initialize the state 
> descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-04-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31374:
-

Assignee: Jiang Xin  (was: jiangxin li)

> ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
> -
>
> Key: FLINK-31374
> URL: https://issues.apache.org/jira/browse/FLINK-31374
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
>
> In the flink-ml-iteration module, we use ProxyStreamPartitioner to wrap 
> StreamPartitioner to deal with records in iterations.
>  
> However, it did not implement the ConfigurableStreamPartitioner interface. 
> Thus, the maxParallelism information is lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-04-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31374:
-

Assignee: jiangxin li

> ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
> -
>
> Key: FLINK-31374
> URL: https://issues.apache.org/jira/browse/FLINK-31374
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: jiangxin li
>Priority: Major
>  Labels: pull-request-available
>
> In the flink-ml-iteration module, we use ProxyStreamPartitioner to wrap 
> StreamPartitioner to deal with records in iterations.
>  
> However, it did not implement the ConfigurableStreamPartitioner interface. 
> Thus, the maxParallelism information is lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-04-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-31374.
---
Resolution: Fixed

Resolved on master via 990337bf5a0a23b08be85c475043a047703772c8

> ProxyStreamPartitioner should implement ConfigurableStreamPartitioner
> -
>
> Key: FLINK-31374
> URL: https://issues.apache.org/jira/browse/FLINK-31374
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> In the flink-ml-iteration module, we use ProxyStreamPartitioner to wrap 
> StreamPartitioner to deal with records in iterations.
>  
> However, it did not implement the ConfigurableStreamPartitioner interface. 
> Thus, the maxParallelism information is lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31732) flink-ml-uber module should include statefun as a dependency

2023-04-05 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31732:
--
Component/s: Library / Machine Learning

> flink-ml-uber module should include statefun as a dependency
> 
>
> Key: FLINK-31732
> URL: https://issues.apache.org/jira/browse/FLINK-31732
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31732) flink-ml-uber module should include statefun as a dependency

2023-04-05 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang closed FLINK-31732.
-
Resolution: Won't Fix

We leave statefun as a third-party dependency for now.

> flink-ml-uber module should include statefun as a dependency
> 
>
> Key: FLINK-31732
> URL: https://issues.apache.org/jira/browse/FLINK-31732
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhipeng Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31732) flink-ml-uber module should include statefun as a dependency

2023-04-04 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31732:
-

 Summary: flink-ml-uber module should include statefun as a 
dependency
 Key: FLINK-31732
 URL: https://issues.apache.org/jira/browse/FLINK-31732
 Project: Flink
  Issue Type: Improvement
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31623) Fix DataStreamUtils#sample with approximate uniform sampling

2023-04-03 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31623:
--
Summary: Fix DataStreamUtils#sample with approximate uniform sampling  
(was: Change to uniform sampling in DataStreamUtils#sample method)

> Fix DataStreamUtils#sample with approximate uniform sampling
> 
>
> Key: FLINK-31623
> URL: https://issues.apache.org/jira/browse/FLINK-31623
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation employs a two-level sampling method.
> However, when data instances are unevenly distributed among partitions 
> (subtasks), the probability of an instance being sampled differs across 
> partitions (subtasks), i.e., the sampling is not uniform.
>  
> In addition, one side effect of the current implementation is that one subtask 
> has a memory footprint of `2 * numSamples * sizeof(element)`, which could 
> cause unexpected OOM in some situations.
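For reference, a uniform single-pass sample is usually obtained with reservoir
sampling (Algorithm R); a distributed variant must additionally weight the
merge of the per-partition reservoirs by each partition's element count,
otherwise elements in small partitions are over-represented. A minimal,
illustrative sketch of the per-partition step (not the Flink ML code):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Classic Algorithm R: after n elements have been seen, each of them is in
// the reservoir with equal probability k / n, using only O(k) memory.
public class ReservoirSample<T> {
    private final List<T> reservoir = new ArrayList<>();
    private final int k;
    private long count = 0;
    private final Random rng = new Random();

    public ReservoirSample(int k) {
        this.k = k;
    }

    public void add(T element) {
        count++;
        if (reservoir.size() < k) {
            reservoir.add(element);
        } else {
            long j = (long) (rng.nextDouble() * count);
            if (j < k) {
                // Replace a random slot with probability k / count.
                reservoir.set((int) j, element);
            }
        }
    }

    public List<T> sample() {
        return reservoir;
    }
}
{code}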



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31623) Fix DataStreamUtils#sample with approximate uniform sampling

2023-04-03 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-31623.
---
Resolution: Fixed

Resolved on master via fe338b194b73fd51218f4d842fa7b0065fb76c56

> Fix DataStreamUtils#sample with approximate uniform sampling
> 
>
> Key: FLINK-31623
> URL: https://issues.apache.org/jira/browse/FLINK-31623
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation employs a two-level sampling method.
> However, when data instances are unevenly distributed among partitions 
> (subtasks), the probability of an instance being sampled differs across 
> partitions (subtasks), i.e., the sampling is not uniform.
>  
> In addition, one side effect of the current implementation is that one subtask 
> has a memory footprint of `2 * numSamples * sizeof(element)`, which could 
> cause unexpected OOM in some situations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31189) Allow special handle of less frequent values in StringIndexer

2023-04-03 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-31189.
---
Resolution: Fixed

Resolved via 560532d99c4330949d4da2c94d0e5228bdfbc9cd on master

> Allow special handle of less frequent values in StringIndexer
> -
>
> Key: FLINK-31189
> URL: https://issues.apache.org/jira/browse/FLINK-31189
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Real-world datasets often contain categorical features with millions of 
> distinct values, some of which may only appear a few times. To maximize the 
> performance of certain algorithms, it is important to treat these less 
> frequent values properly. A popular approach is to map them to a special 
> index, as is done in sklearn's OneHotEncoder [1].
>  
> [1] 
> https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html
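A minimal sketch of the idea, assuming a plain string-to-index map (this is
illustrative, not the StringIndexer implementation; `minFrequency` mirrors the
spirit of the min_frequency option of sklearn's OneHotEncoder):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Build a string -> index mapping in which every value seen fewer than
// minFrequency times is collapsed into one shared "infrequent" index.
public class FrequencyIndexer {
    public static Map<String, Integer> fit(Iterable<String> values, long minFrequency) {
        Map<String, Long> counts = new HashMap<>();
        for (String v : values) {
            counts.merge(v, 1L, Long::sum);
        }
        Map<String, Integer> index = new HashMap<>();
        int next = 1; // index 0 is reserved for all infrequent values
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            index.put(e.getKey(), e.getValue() >= minFrequency ? next++ : 0);
        }
        return index;
    }
}
{code}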



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31189) Allow special handle of less frequent values in StringIndexer

2023-03-31 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31189:
-

Assignee: Zhipeng Zhang

> Allow special handle of less frequent values in StringIndexer
> -
>
> Key: FLINK-31189
> URL: https://issues.apache.org/jira/browse/FLINK-31189
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Real-world datasets often contain categorical features with millions of 
> distinct values, some of which may only appear a few times. To maximize the 
> performance of certain algorithms, it is important to treat these less 
> frequent values properly. A popular approach is to map them to a special 
> index, as is done in sklearn's OneHotEncoder [1].
>  
> [1] 
> https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException

2023-03-16 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701452#comment-17701452
 ] 

Zhipeng Zhang commented on FLINK-31486:
---

Is this similar to this one [1]?

 

 [1] https://issues.apache.org/jira/browse/FLINK-31255

> Using KeySelector in IterationBody causes ClassNotFoundException
> 
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> When we use CoGroup along with KeySelector in an IterationBody, the following 
> exception occurs.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Could not instantiate state partitioner. at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>  at 
> org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>  at 
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector 
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) 
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) 
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>  at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>  ... 17 more {code}
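For reference, the failing pattern can be distilled to a coGroup whose
KeySelectors are lambdas. The snippet below is a hypothetical reconstruction of
that pattern, not the reporter's code: on its own it runs fine, and the
ClassCastException above is reported to appear once the same graph is built
inside an IterationBody, where the wrapped operator config re-deserializes the
state partitioner. EndOfStreamWindows is the Flink ML window assigner.

{code:java}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoGroupWithLambdaKeySelector {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Long, Long>> a = env.fromElements(Tuple2.of(1L, 1L), Tuple2.of(2L, 2L));
        DataStream<Tuple2<Long, Long>> b = env.fromElements(Tuple2.of(1L, 10L), Tuple2.of(2L, 20L));

        // Lambda KeySelectors are serialized as SerializedLambda instances;
        // this is what the wrapped operator config fails to restore inside an
        // IterationBody.
        a.coGroup(b)
                .where(t -> t.f0, Types.LONG)
                .equalTo(t -> t.f0, Types.LONG)
                .window(EndOfStreamWindows.get())
                .apply(
                        (CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long>)
                                (first, second, out) -> {
                                    for (Tuple2<Long, Long> t : first) {
                                        out.collect(t.f1);
                                    }
                                },
                        Types.LONG)
                .print();

        env.execute("cogroup-with-lambda-keyselector");
    }
}
{code}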



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31410) ListStateWithCache Should support incremental snapshot

2023-03-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31410:
--
Description: 
In Flink ML, we use ListStateWithCache [2] to enable caching data in memory and 
on the filesystem. However, it does not support incremental snapshots yet: it 
writes all the data to the checkpoint stream on every snapshot [1], which can 
be inefficient.

 

 
[1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116]

 
[2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java]
 

  was:
In Flink ML, we used ListStateWithCache [2] to enable caching data in memory 
and on the filesystem. However, it does not support incremental snapshots yet: 
it writes all the data to the checkpoint stream on every snapshot [1], which 
can be inefficient.

 

 
[1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116]

 
[2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java]
 


> ListStateWithCache Should support incremental snapshot
> --
>
> Key: FLINK-31410
> URL: https://issues.apache.org/jira/browse/FLINK-31410
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> In Flink ML, we use ListStateWithCache [2] to enable caching data in memory 
> and on the filesystem. However, it does not support incremental snapshots 
> yet: it writes all the data to the checkpoint stream on every snapshot [1], 
> which can be inefficient.
>  
>  
> [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116]
>  
> [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java]
>  
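A design sketch of what an incremental snapshot could look like (illustrative
only; `Segment` and `CheckpointStream` stand in for the data-cache
abstractions, and handling of aborted checkpoints is omitted):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Remember which finished cache segments were already persisted and, on the
// next checkpoint, write only the segments appended since then instead of
// re-writing all cached data.
class IncrementalSnapshotSketch {
    interface Segment {}

    interface CheckpointStream {
        void write(Segment segment);
    }

    private final List<Segment> segments = new ArrayList<>();
    private int persistedCount = 0; // segments covered by the last checkpoint

    void append(Segment segment) {
        segments.add(segment);
    }

    void snapshot(CheckpointStream out) {
        // Write only the delta since the last snapshot.
        for (int i = persistedCount; i < segments.size(); i++) {
            out.write(segments.get(i));
        }
        persistedCount = segments.size();
    }
}
{code}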



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31410) ListStateWithCache Should support incremental snapshot

2023-03-12 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31410:
-

 Summary: ListStateWithCache Should support incremental snapshot
 Key: FLINK-31410
 URL: https://issues.apache.org/jira/browse/FLINK-31410
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


In Flink ML, we used ListStateWithCache [2] to enable caching data in memory 
and on the filesystem. However, it does not support incremental snapshots yet: 
it writes all the data to the checkpoint stream on every snapshot [1], which 
can be inefficient.

 

 
[1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116]

 
[2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31410) ListStateWithCache Should support incremental snapshot

2023-03-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31410:
--
Issue Type: Improvement  (was: Bug)

> ListStateWithCache Should support incremental snapshot
> --
>
> Key: FLINK-31410
> URL: https://issues.apache.org/jira/browse/FLINK-31410
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> In Flink ML, we used ListStateWithCache [2] to enable caching data in memory 
> and on the filesystem. However, it does not support incremental snapshots 
> yet: it writes all the data to the checkpoint stream on every snapshot [1], 
> which can be inefficient.
>  
>  
> [1][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java#L116]
>  
> [2][https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark

2023-03-09 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698766#comment-17698766
 ] 

Zhipeng Zhang commented on FLINK-31373:
---

As discussed with [~gaoyunhaii] offline, we agree that the watermark is not 
correctly processed in the iteration module.

 

To avoid the above cases for now, we plan to add a Javadoc note explaining that 
the `flink-ml-iteration` module cannot deal with watermarks correctly. We will 
leave it as a TODO here.

> PerRoundWrapperOperator should carry epoch information in watermark
> ---
>
> Key: FLINK-31373
> URL: https://issues.apache.org/jira/browse/FLINK-31373
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently we use PerRoundWrapperOperator to wrap normal Flink operators 
> so that they can be used in iterations.
> We already contain the epoch information in each record so that we know 
> which iteration each record belongs to.
> However, there is no epoch information when the stream element is a 
> watermark. This works in most cases, but fails to address the following use 
> case:
>  - In DataStreamUtils#withBroadcast, we cache the elements (including 
> watermarks) from non-broadcast inputs until the broadcast variables are 
> ready. Once the broadcast variables are ready and we receive a stream 
> element, we process the cached elements first. If the received element is 
> a watermark, the current implementation of the iteration module fails 
> (ProxyOutput#collect throws an NPE) since there is no epoch information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31029) KBinsDiscretizer gives wrong bin edges in 'quantile' strategy when input data contains only 2 distinct values

2023-03-09 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31029:
-

Assignee: Zhipeng Zhang

> KBinsDiscretizer gives wrong bin edges in 'quantile' strategy when input data 
> contains only 2 distinct values
> -
>
> Key: FLINK-31029
> URL: https://issues.apache.org/jira/browse/FLINK-31029
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When one input column contains only 2 distinct values and their counts are 
> the same, KBinsDiscretizer transforms this column to all 0s using the 
> `quantile` strategy. An example of such a column is `[0, 0, 0, 1, 1, 1]`.
> When the 2 distinct values have different counts, the transformed values are 
> also all 0s, so the two values cannot be distinguished.
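A rough walk-through of why the edges collapse, using the example column and 3
bins (the exact quantile interpolation in the implementation may differ):

{code}
sorted column:                       [0, 0, 0, 1, 1, 1]
edges at quantiles {0, 1/3, 2/3, 1}: [0, 0, 1, 1]
after removing duplicate edges:      [0, 1]
{code}

Only the single bin [0, 1] survives, so every value is mapped to bin index 0.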



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-03-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31374:
-

 Summary: ProxyStreamPartitioner should implement 
ConfigurableStreamPartitioner
 Key: FLINK-31374
 URL: https://issues.apache.org/jira/browse/FLINK-31374
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


In the flink-ml-iteration module, we use ProxyStreamPartitioner to wrap 
StreamPartitioner to deal with records in iterations.

 

However, it did not implement the ConfigurableStreamPartitioner interface. 
Thus, the maxParallelism information is lost.
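An illustrative fragment of the fix (assuming the wrapped partitioner is stored
in a field named `wrappedStreamPartitioner`; only the configure method is
shown):

{code:java}
// Illustrative fragment, not the actual patch: implement
// ConfigurableStreamPartitioner on ProxyStreamPartitioner and forward the
// max parallelism to the wrapped partitioner so the information is not lost.
@Override
public void configure(int maxParallelism) {
    if (wrappedStreamPartitioner instanceof ConfigurableStreamPartitioner) {
        ((ConfigurableStreamPartitioner) wrappedStreamPartitioner)
                .configure(maxParallelism);
    }
}
{code}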



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark

2023-03-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31373:
--
Description: 
Currently we use PerRoundWrapperOperator to wrap normal Flink operators 
so that they can be used in iterations.

We already contain the epoch information in each record so that we know which 
iteration each record belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails to address the following use case:
 - In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready and we receive a stream element, we 
process the cached elements first. If the received element is a watermark, the 
current implementation of the iteration module fails (ProxyOutput#collect 
throws an NPE) since there is no epoch information.

  was:
Currently we use `PerRoundWrapperOperator` to wrap normal Flink operators 
so that they can be used in iterations.


We already contain the epoch information in each record so that we know which 
iteration each record belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails to address the following issue:

- In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready and we receive a stream element, we 
process the cached elements first. If the received element is a watermark, the 
current implementation of the iteration module fails (`ProxyOutput#collect` 
throws an NPE) since there is no epoch information.


> PerRoundWrapperOperator should carry epoch information in watermark
> ---
>
> Key: FLINK-31373
> URL: https://issues.apache.org/jira/browse/FLINK-31373
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently we use PerRoundWrapperOperator to wrap normal Flink operators 
> so that they can be used in iterations.
> We already contain the epoch information in each record so that we know 
> which iteration each record belongs to.
> However, there is no epoch information when the stream element is a 
> watermark. This works in most cases, but fails to address the following use 
> case:
>  - In DataStreamUtils#withBroadcast, we cache the elements (including 
> watermarks) from non-broadcast inputs until the broadcast variables are 
> ready. Once the broadcast variables are ready and we receive a stream 
> element, we process the cached elements first. If the received element is 
> a watermark, the current implementation of the iteration module fails 
> (ProxyOutput#collect throws an NPE) since there is no epoch information.
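An illustrative sketch of the direction suggested by the title, not the actual
iteration API: pair the watermark with its epoch so that cached watermarks can
be routed the same way as records (`EpochWatermark` is a hypothetical name):

{code:java}
// Carry the epoch next to the wrapped watermark's timestamp; ProxyOutput can
// then resolve the epoch for a cached watermark instead of throwing an NPE.
final class EpochWatermark {
    final long timestamp; // the wrapped watermark's timestamp
    final int epoch;      // the iteration round this watermark belongs to

    EpochWatermark(long timestamp, int epoch) {
        this.timestamp = timestamp;
        this.epoch = epoch;
    }
}
{code}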



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark

2023-03-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31373:
-

 Summary: PerRoundWrapperOperator should carry epoch information in 
watermark
 Key: FLINK-31373
 URL: https://issues.apache.org/jira/browse/FLINK-31373
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


Currently we use `PerRoundWrapperOperator` to wrap normal Flink operators 
so that they can be used in iterations.


We already contain the epoch information in each record so that we know which 
iteration each record belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails to address the following issue:

- In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready and we receive a stream element, we 
process the cached elements first. If the received element is a watermark, the 
current implementation of the iteration module fails (`ProxyOutput#collect` 
throws an NPE) since there is no epoch information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31191) VectorIndexer should check whether doublesByColumn is null before snapshot

2023-03-01 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31191:
-

Assignee: Zhipeng Zhang

> VectorIndexer should check whether doublesByColumn is null before snapshot
> --
>
> Key: FLINK-31191
> URL: https://issues.apache.org/jira/browse/FLINK-31191
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Currently VectorIndexer can throw an NPE during checkpointing. It should 
> check whether `doublesByColumn` is null before calling snapshot.
>  
> logview: 
> [https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039]
> details:
>  
>  
> [735|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:736]Caused
>  by: java.lang.NullPointerException 
> [736|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:737]
>  at 
> org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.convertToListArray(VectorIndexer.java:232)
>  
> [737|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:738]
>  at 
> org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.snapshotState(VectorIndexer.java:228)
>  
> [738|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:739]
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
>  
> [739|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:740]
>  ... 33 more
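A minimal sketch of the proposed guard, assuming the operator extends
AbstractStreamOperator; `doublesByColumn` and `convertToListArray` come from
the stack trace above, while the state field `doublesByColumnState` is a
hypothetical name:

{code:java}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    // Before the first record arrives, doublesByColumn may still be null;
    // skip the conversion in that case instead of throwing an NPE.
    if (doublesByColumn != null) {
        doublesByColumnState.update(convertToListArray(doublesByColumn));
    }
}
{code}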



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31276) VectorIndexerTest#testFitAndPredictWithHandleInvalid fails

2023-03-01 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31276:
-

 Summary: VectorIndexerTest#testFitAndPredictWithHandleInvalid fails
 Key: FLINK-31276
 URL: https://issues.apache.org/jira/browse/FLINK-31276
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


The unit test fails [1] and the error stack is:

 
Error:  Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 22.176 
s <<< FAILURE! - in org.apache.flink.ml.feature.VectorIndexerTest 
[592|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:593]Error:
  testFitAndPredictWithHandleInvalid Time elapsed: 4.178 s <<< FAILURE! 
[593|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:594]java.lang.AssertionError:
 expected: but was: 
[594|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:595]
 at org.junit.Assert.fail(Assert.java:89) 
[595|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:596]
 at org.junit.Assert.failNotEquals(Assert.java:835) 
[596|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:597]
 at org.junit.Assert.assertEquals(Assert.java:120) 
[597|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:598]
 at org.junit.Assert.assertEquals(Assert.java:146) 
[598|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:599]
 at 
org.apache.flink.ml.feature.VectorIndexerTest.testFitAndPredictWithHandleInvalid(VectorIndexerTest.java:186)
 
 [1] 
[https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31276) VectorIndexerTest#testFitAndPredictWithHandleInvalid fails

2023-03-01 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694916#comment-17694916
 ] 

Zhipeng Zhang commented on FLINK-31276:
---

I ran it 1000 times and did not reproduce the bug.

> VectorIndexerTest#testFitAndPredictWithHandleInvalid fails
> --
>
> Key: FLINK-31276
> URL: https://issues.apache.org/jira/browse/FLINK-31276
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> The unit test fails [1] and the error stack is:
>  
> Error:  Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 22.176 s <<< FAILURE! - in org.apache.flink.ml.feature.VectorIndexerTest 
> [592|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:593]Error:
>   testFitAndPredictWithHandleInvalid Time elapsed: 4.178 s <<< FAILURE! 
> [593|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:594]java.lang.AssertionError:
>  expected: for more options.> but was: 
> [594|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:595]
>  at org.junit.Assert.fail(Assert.java:89) 
> [595|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:596]
>  at org.junit.Assert.failNotEquals(Assert.java:835) 
> [596|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:597]
>  at org.junit.Assert.assertEquals(Assert.java:120) 
> [597|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:598]
>  at org.junit.Assert.assertEquals(Assert.java:146) 
> [598|https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426#step:4:599]
>  at 
> org.apache.flink.ml.feature.VectorIndexerTest.testFitAndPredictWithHandleInvalid(VectorIndexerTest.java:186)
>  
>  [1] 
> [https://github.com/apache/flink-ml/actions/runs/4300867923/jobs/7497476426]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config

2023-02-28 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694417#comment-17694417
 ] 

Zhipeng Zhang edited comment on FLINK-31255 at 2/28/23 8:17 AM:


It seems that the bug comes from not unwrapping all the stream configs of the 
wrapped operator in OperatorUtils#createWrappedOperatorConfig.


was (Author: JIRAUSER284184):
It seems that the bug comes from not unwrapping the stream config of the wrapped 
operator in OperatorUtils#createWrappedOperatorConfig.

> OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
> ---
>
> Key: FLINK-31255
> URL: https://issues.apache.org/jira/browse/FLINK-31255
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently we use an operator wrapper to enable using normal operators in 
> iterations. However, the operatorConfig is not correctly unwrapped. For 
> example, the following code fails because of a wrong type serializer.
>  
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(
>                                     Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> } {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
> org.apache.flink.iteration.IterationRecord
>     at 
> org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at 
> org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at 
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> 

[jira] [Commented] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config

2023-02-28 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694417#comment-17694417
 ] 

Zhipeng Zhang commented on FLINK-31255:
---

It seems that the bug comes from not unwrapping the stream config of the wrapped 
operator in OperatorUtils#createWrappedOperatorConfig.

> OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
> ---
>
> Key: FLINK-31255
> URL: https://issues.apache.org/jira/browse/FLINK-31255
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently we use an operator wrapper to enable using normal operators in 
> iterations. However, the operatorConfig is not correctly unwrapped. For 
> example, the following code fails because of a wrong type serializer.
>  
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(
>                                     Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> } {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
> org.apache.flink.iteration.IterationRecord
>     at 
> org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at 
> org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at 
> org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at 
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at 

[jira] [Created] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config

2023-02-28 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31255:
-

 Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap 
operator config
 Key: FLINK-31255
 URL: https://issues.apache.org/jira/browse/FLINK-31255
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang


Currently we use an operator wrapper to enable using normal operators in 
iterations. However, the operatorConfig is not correctly unwrapped. For 
example, the following code fails because of a wrong type serializer.

 
{code:java}
@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
            env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
    DataStreamList result =
            Iterations.iterateBoundedStreamsUntilTermination(
                    DataStreamList.of(input),
                    ReplayableDataStreamList.notReplay(input),
                    IterationConfig.newBuilder()
                            .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                            .build(),
                    new IterationBodyWithMapPartition());

    List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(counts.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {

    @Override
    public IterationBodyResult process(
            DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);

        DataStream<Long> mapPartitionResult =
                DataStreamUtils.mapPartition(
                        input,
                        new MapPartitionFunction<Long, Long>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Long> iterable, Collector<Long> collector)
                                    throws Exception {
                                for (Long iter : iterable) {
                                    collector.collect(iter);
                                }
                            }
                        });

        DataStream<Integer> terminationCriteria =
                mapPartitionResult.flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);

        return new IterationBodyResult(
                DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
} {code}
The error stack is:

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
org.apache.flink.iteration.IterationRecord
    at 
org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
    at 
org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
    at 
org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
    at 
org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
    at 
org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
    at 
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
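A rough sketch of the fix direction implied by the comments above (not the
actual createWrappedOperatorConfig logic): every serializer stored in the
wrapped operator's StreamConfig is an IterationRecordSerializer and should be
unwrapped to its inner serializer before the wrapped per-round operator uses
it. The `getInnerSerializer` accessor is assumed here:

{code:java}
// Hypothetical helper; the real fix must apply this to all serializers in the
// config (inputs, outputs, and state partitioner key serializers), not one.
static TypeSerializer<?> unwrap(TypeSerializer<?> serializer) {
    if (serializer instanceof IterationRecordSerializer) {
        return ((IterationRecordSerializer<?>) serializer).getInnerSerializer();
    }
    return serializer;
}
{code}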



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31191) VectorIndexer should check whether doublesByColumn is null before snapshot

2023-02-22 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31191:
-

 Summary: VectorIndexer should check whether doublesByColumn is 
null before snapshot
 Key: FLINK-31191
 URL: https://issues.apache.org/jira/browse/FLINK-31191
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


Currently VectorIndexer can throw an NPE during checkpointing. It should 
check whether `doublesByColumn` is null before calling snapshot.

 

logview: 
[https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039]

details:
 
 
[735|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:736]Caused
 by: java.lang.NullPointerException 
[736|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:737]
 at 
org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.convertToListArray(VectorIndexer.java:232)
 
[737|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:738]
 at 
org.apache.flink.ml.feature.vectorindexer.VectorIndexer$ComputeDistinctDoublesOperator.snapshotState(VectorIndexer.java:228)
 
[738|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:739]
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
 
[739|https://github.com/apache/flink-ml/actions/runs/4249415318/jobs/7389547039#step:4:740]
 ... 33 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module

2023-02-22 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31173:
--
Description: 
In flink-ml-iteration, there are several bugs as follows:
 # TailOperator should have only one input operator. We added a Tail operator 
to increment the epoch watermark at each iteration, assuming that each Tail 
operator has only one input, and did not align the epoch watermarks from 
different inputs. This assumption might not hold if the input is a `union`.
 # ReplayOperator should replay the records when it receives the max epoch 
watermark. Currently it does not, which is inconsistent with the HeadOperator, 
as the HeadOperator always forwards records downstream.
 # ProxyOperatorStateBackend does not correctly initialize the state descriptor.

  was:
In flink-ml-iteration, there are several bugs as follows:
 # TailOperator should have only one input operator. We added a Tail operator 
to increment the epoch watermark at each iteration, assuming that each Tail 
operator has only one input, and did not align the epoch watermarks from 
different inputs. This assumption might not hold if the input is a `union`.
 # ReplayOperator should replay the records when it receives the max epoch 
watermark. Currently it does not, which is inconsistent with the HeadOperator, 
as the HeadOperator always forwards records downstream.
 # ProxyOperatorStateBackend does not correctly initialize the state descriptor.


> Fix several bugs in flink-ml-iteration module
> -
>
> Key: FLINK-31173
> URL: https://issues.apache.org/jira/browse/FLINK-31173
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> In flink-ml-iteration, there are several bugs as follows:
>  # TailOperator should have only one input operator. We added a Tail operator 
> to increment the epoch watermark at each iteration, assuming that each Tail 
> operator has only one input, and did not align the epoch watermarks from 
> different inputs. This assumption might not hold if the input is a `union`.
>  # ReplayOperator should replay the records when it receives the max epoch 
> watermark. Currently it does not, which is inconsistent with the 
> HeadOperator, as the HeadOperator always forwards records downstream.
>  # ProxyOperatorStateBackend does not correctly initialize the state 
> descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31173) Fix several bugs in flink-ml-iteration module

2023-02-22 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31173:
-

Assignee: Zhipeng Zhang

> Fix several bugs in flink-ml-iteration module
> -
>
> Key: FLINK-31173
> URL: https://issues.apache.org/jira/browse/FLINK-31173
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>
> In flink-ml-iteration, there are several bugs as follows:
>  # TailOperator should have only one input operator. We added a Tail operator 
> to increment the epoch watermark at each iteration, assuming that each Tail 
> operator has only one input, and did not align the epoch watermarks from 
> different inputs. This assumption might not hold if the input is a `union`.
>  # ReplayOperator should replay the records when it receives the max epoch 
> watermark. Currently it does not, which is inconsistent with the 
> HeadOperator, as the HeadOperator always forwards records downstream.
>  # ProxyOperatorStateBackend does not correctly initialize the state 
> descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31173) Fix several bugs in flink-ml-iteration module

2023-02-22 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31173:
--
Description: 
In flink-ml-iteration, there are several bugs as follows:
 # TailOperator should have only one input operator. We added a Tail operator 
to increment the epoch watermark at each iteration, assuming that each Tail 
operator has only one input, and did not align the epoch watermarks from 
different inputs. This assumption might not hold if the input is a `union`.
 # ReplayOperator should replay the records when it receives the max epoch 
watermark. Currently it does not, which is inconsistent with the HeadOperator, 
as the HeadOperator always forwards records downstream.
 # ProxyOperatorStateBackend does not correctly initialize the state descriptor.

  was:
In flink-ml-iteration, we have added a Tail operator to increment the epoch 
watermark at each iteration. We have assumed that each Tail operator has only 
one input and did not align the epoch watermarks from different inputs. This 
assumption might not hold if the input is a `union`.

 

I propose to add an explicit check to prevent the TailOperator from having 
multiple inputs. If the input does contain multiple inputs, users can add a 
map operator after the union.

Summary: Fix several bugs in flink-ml-iteration module  (was: 
TailOperator should only have one input)

> Fix several bugs in flink-ml-iteration module
> -
>
> Key: FLINK-31173
> URL: https://issues.apache.org/jira/browse/FLINK-31173
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> In flink-ml-iteration, there are several bugs as follows:
>  
>  # TailOperator should have only one input. We added a Tail operator to 
> increment the epoch watermark at each iteration, assuming that each Tail 
> operator has only one input, and did not align the epoch watermarks from 
> different inputs. This assumption might not hold if the input is a `union`.
>  # ReplayOperator should replay the records when it receives the max epoch 
> watermark. Currently it does not, which is inconsistent with the 
> HeadOperator, which always forwards records downstream.
>  # ProxyOperatorStateBackend does not correctly initialize the state 
> descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31160) Support join/cogroup in BroadcastUtils.withBroadcastStream

2023-02-21 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31160:
-

Assignee: Zhipeng Zhang

> Support join/cogroup in BroadcastUtils.withBroadcastStream
> --
>
> Key: FLINK-31160
> URL: https://issues.apache.org/jira/browse/FLINK-31160
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Currently BroadcastUtils#withBroadcastStream does not support cogroup/join, 
> since we restrict users to introducing only one extra operator in the 
> specified lambda function.
>  
> To support using join/cogroup in BroadcastUtils#withBroadcastStream, we would 
> like to relax the restriction such that users can introduce more than one 
> extra operator, but only the output operator can access the broadcast 
> variables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31173) TailOperator should only have one input

2023-02-21 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31173:
-

 Summary: TailOperator should only have one input
 Key: FLINK-31173
 URL: https://issues.apache.org/jira/browse/FLINK-31173
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang


In flink-ml-iteration, we have added a Tail operator to increment the epoch 
watermark at each iteration. We assumed that each Tail operator has only one 
input and did not align the epoch watermarks from different inputs. This 
assumption might not hold if the input is a `union`.

 

I propose to add an explicit check that prevents the TailOperator from having 
multiple inputs. If the input does contain multiple inputs, users can add a 
map operator after the union, as sketched below.
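
A minimal sketch of the workaround (the environment and element streams below 
are illustrative placeholders, not taken from the actual tests):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> feedbackA = env.fromElements(1L, 2L);
DataStream<Long> feedbackB = env.fromElements(3L, 4L);

// Feeding the union back directly would give the Tail operator two inputs;
// an identity map after the union leaves it with exactly one input.
DataStream<Long> feedback = feedbackA.union(feedbackB).map(x -> x);
{code}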



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31160) Support join/cogroup in BroadcastUtils.withBroadcastStream

2023-02-20 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31160:
-

 Summary: Support join/cogroup in BroadcastUtils.withBroadcastStream
 Key: FLINK-31160
 URL: https://issues.apache.org/jira/browse/FLINK-31160
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang
 Fix For: ml-2.2.0


Currently BroadcastUtils#withBroadcastStream does not support cogroup/join, 
since we restrict users to introducing only one extra operator in the 
specified lambda function.

 

To support using join/cogroup in BroadcastUtils#withBroadcastStream, we would 
like to relax the restriction such that users can introduce more than one 
extra operator, but only the output operator can access the broadcast 
variables.
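
A minimal sketch of what this would enable, assuming the relaxed restriction 
(the streams `left`, `right`, and `bcStream` and the variable name "bcInput" 
are illustrative placeholders):
{code:java}
// The join below introduces more than one operator inside the lambda; under
// the relaxed restriction only the last (output) operator would be allowed
// to read the broadcast variable "bcInput".
DataStream<Long> output =
        BroadcastUtils.withBroadcastStream(
                Arrays.asList(left, right),
                Collections.singletonMap("bcInput", bcStream),
                inputList -> {
                    DataStream<Long> l = (DataStream<Long>) inputList.get(0);
                    DataStream<Long> r = (DataStream<Long>) inputList.get(1);
                    return l.join(r)
                            .where(x -> x)
                            .equalTo(x -> x)
                            .window(EndOfStreamWindows.get())
                            .apply((JoinFunction<Long, Long, Long>) (a, b) -> a);
                });
{code}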



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30688) Disable Kryo fallback for tests in flink-ml-lib

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-30688.
---
Resolution: Fixed

Fixed on master via:

7e26b7e2904a9d742e0318035a19bf2687855bb0

19f0f31ca93f4967c99968b3609a11763bfc0afb

904aba25951b5df170b6979a8a98e9fbff00d539

 

> Disable Kryo fallback for tests in flink-ml-lib
> ---
>
> Key: FLINK-30688
> URL: https://issues.apache.org/jira/browse/FLINK-30688
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Kryo serializer is typically inefficient and we expect all Flink ML 
> algorithms to work without using the Kryo serializer. It will be useful to 
> validate this by disabling Kryo fallback in all Flink ML tests.
> This can be done by configuring StreamExecutionEnvironment using 
> env.getConfig().disableGenericTypes().
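
For reference, a minimal sketch of such a test setup (the pipeline below is 
illustrative, not taken from the Flink ML test suite):
{code:java}
// Disable the Kryo fallback so any type that would silently fall back to
// Kryo fails fast instead of degrading performance.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();

// With the fallback disabled, building this pipeline throws if any type used
// by the job can only be serialized with Kryo.
DataStream<Tuple2<Long, Integer>> data = env.fromElements(Tuple2.of(1L, 1));
data.map(x -> Tuple2.of(x.f0 + 1, x.f1)).returns(Types.TUPLE(Types.LONG, Types.INT));
{code}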



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31026) KBinsDiscretizer gives wrong bin edges when all values are same.

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-31026:
-

Assignee: Fan Hong

> KBinsDiscretizer gives wrong bin edges when all values are same.
> 
>
> Key: FLINK-31026
> URL: https://issues.apache.org/jira/browse/FLINK-31026
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> The current implementation gives bin edges of \{Double.MIN_VALUE, 
> Double.MAX_VALUE} when all values are the same.
> However, this bin cannot cover negative values or 0.
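
For context, Double.MIN_VALUE is the smallest positive double (about 
4.9e-324), not the most negative one, so the lower bin edge sits strictly 
above zero; a quick check illustrates the problem:
{code:java}
// A bin of [Double.MIN_VALUE, Double.MAX_VALUE] excludes zero and negatives.
double lower = Double.MIN_VALUE;      // 4.9E-324, strictly positive
System.out.println(lower > 0.0);      // true
System.out.println(0.0 >= lower);     // false -> 0 falls outside the bin
System.out.println(-1.0 >= lower);    // false -> negatives fall outside the bin
{code}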



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31026) KBinsDiscretizer gives wrong bin edges when all values are same.

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-31026.
---
Fix Version/s: ml-2.2.0
   Resolution: Fixed

Fixed on master via 0af95f256d438a12946f9152c0a65ec916a61323

> KBinsDiscretizer gives wrong bin edges when all values are same.
> 
>
> Key: FLINK-31026
> URL: https://issues.apache.org/jira/browse/FLINK-31026
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> The current implementation gives bin edges of \{Double.MIN_VALUE, 
> Double.MAX_VALUE} when all values are the same.
> However, this bin cannot cover negative values or 0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30451) Add AlgoOperator for Swing

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-30451:
--
Fix Version/s: ml-2.2.0
Affects Version/s: ml-2.2.0

> Add AlgoOperator for Swing
> --
>
> Key: FLINK-30451
> URL: https://issues.apache.org/jira/browse/FLINK-30451
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30451) Add AlgoOperator for Swing

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-30451.
---
Resolution: Fixed

Fixed on master via e85fcf7635154871a49f6feb53553b0e98b21d88

> Add AlgoOperator for Swing
> --
>
> Key: FLINK-30451
> URL: https://issues.apache.org/jira/browse/FLINK-30451
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30451) Add AlgoOperator for Swing

2023-02-16 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-30451:
--
Summary: Add AlgoOperator for Swing  (was: Add Estimator and Transformer 
for Swing)

> Add AlgoOperator for Swing
> --
>
> Key: FLINK-30451
> URL: https://issues.apache.org/jira/browse/FLINK-30451
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30730) StringIndexer cannot handle null values correctly

2023-02-07 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-30730.
---
Fix Version/s: ml-2.2.0
   Resolution: Fixed

Fixed on master via 2b83d247cb5e81fb04b31399d436dbb9e809a473

> StringIndexer cannot handle null values correctly
> -
>
> Key: FLINK-30730
> URL: https://issues.apache.org/jira/browse/FLINK-30730
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.1.0
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> When the training data contains null values, StringIndexer throws an 
> exception. The reason lies in this method [1]: null values are neither of 
> String type nor of Number type.
> In StringIndexerModel, null values are also not handled correctly when 
> performing the transformation.
>  
> [1] 
> [https://github.com/apache/flink-ml/blob/966cedd7bbab4e12d8d8b37dbd582146714e68a6/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexer.java#L164]
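
A minimal sketch of the failure mode (illustrative, not the exact code in 
[1]): `instanceof` is false for null, so a null cell matches neither branch 
and hits the unsupported-type path.
{code:java}
Object value = null;                   // a null cell from the training data
if (value instanceof String) {
    // index the string value
} else if (value instanceof Number) {  // instanceof is always false for null
    // index the numeric value
} else {
    throw new RuntimeException("Unsupported type: " + value);  // reached for null
}
{code}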



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30730) StringIndexer cannot handle null values correctly

2023-02-07 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30730:
-

Assignee: Fan Hong

> StringIndexer cannot handle null values correctly
> -
>
> Key: FLINK-30730
> URL: https://issues.apache.org/jira/browse/FLINK-30730
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.1.0
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
>
> When the training data contains null values, StringIndexer throws an 
> exception. The reason lies in this method [1]: null values are neither of 
> String type nor of Number type.
> In StringIndexerModel, null values are also not handled correctly when 
> performing the transformation.
>  
> [1] 
> [https://github.com/apache/flink-ml/blob/966cedd7bbab4e12d8d8b37dbd582146714e68a6/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexer.java#L164]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30933) Result of join inside iterationBody loses max watermark

2023-02-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-30933:
--
Description: 
Currently, if we execute a join inside an iteration body, the following program 
produces empty output (the correct result should be a list containing \{2}).
{code:java}
public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f1);
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult =
                IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(2L);
        compareResultCollections(expectedResult, collectedResult, Long::compareTo);
    }

    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            DataStream<Integer> terminationCriteria =
                    input1.flatMap(new TerminateOnMaxIter<>(1));

            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    new JoinFunction<
                                            Tuple2<Long, Integer>,
                                            Tuple2<Long, Long>,
                                            Tuple2<Long, Long>>() {
                                        @Override
                                        public Tuple2<Long, Long> join(
                                                Tuple2<Long, Integer> longIntegerTuple2,
                                                Tuple2<Long, Long> longLongTuple2)
                                                throws Exception {
                                            return longLongTuple2;
                                        }
                                    });

            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
        }
    }
}
{code}
 

There are two possible reasons:
 * The timer in `HeadOperator` is not a daemon process and does not exit even 
after the Flink job finishes.
 * The max watermark from the iteration body is missed.

 

 

  was:
Currently, if we execute a join inside an iteration body, the following program 
produces empty output (the correct result should be a list containing \{1, 2}).
{code:java}
public class Test {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
[jira] [Assigned] (FLINK-30933) Result of join inside iterationBody loses max watermark

2023-02-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30933:
-

Assignee: Zhipeng Zhang

> Result of join inside iterationBody loses max watermark
> ---
>
> Key: FLINK-30933
> URL: https://issues.apache.org/jira/browse/FLINK-30933
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> Currently, if we execute a join inside an iteration body, the following 
> program produces empty output (the correct result should be a list 
> containing \{1, 2}).
> {code:java}
> public class Test {
>
>     public static void main(String[] args) throws Exception {
>         Configuration config = new Configuration();
>         config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(config);
>         env.setParallelism(1);
>
>         DataStream<Tuple2<Long, Integer>> input1 =
>                 env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
>
>         DataStream<Tuple2<Long, Long>> input2 =
>                 env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
>
>         DataStream<Tuple2<Long, Long>> iterationJoin =
>                 Iterations.iterateBoundedStreamsUntilTermination(
>                                 DataStreamList.of(input1),
>                                 ReplayableDataStreamList.replay(input2),
>                                 IterationConfig.newBuilder()
>                                         .setOperatorLifeCycle(
>                                                 IterationConfig.OperatorLifeCycle.PER_ROUND)
>                                         .build(),
>                                 new MyIterationBody())
>                         .get(0);
>
>         DataStream<Long> left = iterationJoin.map(x -> x.f0);
>         DataStream<Long> right = iterationJoin.map(x -> x.f0);
>         DataStream<Long> result =
>                 left.join(right)
>                         .where(x -> x)
>                         .equalTo(x -> x)
>                         .window(EndOfStreamWindows.get())
>                         .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);
>
>         List<Long> collectedResult =
>                 IteratorUtils.toList(result.executeAndCollect());
>         List<Long> expectedResult = Arrays.asList(1L, 2L);
>         compareResultCollections(expectedResult, collectedResult, Long::compareTo);
>     }
>
>     private static class MyIterationBody implements IterationBody {
>         @Override
>         public IterationBodyResult process(
>                 DataStreamList variableStreams, DataStreamList dataStreams) {
>             DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
>             DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
>
>             DataStream<Integer> terminationCriteria =
>                     input1.flatMap(new TerminateOnMaxIter<>(1));
>
>             DataStream<Tuple2<Long, Long>> res =
>                     input1.join(input2)
>                             .where(x -> x.f0)
>                             .equalTo(x -> x.f0)
>                             .window(EndOfStreamWindows.get())
>                             .apply(
>                                     (JoinFunction<
>                                                     Tuple2<Long, Integer>,
>                                                     Tuple2<Long, Long>,
>                                                     Tuple2<Long, Long>>)
>                                             (t1, t2) -> t2);
>
>             return new IterationBodyResult(
>                     DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
>         }
>     }
> }
> {code}
>  
> There are two possible reasons:
>  * The timer in `HeadOperator` is not a daemon process and does not exit 
> even after the Flink job finishes.
>  * The max watermark from the iteration body is missed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30933) Result of join inside iterationBody loses max watermark

2023-02-06 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30933:
-

 Summary: Result of join inside iterationBody loses max watermark
 Key: FLINK-30933
 URL: https://issues.apache.org/jira/browse/FLINK-30933
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang
 Fix For: ml-2.2.0


Currently, if we execute a join inside an iteration body, the following program 
produces empty output (the correct result should be a list containing \{1, 2}).
{code:java}
public class Test {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f0);
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult =
                IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(1L, 2L);
        compareResultCollections(expectedResult, collectedResult, Long::compareTo);
    }

    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            DataStream<Integer> terminationCriteria =
                    input1.flatMap(new TerminateOnMaxIter<>(1));

            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    (JoinFunction<
                                                    Tuple2<Long, Integer>,
                                                    Tuple2<Long, Long>,
                                                    Tuple2<Long, Long>>)
                                            (t1, t2) -> t2);

            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
        }
    }
}
{code}
 

There are two possible reasons:
 * The timer in `HeadOperator` is not a daemon process and does not exit even 
after the Flink job finishes.
 * The max watermark from the iteration body is missed.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30671) Add AlgoOperator for ClusteringEvaluator

2023-01-12 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30671:
-

 Summary: Add AlgoOperator for ClusteringEvaluator
 Key: FLINK-30671
 URL: https://issues.apache.org/jira/browse/FLINK-30671
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30401) Add Estimator and Transformer for MinHashLSH

2023-01-10 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-30401.
---
Resolution: Fixed

> Add Estimator and Transformer for MinHashLSH
> 
>
> Key: FLINK-30401
> URL: https://issues.apache.org/jira/browse/FLINK-30401
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Fan Hong
>Assignee: Fan Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Add Estimator and Transformer for MinHashLSH.
> Its function would be at least equivalent to Spark's 
> org.apache.spark.ml.feature.MinHashLSH. The relevant PR should contain the 
> following components:
>  * Java implementation/test (Must include)
>  * Python implementation/test (Optional)
>  * Markdown document (Optional)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30566) Add benchmark configurations for agglomerativeclustering, hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, regextokenizer, tok

2023-01-04 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30566:
-

Assignee: Zhipeng Zhang

> Add benchmark configurations for agglomerativeclustering, hashingtf, idf, 
> kbinsdiscretizer, linearregression, linearsvc,  logisticregression, ngram, 
> regextokenizer, tokenizer and vectorindexer
> 
>
> Key: FLINK-30566
> URL: https://issues.apache.org/jira/browse/FLINK-30566
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Minor
>  Labels: pull-request-available
>
> Add benchmark configurations for the following algorithms:
>  * agglomerativeclustering
>  * hashingtf
>  * idf
>  * kbinsdiscretizer
>  * linearregression
>  * linearsvc
>  * logisticregression
>  * ngram
>  * regextokenizer
>  * tokenizer
>  * vectorindexer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30566) Add benchmark configurations for agglomerativeclustering, hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc, logisticregression, ngram, regextokenizer, toke

2023-01-04 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30566:
-

 Summary: Add benchmark configurations for agglomerativeclustering, 
hashingtf, idf, kbinsdiscretizer, linearregression, linearsvc,  
logisticregression, ngram, regextokenizer, tokenizer and vectorindexer
 Key: FLINK-30566
 URL: https://issues.apache.org/jira/browse/FLINK-30566
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


Add benchmark configurations for the following algorithms:
 * agglomerativeclustering
 * hashingtf
 * idf
 * kbinsdiscretizer
 * linearregression
 * linearsvc
 * logisticregression
 * ngram
 * regextokenizer
 * tokenizer
 * vectorindexer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30541:
-

Assignee: Zhipeng Zhang

> Add Transformer and Estimator for OnlineStandardScaler
> --
>
> Key: FLINK-30541
> URL: https://issues.apache.org/jira/browse/FLINK-30541
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30541:
-

 Summary: Add Transformer and Estimator for OnlineStandardScaler
 Key: FLINK-30541
 URL: https://issues.apache.org/jira/browse/FLINK-30541
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30451) Add Estimator and Transformer for Swing

2022-12-18 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30451:
-

 Summary: Add Estimator and Transformer for Swing
 Key: FLINK-30451
 URL: https://issues.apache.org/jira/browse/FLINK-30451
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30249) TableUtils.getRowTypeInfo() creating wrong TypeInformation

2022-11-30 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-30249:
-

Assignee: Zhipeng Zhang

> TableUtils.getRowTypeInfo() creating wrong TypeInformation
> --
>
> Key: FLINK-30249
> URL: https://issues.apache.org/jira/browse/FLINK-30249
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0, ml-2.1.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30249) TableUtils.getRowTypeInfo() creating wrong TypeInformation

2022-11-30 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30249:
-

 Summary: TableUtils.getRowTypeInfo() creating wrong TypeInformation
 Key: FLINK-30249
 URL: https://issues.apache.org/jira/browse/FLINK-30249
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0
Reporter: Zhipeng Zhang
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29434) Add AlgoOperator for Splitter

2022-11-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29434.
---
  Assignee: weibo zhao
Resolution: Fixed

> Add AlgoOperator for Splitter
> -
>
> Key: FLINK-29434
> URL: https://issues.apache.org/jira/browse/FLINK-29434
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: weibo zhao
>Assignee: weibo zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Add AlgoOperator for Splitter



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29434) Add AlgoOperator for Splitter

2022-11-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-29434:
--
Fix Version/s: ml-2.2.0

> Add AlgoOperator for Splitter
> -
>
> Key: FLINK-29434
> URL: https://issues.apache.org/jira/browse/FLINK-29434
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: weibo zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Add AlgoOperator for Splitter



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29323) Add inputSizes parameter for VectorAssembler

2022-11-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29323.
---
  Assignee: weibo zhao
Resolution: Fixed

> Add inputSizes parameter for VectorAssembler
> 
>
> Key: FLINK-29323
> URL: https://issues.apache.org/jira/browse/FLINK-29323
> Project: Flink
>  Issue Type: New Feature
>Reporter: weibo zhao
>Assignee: weibo zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Refine Transformer for VectorAssembler



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29323) Add inputSizes parameter for VectorAssembler

2022-11-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-29323:
--
Summary: Add inputSizes parameter for VectorAssembler  (was: Refine 
Transformer for VectorAssembler)

> Add inputSizes parameter for VectorAssembler
> 
>
> Key: FLINK-29323
> URL: https://issues.apache.org/jira/browse/FLINK-29323
> Project: Flink
>  Issue Type: New Feature
>Reporter: weibo zhao
>Priority: Major
>  Labels: pull-request-available
>
> Refine Transformer for VectorAssembler



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29323) Add inputSizes parameter for VectorAssembler

2022-11-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-29323:
--
Fix Version/s: ml-2.2.0

> Add inputSizes parameter for VectorAssembler
> 
>
> Key: FLINK-29323
> URL: https://issues.apache.org/jira/browse/FLINK-29323
> Project: Flink
>  Issue Type: New Feature
>Reporter: weibo zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Refine Transformer for VectorAssembler



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29911) Improve performance of AgglomerativeClustering

2022-11-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-29911:
--
Fix Version/s: ml-2.2.0

> Improve performance of AgglomerativeClustering
> --
>
> Key: FLINK-29911
> URL: https://issues.apache.org/jira/browse/FLINK-29911
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29911) Improve performance of AgglomerativeClustering

2022-11-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-29911:
-

Assignee: Zhipeng Zhang

> Improve performance of AgglomerativeClustering
> --
>
> Key: FLINK-29911
> URL: https://issues.apache.org/jira/browse/FLINK-29911
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29911) Improve performance of AgglomerativeClustering

2022-11-06 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-29911:
-

 Summary: Improve performance of AgglomerativeClustering
 Key: FLINK-29911
 URL: https://issues.apache.org/jira/browse/FLINK-29911
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large

2022-11-02 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29824.
---
Resolution: Fixed

> AgglomerativeClustering fails when the distanceThreshold is very large
> --
>
> Key: FLINK-29824
> URL: https://issues.apache.org/jira/browse/FLINK-29824
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation did not consider the following case:
> When distanceThreshold is not null and is set to a large value, all data 
> points are supposed to be assigned to a single cluster.
>  
> In this case, we should stop training when the number of active clusters 
> is one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29843) Euclidean Distance Measure generates NAN distance values

2022-11-02 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29843.
---
Resolution: Fixed

> Euclidean Distance Measure generates NAN distance values
> 
>
> Key: FLINK-29843
> URL: https://issues.apache.org/jira/browse/FLINK-29843
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.1.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Currently Flink ML's `EuclideanDistanceMeasure.distance(...)` method might 
> produce a negative intermediate value for the distance between two vectors, 
> given the limited precision of Java doubles; taking the square root of such 
> a negative value then yields NaN. This bug should be fixed to guarantee 
> that the distance is a non-negative value.
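
A minimal sketch of how the NaN can arise, with the usual clamp-at-zero guard 
(illustrative, not the actual Flink ML code):
{code:java}
// Expanding ||a - b||^2 as ||a||^2 + ||b||^2 - 2*a.b can come out slightly
// negative for (near-)identical vectors due to double rounding, and
// Math.sqrt of a negative double is NaN; clamping at zero guards against it.
static double distance(double[] a, double[] b) {
    double dot = 0, normA = 0, normB = 0;
    for (int i = 0; i < a.length; i++) {
        dot += a[i] * b[i];
        normA += a[i] * a[i];
        normB += b[i] * b[i];
    }
    double squared = normA + normB - 2 * dot;  // may be ~ -1e-17 instead of 0
    return Math.sqrt(Math.max(squared, 0.0));  // guarantees a non-negative result
}
{code}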



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29843) Euclidean Distance Measure generates NAN distance values

2022-11-02 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-29843:
-

Assignee: Yunfeng Zhou

> Euclidean Distance Measure generates NAN distance values
> 
>
> Key: FLINK-29843
> URL: https://issues.apache.org/jira/browse/FLINK-29843
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.1.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Currently Flink ML's `EuclideanDistanceMeasure.distance(...)` method might 
> produce a negative intermediate value for the distance between two vectors, 
> given the limited precision of Java doubles; taking the square root of such 
> a negative value then yields NaN. This bug should be fixed to guarantee 
> that the distance is a non-negative value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large

2022-10-31 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reassigned FLINK-29824:
-

Assignee: Zhipeng Zhang

> AgglomerativeClustering fails when the distanceThreshold is very large
> --
>
> Key: FLINK-29824
> URL: https://issues.apache.org/jira/browse/FLINK-29824
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>
> The current implementation did not consider the following case:
> When distanceThreshold is not null and is set to a large value, all data 
> points are supposed to be assigned to a single cluster.
>  
> In this case, we should stop training when the number of active clusters 
> is one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29824) AgglomerativeClustering fails when the distanceThreshold is very large

2022-10-31 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-29824:
-

 Summary: AgglomerativeClustering fails when the distanceThreshold 
is very large
 Key: FLINK-29824
 URL: https://issues.apache.org/jira/browse/FLINK-29824
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


The current implementation did not consider the following case:

When distanceThreshold is not null and is set to a large value, all data points 
are supposed to be assigned to a single cluster.

 

In this case, we should stop training when the number of active clusters is 
one, as sketched below.
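
A self-contained toy sketch of the intended stopping rule (1-D points and a 
naive merge order, not the actual Flink ML code):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration: with a very large distanceThreshold every merge is
// allowed, so the loop must also stop once a single active cluster remains.
public class AgglomerativeSketch {
    public static void main(String[] args) {
        Double distanceThreshold = Double.MAX_VALUE;  // the problematic case
        List<Double> clusters = new ArrayList<>(Arrays.asList(1.0, 2.0, 10.0));

        while (clusters.size() > 1) {                 // stop at one active cluster
            double a = clusters.remove(0);
            double b = clusters.remove(0);
            if (distanceThreshold != null && Math.abs(a - b) > distanceThreshold) {
                clusters.add(0, b);
                clusters.add(0, a);
                break;                                // threshold reached first
            }
            clusters.add(0, (a + b) / 2);             // merge into a centroid
        }
        System.out.println(clusters);                 // [5.75] -> a single cluster
    }
}
{code}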



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290
 ] 

Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 10:11 AM:
--

Hi [~zxcoccer], sorry for the late reply.

This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571.

For other possible Flink ML JIRAs, please refer to

https://issues.apache.org/jira/browse/FLINK-29604?filter=-1=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20%22Library%20%2F%20Machine%20Learning%22%20order%20by%20updated%20DESC
 


was (Author: JIRAUSER284184):
Hi [~zxcoccer], sorry for the late reply.

This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571.

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28906) Add AlgoOperator for AgglomerativeClustering

2022-10-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-28906.
---
Resolution: Fixed

> Add AlgoOperator for AgglomerativeClustering
> 
>
> Key: FLINK-28906
> URL: https://issues.apache.org/jira/browse/FLINK-28906
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290
 ] 

Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 9:07 AM:
-

Hi [~zxcoccer], sorry for the late reply.

This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571.

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0


was (Author: JIRAUSER284184):
Hi [~zxcoccer], sorry for the late reply.

This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571 and is 
resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master.

 

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29176 ]


Zhipeng Zhang deleted comment on FLINK-29176:
---

was (Author: JIRAUSER284184):
Duplicate of https://issues.apache.org/jira/browse/FLINK-28571

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290
 ] 

Zhipeng Zhang edited comment on FLINK-29176 at 10/12/22 9:06 AM:
-

Hi [~zxcoccer], sorry for the late reply.

This Jira is a duplicate of https://issues.apache.org/jira/browse/FLINK-28571 and is 
resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master.

 

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0


was (Author: JIRAUSER284184):
Hi [~zxcoccer], sorry for the late reply.

This Jira is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master.

 

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang closed FLINK-29176.
-
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/FLINK-28571

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang reopened FLINK-29176:
---

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29176.
---
Resolution: Fixed

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29176) Add python support for ChiSqTest

2022-10-12 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616290#comment-17616290
 ] 

Zhipeng Zhang commented on FLINK-29176:
---

Hi [~zxcoccer], sorry for the late reply.

This Jira is resolved via 7cbada75deb07c0c93644dbb5b0f312d9822a2dc on master.

 

For other possible Flink ML JIRAs, please refer to 
https://issues.apache.org/jira/browse/FLINK-29176?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%20ml-2.2.0

> Add python support for ChiSqTest
> 
>
> Key: FLINK-29176
> URL: https://issues.apache.org/jira/browse/FLINK-29176
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.2.0
>
>
> We already implemented ChiSqTest in Java. In this PR, we aim to add the 
> Python source/test/example for ChiSqTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29044) Add Transformer for DCT

2022-09-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29044.
---
Resolution: Fixed

> Add Transformer for DCT
> ---
>
> Key: FLINK-29044
> URL: https://issues.apache.org/jira/browse/FLINK-29044
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29171) Add documents for MaxAbs, FeatureHasher, Interaction, VectorSlicer, ElementwiseProduct and Binarizer

2022-09-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29171.
---
Resolution: Fixed

> Add documents for MaxAbs, FeatureHasher, Interaction, VectorSlicer, 
> ElementwiseProduct and Binarizer
> 
>
> Key: FLINK-29171
> URL: https://issues.apache.org/jira/browse/FLINK-29171
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: weibo zhao
>Assignee: weibo zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Add docs of MaxAbs, FeatureHasher, Interaction, VectorSlicer, 
> ElementwiseProduct and Binarizer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29175) Add documents for KBinsDiscretizer, VectorIndexer, Tokenizer, RegexTokenizer

2022-09-06 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang resolved FLINK-29175.
---
Resolution: Fixed

> Add documents for KBinsDiscretizer, VectorIndexer, Tokenizer, RegexTokenizer
> 
>
> Key: FLINK-29175
> URL: https://issues.apache.org/jira/browse/FLINK-29175
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

