package eu.euranova.leadcep;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

/**
 * Named specialisation of {@code Tuple3<Long, Long, String>} that adds no members of its own.
 * It is not referenced anywhere in the visible part of this file.
 */
class TypeA extends Tuple3<Long, Long, String> {
}
/**
 * Named specialisation of {@code Tuple3<Long, Long, String>} that adds no members of its own.
 * It is not referenced anywhere in the visible part of this file.
 */
class TypeB extends Tuple3<Long, Long, String> {
}

/**
 * Entry point of a small Flink streaming job.
 *
 * <p>The job builds two three-element {@code TestType} streams, prints each of them, connects
 * them, keys both sides by {@code getF2()}, runs the connected stream through {@code AND}
 * and prints the result.
 */
public class Main {
    /** Parallelism configured on the execution environment. */
    private static final int PARALLELISM = 2;

    /**
     * Assembles the job graph and executes it.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception declared because {@code env.execute()} is invoked without local handling
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        // The two inputs differ only in the label of their second element ("b" vs "c").
        final DataStream<TestType> left = sampleStream(env, "b");
        final DataStream<TestType> right = sampleStream(env, "c");

        left.print("A");
        right.print("B");

        // One selector instance is shared by both sides of the connected stream.
        final KeySelector<TestType, Object> byF2 = keyByF2();
        left.connect(right)
            .keyBy(byF2, byF2)
            .flatMap(new AND<TestType, TestType>())
            // Explicit result type hint, currently disabled:
            //.returns(new TypeHint<Tuple2<TestType, TestType>>() {})
            .print("A and B");

        // Disabled experiment, kept for reference.
/*
        DataStream<Tuple3<Long, Long, String>> C = env.fromElements(
            Tuple3.of(1L, 3L, "d"),
            Tuple3.of(2L, 2L, "d"),
            Tuple3.of(3L, 2L, "d"),
            Tuple3.of(4L, 3L, "d"),
            Tuple3.of(5L, 2L, "d")
        );
        C.print("C");
        C
            .keyBy(2)
            .flatMap(new CountWindowAverage())
            .print("Average(A)");
*/

        env.execute();
    }

    /**
     * Builds a three-element stream: {@code (1, 1, "a")}, {@code (2, 2, secondLabel)} and
     * {@code (3, 3, "d")}.
     *
     * @param env environment the source is registered on
     * @param secondLabel third constructor argument of the second element
     * @return the stream created by {@code env.fromElements}
     */
    private static DataStream<TestType> sampleStream(StreamExecutionEnvironment env, String secondLabel) {
        return env.fromElements(
            new TestType(1L, 1L, "a"),
            new TestType(2L, 2L, secondLabel),
            new TestType(3L, 3L, "d")
        );
    }

    /**
     * Creates the key selector used for both inputs.
     *
     * @return a selector whose key is whatever {@code getF2()} returns for the element
     */
    private static KeySelector<TestType, Object> keyByF2() {
        return new KeySelector<TestType, Object>() {
            @Override
            public Object getKey(TestType value) throws Exception {
                return value.getF2();
            }
        };
    }
}


