Thanks Raghu... we are on the same page, but I'm not sure how it applies to my
own code :(
Pipeline p = Pipeline.create(options);
// ...etc...
try {
    PCollection<KV<String, String>> kafkarecords =
        p.apply(KafkaIO.read()
                .withBootstrapServers("kafhost:9092")
                .withTopics(kaftopic)
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata())
         .apply(ParDo.named("startBundle").of(
             new DoFn<KV<byte[], String>, KV<String, String>>() {
                 // ...etc...
                 @Override
                 public void processElement(ProcessContext ctx) throws Exception {
                     // ...etc...
I do the transformation in a DoFn on an anonymous class, and there each record is
supposedly processed one at a time. But the number of output records always exceeds
the expected value... it is inconsistent and varies from run to run. I have my own
doubts about how to guarantee "aggregation" consistency given the pipeline's
"parallelism"... I receive n records, that is a proven fact, but my transformation
processes more records than that.
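One way to check that could be to count, per fixed window, how many records actually
come out of the KafkaIO read and log that count, to compare against the n records that
were published. This is only a rough sketch of the idea (the one-minute window, the
transform name and the println are made-up placeholders, not real code from my pipeline):

    // Rough sketch: count the records coming out of the Kafka read, per 1-minute window,
    // and log the per-window count so it can be compared with the expected n.
    // (Assumes Count, Window, FixedWindows from org.apache.beam.sdk.transforms[.windowing]
    // and Duration from org.joda.time.)
    kafkarecords
        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Count.globally() is an aggregation: one Long per window, regardless of parallelism.
        .apply(Count.<KV<String, String>>globally().withoutDefaults())
        .apply(ParDo.named("logWindowCount").of(new DoFn<Long, Void>() {
            @Override
            public void processElement(ProcessContext ctx) throws Exception {
                // Compare this against the number of records published to the topic.
                System.out.println("records in this window: " + ctx.element());
            }
        }));

That would at least tell me whether the extra records appear at or before the read, or
only further down in my own DoFn.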
Sorry for the long email... I just don't know how to verify the pipeline's
consistency... Cheers

From: Raghu Angadi <[email protected]>
To: amir bahmanyari <[email protected]>
Cc: "[email protected]" <[email protected]>
Sent: Tuesday, August 2, 2016 10:31 PM
Subject: Re: Any data gets cached?
On Tue, Aug 2, 2016 at 4:37 PM, amir bahmanyari <[email protected]> wrote:
.apply(new CountWords())
// counts occurrences of the same word and returns the result as a long
That is an aggregation.
// Format the long result created by CountWords object to text
.apply(MapElements.via(new FormatAsTextFn()))
This is not.
// TextIO to write result as text into an output-file set in options
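Just to make sure I am reading that distinction correctly: an aggregation like Count
combines values across many input elements, while MapElements transforms each element
independently, one in and one out. A toy sketch of the two, just to check my
understanding (a made-up example, not my pipeline; Count, MapElements, SimpleFunction
and Create are from org.apache.beam.sdk.transforms):

    // Toy example of the distinction, on a tiny in-memory input.
    PCollection<String> words = p.apply(Create.of("the", "cat", "the"));

    // Aggregation: groups values across elements, one output per distinct word.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());   // -> ("the", 2), ("cat", 1)

    // Not an aggregation: a purely element-wise, 1-to-1 transform.
    PCollection<String> formatted =
        wordCounts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
            @Override
            public String apply(KV<String, Long> kv) {
                return kv.getKey() + ": " + kv.getValue();
            }
        }));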