Hi,

I'm being puzzled by the way data is commited in pipelines.

I'm reading data from RabbitMQ and trying to write it to files using TextIO and the following pipeline


        pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue()))
                .apply(Window.<RabbitMqMessage>into(
FixedWindows.of(Duration.standardSeconds(1))))
                .apply("transform_into_kv",
                        MapElements.into(TypeDescriptors
                                .kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))                                 .via(new RabbitMQMessageToKV(options.getIdPaths()))
                        )
                .apply("transform_into_strings",
MapElements.into(TypeDescriptors.strings())
                                .via(new RabbitToKafka.KVToString())
                        )
                .apply("log",
                        TextIO
                            .write()
                            .withWindowedWrites()
                            .withNumShards(1)
                            .to("messages")
                            .withSuffix(".txt")
                        );

When i run that pipeline, no data is written to messages*.txt, although I can see that messages are processed in the transform_to_string step. Is it because there is some kind of coordinated commit ? And if so, how can I know when such a commit occurs or fails ?

Reply via email to