package com.guanyq.study.libraries.stateProcessorApi.FsStateBackend;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * Batch job that rewrites the keyed list state of one operator in a Flink savepoint.
 *
 * <p>The job reads the {@code ListState<Integer>} named {@code "list"} of the operator with uid
 * {@code "map"} from an existing savepoint, adds {@code 10000} to every list element, and writes
 * the result as the state of an operator with the same uid into a newly created savepoint. The
 * transformed records are additionally dumped as text into a local {@code ./readSavepoint_<runId>}
 * path.
 *
 * <p>Optional program arguments (each falls back to the built-in default when absent or empty):
 * <ol>
 *   <li>path of the savepoint to read</li>
 *   <li>checkpoint URI handed to the {@link FsStateBackend}</li>
 *   <li>base directory under which the new savepoint is written</li>
 * </ol>
 */
public class ReorganizeListState {

    /** Savepoint that is read when no first program argument is supplied. */
    private static final String DEFAULT_SOURCE_SAVEPOINT =
            "hdfs://MacBookPro:9000/flink/savepoints/8b900fc76a5a99dc5d0201e29cd7bef1/savepoint-8b900f-1e3d035ee046";

    /** Checkpoint URI for the state backend when no second program argument is supplied. */
    private static final String DEFAULT_CHECKPOINT_URI =
            "hdfs://MacBookPro:9000/flink/local/checkpoints";

    /** Base directory of the new savepoint when no third program argument is supplied. */
    private static final String DEFAULT_TARGET_BASE =
            "hdfs://MacBookPro:9000/flink/local/reorganizeListState/";

    /** Uid of the operator whose state is read and re-written. */
    private static final String OPERATOR_UID = "map";

    /** Name of the list state; reader and bootstrapper must use the same name. */
    private static final String LIST_STATE_NAME = "list";

    /** Amount added to every list element. */
    private static final int VALUE_OFFSET = 10000;

    /** Second argument passed to {@code Savepoint.create} (the max parallelism of the new savepoint). */
    private static final int MAX_PARALLELISM = 128;

    /**
     * Builds and runs the read / transform / write pipeline.
     *
     * @param args optional overrides, see the class documentation
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final String sourceSavepoint = argOrDefault(args, 0, DEFAULT_SOURCE_SAVEPOINT);
        final String checkpointUri = argOrDefault(args, 1, DEFAULT_CHECKPOINT_URI);
        final String targetBase = argOrDefault(args, 2, DEFAULT_TARGET_BASE);

        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read one record per key from the existing savepoint and shift its values.
        FlatMapOperator<KeyedListState, KeyedListState> dataSet =
                Savepoint.load(env, sourceSavepoint, new FsStateBackend(checkpointUri))
                        .readKeyedState(OPERATOR_UID, new ReaderFunction())
                        .flatMap(new ValueShifter());

        BootstrapTransformation<KeyedListState> transformation =
                OperatorTransformation
                        .bootstrapWith(dataSet)
                        .keyBy(acc -> acc.key)
                        .transform(new KeyedListStateBootstrapper());

        // One run id for both outputs so the savepoint and the text dump of a run can be matched.
        final String runId = String.valueOf(System.currentTimeMillis());
        System.out.println(runId);

        Savepoint.create(new FsStateBackend(checkpointUri), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transformation)
                .write(childPath(targetBase, runId));

        // Text dump of the transformed records (one line per record, using its string form).
        dataSet.writeAsText("./readSavepoint_" + runId);

        env.execute("ReorganizeListState");
    }

    /**
     * Returns {@code args[index]} when it exists and is non-empty, otherwise {@code fallback}.
     */
    private static String argOrDefault(String[] args, int index, String fallback) {
        if (args == null || args.length <= index) {
            return fallback;
        }
        String candidate = args[index];
        return (candidate == null || candidate.isEmpty()) ? fallback : candidate;
    }

    /** Appends {@code name} to {@code base}, inserting a {@code '/'} only when {@code base} lacks one. */
    private static String childPath(String base, String name) {
        return base.endsWith("/") ? base + name : base + "/" + name;
    }

    /**
     * A key together with the list of integers stored for it.
     *
     * <p>Declared public with public fields and a public no-argument constructor, which are the
     * requirements Flink documents for recognising a class as a POJO type.
     */
    public static class KeyedListState {
        /** Key the list belongs to. */
        public Integer key;
        /** Elements of the list state for {@link #key}. */
        public List<Integer> value;

        /** No-argument constructor; fields are assigned by the caller afterwards. */
        public KeyedListState() {
        }

        /** Readable form so that text output shows the key and the values. */
        @Override
        public String toString() {
            return "KeyedListState{key=" + key + ", value=" + value + "}";
        }
    }

    /** Emits, for every input record, a new record with the same key and every value increased by 10000. */
    static class ValueShifter implements FlatMapFunction<KeyedListState, KeyedListState> {
        @Override
        public void flatMap(KeyedListState keyedListState,
                            Collector<KeyedListState> collector) {
            KeyedListState newState = new KeyedListState();
            newState.key = keyedListState.key;
            newState.value = keyedListState.value.stream()
                    .map(x -> x + VALUE_OFFSET)
                    .collect(Collectors.toList());
            collector.collect(newState);
        }
    }

    /** Reads the {@code "list"} state of each key and emits it as one {@link KeyedListState}. */
    static class ReaderFunction extends KeyedStateReaderFunction<Integer,
            KeyedListState> {
        private transient ListState<Integer> listState;

        /** Registers the list state descriptor and obtains the state handle. */
        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Integer> lsd =
                    new ListStateDescriptor<>(LIST_STATE_NAME,
                            TypeInformation.of(Integer.class));
            listState = getRuntimeContext().getListState(lsd);
        }

        /**
         * Copies the list state of {@code key} into a fresh list and emits it.
         *
         * @param key the key currently being read
         * @param ctx reader context (unused)
         * @param out receives exactly one record per call
         * @throws Exception if accessing the state fails
         */
        @Override
        public void readKey(
                Integer key,
                Context ctx,
                Collector<KeyedListState> out) throws Exception {
            List<Integer> values = new ArrayList<>();
            // Defensive: treat a null result like an empty list.
            // NOTE(review): whether get() can return null here depends on the Flink version - confirm.
            Iterable<Integer> stored = listState.get();
            if (stored != null) {
                for (Integer element : stored) {
                    values.add(element);
                }
            }
            KeyedListState kl = new KeyedListState();
            kl.key = key;
            kl.value = values;
            out.collect(kl);
        }
    }

    /** Writes the values of each incoming {@link KeyedListState} into the {@code "list"} state of its key. */
    static class KeyedListStateBootstrapper extends
            KeyedStateBootstrapFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        /** Registers the list state descriptor and obtains the state handle. */
        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Integer> lsd =
                    new ListStateDescriptor<>(LIST_STATE_NAME,
                            TypeInformation.of(Integer.class));
            listState = getRuntimeContext().getListState(lsd);
        }

        /**
         * Appends all values of the record to the list state of the current key.
         *
         * @param value record whose {@code value} list is stored; a {@code null} list is skipped
         * @param ctx bootstrap context (unused)
         * @throws Exception if updating the state fails
         */
        @Override
        public void processElement(KeyedListState value, Context ctx) throws
                Exception {
            if (value.value != null) {
                listState.addAll(value.value);
            }
        }
    }
}