[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source

2021-08-08 Thread Liebing Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395709#comment-17395709
 ] 

Liebing Yu commented on FLINK-16556:


I have a question: why would more changes be needed to use a parallel source?

As for whether the Random instance needs to be serialized: functionally, I 
think it has no effect on what the example computes. For the completeness of 
the example, however, I think it is necessary, because Random is a stateful 
field, and a stateful operator should persist its state.
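
To make the point about Random concrete, here is a minimal sketch in plain Java, without any Flink dependency. It relies only on the fact that java.util.Random implements Serializable and writes its seed as part of its serialized form. The class name RandomStateRoundTrip and its helper methods are made up for illustration; how the bytes would actually be stored in Flink state is not shown and is not part of the proposed patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Random;

public class RandomStateRoundTrip {

    /** Serializes the generator, including its current seed, with plain Java serialization. */
    static byte[] snapshot(Random rand) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(rand);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds a generator from bytes produced by snapshot(). */
    static Random restore(byte[] snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (Random) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if a restored copy yields the same next draws as the original. */
    static boolean continuesSameSequence(int draws) {
        Random original = new Random();
        original.nextBoolean(); // advance once so the state differs from a fresh instance
        Random restored = restore(snapshot(original));
        for (int i = 0; i < draws; i++) {
            if (original.nextBoolean() != restored.nextBoolean()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("same sequence after restore: " + continuesSameSequence(1000));
    }
}
```

A generator created with `new Random()` after a restart would instead start from a fresh seed, so the accelerate/brake decisions would differ from those of the original run.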

> TopSpeedWindowing should implement checkpointing for its source
> ---
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.10.0
>Reporter: Nico Kruber
>Assignee: Liebing Yu
>Priority: Minor
>  Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
>  does not implement checkpointing of its state, namely the current speeds and 
> distances per car. The main problem with this is that the window trigger only 
> fires if the new distance has increased by at least 50 but after restore, it 
> will be reset to 0 and could thus not produce output for a while.
>  
> Either the distance calculation could use {{Math.abs}} or the source needs 
> proper checkpointing. Optionally, the number of cars could also be allowed 
> to increase or decrease.
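
The effect described in the issue can be sketched in plain Java, without any Flink dependency. The threshold of 50 comes from the issue text. Everything else is an assumption made for illustration: the class DeltaTriggerSketch, the strict "greater than" comparison, a trigger that still remembers a distance of 1000 after the restore, and a car that drives at a constant 50 km/h after the source has been reset to 0.

```java
import java.util.function.DoubleBinaryOperator;

public class DeltaTriggerSketch {

    // Threshold from the issue text: the trigger needs a distance gain of 50.
    static final double THRESHOLD = 50.0;

    /**
     * Returns how many readings arrive before delta(last, reading) exceeds the
     * threshold, or -1 if that never happens within the given readings.
     */
    static int readingsUntilFire(
            double lastFiredDistance, double[] readings, DoubleBinaryOperator delta) {
        for (int i = 0; i < readings.length; i++) {
            if (delta.applyAsDouble(lastFiredDistance, readings[i]) > THRESHOLD) {
                return i + 1;
            }
        }
        return -1;
    }

    /** Distances reported by a source that restarts at 0 and gains a fixed amount per reading. */
    static double[] readingsAfterRestart(int count, double gainPerReading) {
        double[] readings = new double[count];
        for (int i = 0; i < count; i++) {
            readings[i] = (i + 1) * gainPerReading;
        }
        return readings;
    }

    public static void main(String[] args) {
        double lastFiredDistance = 1000.0; // assumed value remembered by the trigger
        double[] readings = readingsAfterRestart(200, 50 / 3.6d); // same gain formula as CarSource

        int signed = readingsUntilFire(
                lastFiredDistance, readings, (oldD, newD) -> newD - oldD);
        int absolute = readingsUntilFire(
                lastFiredDistance, readings, (oldD, newD) -> Math.abs(newD - oldD));

        System.out.println("signed delta fires after " + signed + " readings");
        System.out.println("absolute delta fires after " + absolute + " readings");
    }
}
```

Under these assumptions the signed delta stays negative until the car has passed 1050 again, which takes 76 readings, while the {{Math.abs}} variant fires on the first reading. Checkpointing the source avoids the reset in the first place.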



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source

2021-08-05 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393744#comment-17393744
 ] 

Nico Kruber commented on FLINK-16556:

Looks about right... Luckily, the source isn't declared as a parallel source; 
otherwise more changes would be needed.

One thing I'm not 100% sure about: do we also need to serialize the Random 
instance in order to get the same sequence of actions after restart?
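
One possible reading of the "more changes" remark, sketched in plain Java without any Flink dependency: operator list state is redistributed across parallel subtasks on restore, so each subtask would only see a sublist and a positional index would no longer identify a car. The class ListStateSplitSketch and the contiguous, even split are assumptions for illustration only; the exact partitioning Flink applies is not claimed here, and this is not taken from the comment above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateSplitSketch {

    /** Splits a state list into `parallelism` contiguous sublists of near-equal size. */
    static <T> List<List<T>> evenSplit(List<T> state, int parallelism) {
        List<List<T>> parts = new ArrayList<>();
        int base = state.size() / parallelism;
        int remainder = state.size() % parallelism;
        int from = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // The first `remainder` subtasks receive one extra element.
            int size = base + (subtask < remainder ? 1 : 0);
            parts.add(new ArrayList<>(state.subList(from, from + size)));
            from += size;
        }
        return parts;
    }

    public static void main(String[] args) {
        // Speeds of cars 0..3 as they would be written by a single source instance.
        List<Integer> speeds = Arrays.asList(50, 55, 60, 65);

        List<List<Integer>> perSubtask = evenSplit(speeds, 2);
        for (int subtask = 0; subtask < perSubtask.size(); subtask++) {
            System.out.println("subtask " + subtask + " restores " + perSubtask.get(subtask));
        }
        // Subtask 1 would copy 60 into its local speeds[0], although 60 belonged to car 2.
    }
}
```

A parallel version would therefore need to store the car id together with its speed and distance, and decide which subtask emits which cars, rather than rely on the list position.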






[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source

2021-07-08 Thread Liebing Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377276#comment-17377276
 ] 

Liebing Yu commented on FLINK-16556:


I've implemented checkpointing for CarSource. If you think it is all right, 
could you assign this issue to me? I will then open a pull request.
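{code:java}
// Nested class of TopSpeedWindowing; the imports belong at the top of that file.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.Random;

private static class CarSource
        implements SourceFunction<Tuple4<Integer, Integer, Double, Long>>,
                CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    // Current speed and travelled distance per car, indexed by car id.
    private Integer[] speeds;
    private Double[] distances;

    // Operator state handles; re-created in initializeState() on every (re)start.
    private transient ListState<Integer> speedsState;
    private transient ListState<Double> distancesState;

    private Random rand = new Random();

    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);
        Arrays.fill(distances, 0d);
    }

    public static CarSource create(int cars) {
        return new CarSource(cars);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
            throws Exception {

        while (isRunning) {
            Thread.sleep(100);
            for (int carId = 0; carId < speeds.length; carId++) {
                // Randomly accelerate or brake by 5, clamped to [0, 100].
                if (rand.nextBoolean()) {
                    speeds[carId] = Math.min(100, speeds[carId] + 5);
                } else {
                    speeds[carId] = Math.max(0, speeds[carId] - 5);
                }
                distances[carId] += speeds[carId] / 3.6d;
                Tuple4<Integer, Integer, Double, Long> record =
                        new Tuple4<>(
                                carId,
                                speeds[carId],
                                distances[carId],
                                System.currentTimeMillis());
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current per-car values.
        speedsState.clear();
        distancesState.clear();
        for (int i = 0; i < speeds.length; i++) {
            speedsState.add(speeds[i]);
            distancesState.add(distances[i]);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> speedsStateDescriptor =
                new ListStateDescriptor<>(
                        "speeds",
                        TypeInformation.of(new TypeHint<Integer>() {}));
        ListStateDescriptor<Double> distanceStateDescriptor =
                new ListStateDescriptor<>(
                        "distances",
                        TypeInformation.of(new TypeHint<Double>() {}));

        speedsState =
                context.getOperatorStateStore().getListState(speedsStateDescriptor);
        distancesState =
                context.getOperatorStateStore().getListState(distanceStateDescriptor);

        // After a restore, copy the checkpointed values back into the arrays.
        if (context.isRestored()) {
            int i = 0;
            for (Integer speed : speedsState.get()) {
                speeds[i++] = speed;
            }
            i = 0;
            for (Double distance : distancesState.get()) {
                distances[i++] = distance;
            }
        }
    }
}
{code}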






[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336278#comment-17336278
 ] 

Flink Jira Bot commented on FLINK-16556:


This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> TopSpeedWindowing should implement checkpointing for its source
> ---
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.10.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: stale-major





[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327994#comment-17327994
 ] 

Flink Jira Bot commented on FLINK-16556:


This major issue is unassigned, and neither it nor any of its Sub-Tasks has 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.



