Hi All,

In my code I have a DataStream that I would like to inspect. I need to
see what each transformation produces so I can check whether the data I'm
working with makes sense. How can I print to the console, or write to a file
(csv, txt), the contents of the variables "stream", "enriched" and "result"?

I have tried several approaches, but none of them gave me the data.

Thanks!


        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer<>(
                        "CorID_1", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> {
                            return event.get_Time();
                        });
        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));

        DataStream<Tuple2<Event, Long>> enriched = stream
                .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
                .map(new StatefulSessionCalculator());

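        // Note: "result" is a WindowedStream, not a DataStream; no window
        // function has been applied to it yet.
        WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result =
                enriched
                        .keyBy(new MyKeySelector())
                        .window(EventTimeSessionWindows.withDynamicGap(
                                new DynamicSessionWindows()));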
