package com.guanyq.study.libraries.stateProcessorApi.FsStateBackend;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Batch job that rewrites the keyed list state of the operator with uid {@code "map"}.
 *
 * <p>The job loads an existing savepoint, reads the list state named {@code "list"},
 * adds {@code 10000} to every element, and writes the result as the state of the same
 * operator uid in a new savepoint. The transformed records are also written as text so
 * the outcome can be inspected.
 */
public class ReorganizeListState {
    /** Uid of the operator whose state is read from the old savepoint and written to the new one. */
    private static final String OPERATOR_UID = "map";

    /** Name of the list state; the reader and the bootstrapper must use the same name. */
    private static final String LIST_STATE_NAME = "list";

    /** Savepoint that is loaded as input. */
    private static final String SOURCE_SAVEPOINT =
            "hdfs://MacBookPro:9000/flink/savepoints/8b900fc76a5a99dc5d0201e29cd7bef1/savepoint-8b900f-1e3d035ee046";

    /** URI handed to the {@link FsStateBackend} used for both loading and creating savepoints. */
    private static final String CHECKPOINT_URI = "hdfs://MacBookPro:9000/flink/local/checkpoints";

    /** Directory prefix of the new savepoint; a per-run timestamp is appended. */
    private static final String TARGET_SAVEPOINT_PREFIX =
            "hdfs://MacBookPro:9000/flink/local/reorganizeListState/";

    /** Path prefix of the plain-text dump; the same per-run timestamp is appended. */
    private static final String TEXT_OUTPUT_PREFIX = "./readSavepoint_";

    /** Max parallelism passed to {@code Savepoint.create}. */
    private static final int MAX_PARALLELISM = 128;

    /** Amount added to every list element. */
    private static final int VALUE_OFFSET = 10000;

    /**
     * Builds and runs the batch job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        FlatMapOperator<KeyedListState, KeyedListState> dataSet = Savepoint
                .load(env, SOURCE_SAVEPOINT, new FsStateBackend(CHECKPOINT_URI))
                .readKeyedState(OPERATOR_UID, new ReaderFunction())
                .flatMap(new OffsetFunction());

        BootstrapTransformation<KeyedListState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(acc -> acc.key)
                .transform(new KeyedListStateBootstrapper());

        // Sample the clock once so the savepoint directory and the text dump of the same
        // run share one suffix and can be matched up afterwards.
        String runId = String.valueOf(System.currentTimeMillis());
        System.out.println(runId);

        Savepoint.create(new FsStateBackend(CHECKPOINT_URI), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transformation)
                .write(TARGET_SAVEPOINT_PREFIX + runId);

        // writeAsText relies on KeyedListState#toString for the line content.
        dataSet.writeAsText(TEXT_OUTPUT_PREFIX + runId);
        env.execute("ReorganizeListState");
    }

    /** Creates the descriptor shared by the reader and the bootstrapper. */
    private static ListStateDescriptor<Integer> listStateDescriptor() {
        return new ListStateDescriptor<>(LIST_STATE_NAME, TypeInformation.of(Integer.class));
    }

    /** A key together with the elements of its list state. */
    static class KeyedListState {
        Integer key;
        List<Integer> value;

        /** Renders key and elements so that text output of records is readable. */
        @Override
        public String toString() {
            return "KeyedListState{key=" + key + ", value=" + value + "}";
        }
    }

    /** Emits, for each input record, a new record with the same key and every element offset. */
    static class OffsetFunction implements FlatMapFunction<KeyedListState, KeyedListState> {
        @Override
        public void flatMap(KeyedListState keyedListState, Collector<KeyedListState> collector) {
            KeyedListState newState = new KeyedListState();
            newState.key = keyedListState.key;
            newState.value = keyedListState.value.stream()
                    .map(x -> x + VALUE_OFFSET)
                    .collect(Collectors.toList());
            collector.collect(newState);
        }
    }

    /** Reads the list state of a key and emits it as a {@link KeyedListState}. */
    static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        /** Registers the list state handle. */
        @Override
        public void open(Configuration parameters) {
            listState = getRuntimeContext().getListState(listStateDescriptor());
        }

        /**
         * Copies the elements of the list state into a new list and emits them with the key.
         *
         * @param key the key being read
         * @param ctx reader context (unused)
         * @param out collector receiving exactly one record per call
         * @throws Exception if the state cannot be read
         */
        @Override
        public void readKey(
                Integer key,
                Context ctx,
                Collector<KeyedListState> out) throws Exception {
            List<Integer> elements = new ArrayList<>();
            for (Integer element : listState.get()) {
                elements.add(element);
            }

            KeyedListState record = new KeyedListState();
            record.key = key;
            record.value = elements;

            out.collect(record);
        }
    }

    /** Writes the elements of each incoming {@link KeyedListState} into the list state. */
    static class KeyedListStateBootstrapper extends KeyedStateBootstrapFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        /** Registers the list state handle. */
        @Override
        public void open(Configuration parameters) {
            listState = getRuntimeContext().getListState(listStateDescriptor());
        }

        /**
         * Appends all elements of the record to the list state.
         *
         * @param value record whose elements are added
         * @param ctx bootstrap context (unused)
         * @throws Exception if the state cannot be updated
         */
        @Override
        public void processElement(KeyedListState value, Context ctx) throws Exception {
            listState.addAll(value.value);
        }
    }
}

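// "Reply" - stray text that appears to come from the web page this file was copied from; not part of the program.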