Hey Simone,

I'd suggest trying out the `DataStream#print()` function to start, but
there are a few other easy-to-integrate sinks for testing that you can
check out in the docs here[1]

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks

On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin <cavalla...@hotmail.com>
wrote:

> Hi All,
>
> On my code I have a DataStream that I would like to access. I need to
> understand what I'm getting for each transformation to check if the data
> that I'm working on make sense. How can I print into the console or get a
> file (csv, txt) for the variables: "stream", "enriched" and "result"?
>
> I have tried different way but no way to get the data.
>
> Thanks!
>
>
> *        FlinkKafkaConsumer<Event> kafkaData =*
> *                new FlinkKafkaConsumer("CorID_1", new
> EventDeserializationSchema(), p);*
> *        WatermarkStrategy<Event> wmStrategy =*
> *                WatermarkStrategy*
> *                        .<Event>forMonotonousTimestamps()*
> *                        .withIdleness(Duration.ofMinutes(1))*
> *                        .withTimestampAssigner((event, timestamp) -> {*
> *                            return event.get_Time();*
> *                        });*
> *        DataStream<Event> stream = env.addSource(*
> *                kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
>
> *        DataStream<Tuple2<Event, Long>> enriched = stream*
> *                .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)*
> *                .map(new StatefulSessionCalculator());*
>
> *        WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result =
> enriched*
> *                .keyBy(new MyKeySelector())*
> *                .window(EventTimeSessionWindows.withDynamicGap(new
> DynamicSessionWindows()));*
>

Reply via email to