package eu.euranova.leadcep;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Conjunction ("AND") operator over two connected streams.
 *
 * <p>Every incoming element is appended to the list state of its own side and is then
 * paired with every element currently stored for the opposite side; each pair is emitted
 * as a {@code Tuple2<LEFT, RIGHT>}.
 *
 * <p>Nothing in this class ever clears the two list states, so they grow with every
 * element received.
 *
 * <p>NOTE(review): the states are obtained through {@code getRuntimeContext().getListState},
 * so this function is presumably meant to run on keyed connected streams — confirm against
 * the call sites.
 *
 * <p>NOTE(review): {@code new TypeHint<LEFT>() {}} is built from an unresolved type variable.
 * Flink's {@code TypeHint} documentation states that generic types must be fully specified,
 * so this may fail when {@link #open(Configuration)} runs — TODO confirm, and if so pass the
 * element {@code TypeInformation} in from the caller instead.
 *
 * @param <LEFT>  element type of the first input
 * @param <RIGHT> element type of the second input
 */
public class AND<LEFT, RIGHT> extends RichCoFlatMapFunction<LEFT, RIGHT, Tuple2<LEFT, RIGHT>> {

    /** Elements received so far on the first input. */
    private transient ListState<LEFT> leftState;
    /** Elements received so far on the second input. */
    private transient ListState<RIGHT> rightState;

    /**
     * Registers the two list states that buffer the elements of each input.
     *
     * @param config configuration handed in by the runtime; not read here
     */
    @Override
    public void open(Configuration config) {
        ListStateDescriptor<LEFT> leftDescriptor =
            new ListStateDescriptor<>(
                "and_left",
                TypeInformation.of(new TypeHint<LEFT>() {
                }));
        leftState = getRuntimeContext().getListState(leftDescriptor);

        // The right-hand state needs its own name: state is looked up by descriptor name,
        // so reusing "and_left" here would make both sides collide on the same state.
        ListStateDescriptor<RIGHT> rightDescriptor =
            new ListStateDescriptor<>(
                "and_right",
                TypeInformation.of(new TypeHint<RIGHT>() {
                }));
        rightState = getRuntimeContext().getListState(rightDescriptor);
    }

    /**
     * Stores an element of the first input and emits it paired with every stored element
     * of the second input.
     *
     * @param leftInput element arriving on the first input
     * @param out       collector receiving one {@code (leftInput, right)} tuple per stored
     *                  right element
     * @throws Exception if accessing the list state fails
     */
    @Override
    public void flatMap1(LEFT leftInput, Collector<Tuple2<LEFT, RIGHT>> out) throws Exception {
        this.leftState.add(leftInput);

        Iterable<RIGHT> rightList = this.rightState.get();
        // Defensive: presumably some Flink versions/backends hand back null instead of an
        // empty iterable when nothing is stored yet — TODO confirm for the version in use.
        if (rightList != null) {
            for (RIGHT rightEvent : rightList) {
                out.collect(Tuple2.of(leftInput, rightEvent));
            }
        }
    }

    /**
     * Stores an element of the second input and emits it paired with every stored element
     * of the first input.
     *
     * @param rightInput element arriving on the second input
     * @param out        collector receiving one {@code (left, rightInput)} tuple per stored
     *                   left element
     * @throws Exception if accessing the list state fails
     */
    @Override
    public void flatMap2(RIGHT rightInput, Collector<Tuple2<LEFT, RIGHT>> out) throws Exception {
        this.rightState.add(rightInput);

        Iterable<LEFT> leftList = this.leftState.get();
        // Defensive: see the matching guard in flatMap1.
        if (leftList != null) {
            for (LEFT leftEvent : leftList) {
                out.collect(Tuple2.of(leftEvent, rightInput));
            }
        }
    }
}
