Hi,
I'm puzzled by the way data is committed in pipelines.
I'm reading data from RabbitMQ and trying to write it to files using
TextIO with the following pipeline:
pipeline.apply("read_from_rabbit",
        RabbitMqIO.read()
            .withUri(options.getRabbitMQUri())
            .withQueue(options.getRabbitMQQueue()))
    // 1-second fixed windows with the default trigger
    .apply(Window.<RabbitMqMessage>into(
        FixedWindows.of(Duration.standardSeconds(1))))
    .apply("transform_into_kv",
        MapElements.into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(new RabbitMQMessageToKV(options.getIdPaths())))
    .apply("transform_into_strings",
        MapElements.into(TypeDescriptors.strings())
            .via(new RabbitToKafka.KVToString()))
    // one file per window, named messages*.txt
    .apply("log",
        TextIO.write()
            .withWindowedWrites()
            .withNumShards(1)
            .to("messages")
            .withSuffix(".txt"));
When I run that pipeline, no data is written to messages*.txt, although
I can see that messages are processed in the transform_into_strings step.
Is this because there is some kind of coordinated commit? And if so, how
can I know when such a commit occurs or fails?
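To check my understanding of one possible cause, here is a toy model in plain Java. It is not Beam code, and the class and method names (WatermarkWindowModel, process, advanceWatermark) are made up for illustration. My assumption, which I have not confirmed, is that with FixedWindows and the default trigger, a window's contents are only handed to the windowed write once the watermark passes the end of that window. If the RabbitMqIO watermark never advanced, elements would still flow through transform_into_strings, but no pane would ever fire, so no file would be written:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model of 1-second fixed windows under a watermark-driven trigger (not Beam API). */
class WatermarkWindowModel {
    static final long WINDOW_MILLIS = 1000;

    // Elements buffered per window, keyed by window start (ordered).
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    // Panes that have fired, i.e. what a windowed write would actually receive.
    private final List<String> emitted = new ArrayList<>();

    /** "Processing" an element only assigns it to its window; nothing is emitted yet. */
    public void process(String element, long eventTimeMillis) {
        long windowStart = eventTimeMillis - Math.floorMod(eventTimeMillis, WINDOW_MILLIS);
        buffered.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(element);
    }

    /** A window fires only once the watermark reaches its (exclusive) end. */
    public void advanceWatermark(long watermarkMillis) {
        while (!buffered.isEmpty()) {
            Map.Entry<Long, List<String>> first = buffered.firstEntry();
            long windowEnd = first.getKey() + WINDOW_MILLIS;
            if (watermarkMillis < windowEnd) {
                break; // this window and all later ones are still open
            }
            emitted.add("window@" + first.getKey() + "=" + first.getValue());
            buffered.pollFirstEntry();
        }
    }

    public List<String> emitted() { return emitted; }

    public int bufferedWindows() { return buffered.size(); }

    public static void main(String[] args) {
        WatermarkWindowModel model = new WatermarkWindowModel();
        model.process("a", 100);   // lands in window [0, 1000)
        model.process("b", 1500);  // lands in window [1000, 2000)

        // Watermark stuck at its initial value: elements were processed, nothing fires.
        model.advanceWatermark(0);
        System.out.println("stuck watermark -> " + model.emitted());

        // Watermark reaches the end of the first window only.
        model.advanceWatermark(1000);
        System.out.println("watermark=1000 -> " + model.emitted());
    }
}
```

If this is what is happening, I guess adding a processing-time trigger to the Window step (something like AfterProcessingTime.pastFirstElementInPane()) should make files appear, but I haven't tried it yet.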