        // Kafka source that delivers every record as a JSON ObjectNode.
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                // Base client configuration; the individual settings below are applied after it.
                .setProperties(kafkaProps)
                // Subscription and consumer identity.
                .setTopics(kafkaInputTopic)
                .setGroupId(groupId)
                // Start from the group's committed offsets, with EARLIEST as the reset strategy.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Key/value JSON deserialization; fetchMetadata is handed to the schema as-is.
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                // Transport security settings.
                .setProperty("security.protocol", securityProtocol)
                .setProperty("ssl.truststore.location", trustStoreLocation)
                .setProperty("ssl.truststore.password", trustStorePassword)
                .setProperty("ssl.truststore.type", trustStoreType)
                // Source behaviour: partition discovery and offset committing.
                .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
                .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
                .build();


        // Raw JSON records from Kafka; no watermarks are generated at the source itself.
        DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Splitter converts each JSON record into zero or more Tuple4 elements.
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        // Event-time strategy for the tuple stream:
        //  - bounded out-of-orderness of `outOfOrderness` seconds,
        //  - idleness timeout of `idleness` seconds,
        //  - the event timestamp is taken from field f0 of each tuple.
        WatermarkStrategy<Tuple4<Long, Long, String, String>> wmstrategy = WatermarkStrategy
                .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(
                        Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                // The second argument is the timestamp previously attached to the record; it is not used here.
                .withTimestampAssigner((element, recordTimestamp) -> element.f0);

        // Same elements as tuple4ds, now carrying timestamps and watermarks.
        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
                tuple4ds.assignTimestampsAndWatermarks(wmstrategy);

        // Debug output of the watermarked stream.
        tuple4dswm.print();

        // Expose the tuple stream as a Table and rename its columns to
        // (eventTime, handlingTime, transactionId, originalEvent).
        //
        // The table must be built from tuple4dswm, not tuple4ds: the schema declares the watermark as
        // SOURCE_WATERMARK(), which takes over the watermarks already present in the DataStream, and only
        // tuple4dswm has timestamps and watermarks assigned. Building it from tuple4ds leaves the event-time
        // attribute without advancing watermarks, so event-time queries on this table would not emit results.
        // NOTE(review): f0 is a Long in the tuple type but is declared TIMESTAMP_LTZ(3) here - confirm the
        // conversion behaves as intended for epoch-millisecond values.
        Table tupled4DsTable = tableEnv.fromDataStream(tuple4dswm, Schema.newBuilder()
                    .column("f0","TIMESTAMP_LTZ(3)") // Only  TIMESTAMP_LTZ(0) to TIMESTAMP_LTZ(3) allowed
                    .column("f1","BIGINT")
                    .column("f2","STRING")
                    .column("f3","STRING")
                    .watermark("f0", "SOURCE_WATERMARK()")
                    .build()).as("eventTime", "handlingTime", "transactionId", "originalEvent");

        // Debug output: the table's schema and the simple name of its runtime class.
        tupled4DsTable.printSchema();

        String className = tupled4DsTable.getClass().getSimpleName();
        System.out.println(className);

        // For every row, elapsedTime is handlingTime minus the previous handlingTime of the same
        // transactionId (rows ordered by eventTime). For the first row of a transaction lag(...) is null,
        // so ifnull substitutes the row's own handlingTime and elapsedTime becomes 0.
        // The table is referenced by concatenating it into the SQL text, which uses its toString() value.
        Table result = tableEnv.sqlQuery(
                "select " + String.join(", ",
                        "transactionId",
                        "originalEvent",
                        "handlingTime",
                        "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime")
                + " from " + tupled4DsTable
                + " order by eventTime");

        // Debug output: schema of the query result.
        result.printSchema();

        // Disabled alternative, kept for reference: converts the result into a typed Tuple4 stream via
        // toAppendStream with an explicit TupleTypeInfo, then prints it.
        //TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG());
        //DataStream<Tuple4<String, String, Long, Long>> dsRow = tableEnv.toAppendStream(result, tupleType);
        //dsRow.print();
        // Active conversion: turn the query result back into a DataStream of generic Row records.
        DataStream<Row> xx = tableEnv.toDataStream(result);
