[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395709#comment-17395709 ]

Liebing Yu commented on FLINK-16556:

I have a question: why are more changes needed to use a parallel source?

As for whether the Random instance needs to be serialized: from a functional point of view, I think it has no effect on what the example computes. For the completeness of the example, though, I think it is necessary, because Random is a stateful field and a stateful operator should persist its state.

> TopSpeedWindowing should implement checkpointing for its source
> ---
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 1.10.0
> Reporter: Nico Kruber
> Assignee: Liebing Yu
> Priority: Minor
> Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
> does not implement checkpointing of its state, namely the current speeds and
> distances per car. The main problem with this is that the window trigger only
> fires if the new distance has increased by at least 50 but after restore, it
> will be reset to 0 and could thus not produce output for a while.
>
> Either the distance calculation could use {{Math.abs}} or the source needs
> proper checkpointing. Optionally with allowing the number of cars to
> increase/decrease.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
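To make the problem in the issue description concrete, here is a minimal plain-Java sketch (no Flink dependency) of the delta comparison. It assumes the trigger remembers the distance at which it last fired while the unchecked source restarts from 0. The class name, method names, and the sample distance of 1000 are hypothetical; only the threshold of 50 and the {{Math.abs}} alternative come from the issue text.

```java
public class DeltaAfterRestore {
    // Threshold from the issue description: the trigger fires once the
    // distance has grown by at least 50 since the last firing.
    static final double TRIGGER_METERS = 50.0;

    // Signed delta: new distance minus the distance at the last firing.
    static double signedDelta(double oldDistance, double newDistance) {
        return newDistance - oldDistance;
    }

    // Alternative mentioned in the issue: compare the absolute difference.
    static double absDelta(double oldDistance, double newDistance) {
        return Math.abs(newDistance - oldDistance);
    }

    public static void main(String[] args) {
        // Hypothetical value: distance at which the trigger last fired.
        double lastFired = 1000.0;
        // The source lost its state and starts again from 0, then takes
        // one step at 50 km/h (50 / 3.6 meters, as in CarSource).
        double afterRestore = 0.0 + 50 / 3.6;

        // Signed delta is strongly negative, so the trigger stays silent.
        System.out.println(signedDelta(lastFired, afterRestore) > TRIGGER_METERS); // false
        // Absolute delta exceeds the threshold, so the trigger would fire.
        System.out.println(absDelta(lastFired, afterRestore) > TRIGGER_METERS);    // true
    }
}
```

With the signed delta, the car would have to drive past 1050 again before any output appears, which is the "no output for a while" effect the issue describes. Checkpointing the source avoids the reset altogether.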
[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393744#comment-17393744 ]

Nico Kruber commented on FLINK-16556:

Looks about right. Luckily, the source isn't declared as a parallel source; otherwise more changes would be needed.

One thing I'm not 100% sure about: do we also need to serialize the Random instance in order to get the same sequence of actions after a restart?

> TopSpeedWindowing should implement checkpointing for its source
> ---
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 1.10.0
> Reporter: Nico Kruber
> Priority: Minor
> Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
> does not implement checkpointing of its state, namely the current speeds and
> distances per car. The main problem with this is that the window trigger only
> fires if the new distance has increased by at least 50 but after restore, it
> will be reset to 0 and could thus not produce output for a while.
>
> Either the distance calculation could use {{Math.abs}} or the source needs
> proper checkpointing. Optionally with allowing the number of cars to
> increase/decrease.
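On the Random question, a small plain-Java experiment may help. {{java.util.Random}} implements {{Serializable}}, and a copy made through Java serialization continues the same sequence as the original. The sketch below only demonstrates that property; the class and method names are hypothetical, and it does not show how the bytes would be stored in Flink state. Note that a plain (non-state) field such as {{rand}} is not part of a checkpoint unless it is explicitly written to state in {{snapshotState}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Random;

public class RandomRoundTrip {
    // Copies a Random through Java serialization, preserving its internal seed.
    static Random copyViaSerialization(Random rand) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(rand);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Random) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Random round trip failed", e);
        }
    }

    // Advances a Random, snapshots it, and checks that the restored copy
    // produces the same subsequent values as the original.
    static boolean continuesSameSequence(long seed, int warmup, int checks) {
        Random original = new Random(seed);
        for (int i = 0; i < warmup; i++) {
            original.nextBoolean();
        }
        Random restored = copyViaSerialization(original);
        for (int i = 0; i < checks; i++) {
            if (original.nextBoolean() != restored.nextBoolean()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(continuesSameSequence(42L, 10, 100)); // true
    }
}
```

So persisting the Random would make the sequence of speed changes reproducible across a restore. Whether the example needs that guarantee is the judgment call discussed in this thread.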
[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377276#comment-17377276 ]

Liebing Yu commented on FLINK-16556:

I've implemented the checkpointing of CarSource. If you think it is alright, could you assign this to me? I will make a pull request.

{code:java}
// Code placeholder
import java.util.Arrays;
import java.util.Random;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

private static class CarSource
        implements SourceFunction<Tuple4<Integer, Integer, Double, Long>>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    // Working copies of the per-car state; the array index is the car ID.
    private Integer[] speeds;
    private Double[] distances;

    // Operator state handles; set in initializeState(), hence transient.
    private transient ListState<Integer> speedsState;
    private transient ListState<Double> distancesState;

    private Random rand = new Random();

    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);
        Arrays.fill(distances, 0d);
    }

    public static CarSource create(int cars) {
        return new CarSource(cars);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(100);
            // Hold the checkpoint lock so that a snapshot never observes a
            // half-updated round of speeds and distances.
            synchronized (ctx.getCheckpointLock()) {
                for (int carId = 0; carId < speeds.length; carId++) {
                    if (rand.nextBoolean()) {
                        speeds[carId] = Math.min(100, speeds[carId] + 5);
                    } else {
                        speeds[carId] = Math.max(0, speeds[carId] - 5);
                    }
                    distances[carId] += speeds[carId] / 3.6d;
                    Tuple4<Integer, Integer, Double, Long> record =
                            new Tuple4<>(
                                    carId,
                                    speeds[carId],
                                    distances[carId],
                                    System.currentTimeMillis());
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current per-car values.
        speedsState.clear();
        distancesState.clear();
        for (int i = 0; i < speeds.length; i++) {
            speedsState.add(speeds[i]);
            distancesState.add(distances[i]);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> speedsStateDescriptor =
                new ListStateDescriptor<>(
                        "speeds", TypeInformation.of(new TypeHint<Integer>() {}));
        ListStateDescriptor<Double> distanceStateDescriptor =
                new ListStateDescriptor<>(
                        "distances", TypeInformation.of(new TypeHint<Double>() {}));
        speedsState = context.getOperatorStateStore().getListState(speedsStateDescriptor);
        distancesState = context.getOperatorStateStore().getListState(distanceStateDescriptor);

        // On restore, copy the checkpointed values back into the arrays.
        // This assumes the number of cars is the same as when the snapshot was taken.
        if (context.isRestored()) {
            int i = 0;
            for (Integer speed : speedsState.get()) {
                speeds[i++] = speed;
            }
            i = 0;
            for (Double distance : distancesState.get()) {
                distances[i++] = distance;
            }
        }
    }
}
{code}
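The restore loop in the proposal above relies on the restored list having exactly as many entries as the arrays. The issue also mentions optionally allowing the number of cars to increase or decrease. A minimal plain-Java sketch of a more tolerant restore might look like the following. It uses an {{Iterable}} as a stand-in for what {{ListState.get()}} returns, and the class name, method name, and default-speed parameter are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class RestoreWithResize {
    // Copies restored speeds into a fresh array of the requested size.
    // Extra restored entries are dropped; missing ones keep the default.
    static Integer[] restoreSpeeds(Iterable<Integer> restored, int numOfCars, int defaultSpeed) {
        Integer[] speeds = new Integer[numOfCars];
        Arrays.fill(speeds, defaultSpeed);
        int i = 0;
        for (Integer speed : restored) {
            if (i >= numOfCars) {
                break; // fewer cars than before: ignore the surplus entries
            }
            speeds[i++] = speed;
        }
        return speeds;
    }

    public static void main(String[] args) {
        List<Integer> checkpointed = Arrays.asList(55, 60, 45);
        // Fewer cars after restart: only the first two values are used.
        System.out.println(Arrays.toString(restoreSpeeds(checkpointed, 2, 50)));
        // More cars after restart: new cars start at the default speed.
        System.out.println(Arrays.toString(restoreSpeeds(checkpointed, 5, 50)));
    }
}
```

The same pattern would apply to the distances array. Without such a guard, restoring three checkpointed cars into a two-element array would fail with an ArrayIndexOutOfBoundsException.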
[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336278#comment-17336278 ]

Flink Jira Bot commented on FLINK-16556:

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> TopSpeedWindowing should implement checkpointing for its source
> ---
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 1.10.0
> Reporter: Nico Kruber
> Priority: Major
> Labels: stale-major
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
> does not implement checkpointing of its state, namely the current speeds and
> distances per car. The main problem with this is that the window trigger only
> fires if the new distance has increased by at least 50 but after restore, it
> will be reset to 0 and could thus not produce output for a while.
>
> Either the distance calculation could use {{Math.abs}} or the source needs
> proper checkpointing. Optionally with allowing the number of cars to
> increase/decrease.
[jira] [Commented] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327994#comment-17327994 ]

Flink Jira Bot commented on FLINK-16556:

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days, so it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.