Hi Amir,

It is not clear what you want to do. I am not even sure we both mean the same thing when we say 'aggregation' :). Can you confirm that you understand how the Beam WordCount.java <https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L208> example works? Especially lines 208-211.
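To make the idea concrete, here is a plain-Java model of what an aggregation such as `Count.perElement()` computes. It has no Beam dependency and is not Beam's implementation; the class and method names (`CountPerElementSketch`, `countBundle`, `mergePartials`, `split`) are hypothetical. Each bundle is counted with its own local map, then the partial counts for the same key are summed. Because summing is associative and commutative, the final counts do not depend on how the input was split across workers, which is what makes this kind of aggregation repeatable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of a per-element count; not Beam API.
class CountPerElementSketch {

    // Phase 1: count only the elements of one bundle, using a local map,
    // so nothing is shared between bundles.
    static Map<String, Long> countBundle(List<String> bundle) {
        Map<String, Long> partial = new HashMap<>();
        for (String word : bundle) {
            partial.merge(word, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: sum the partial counts for each key. The order in which
    // partials are merged does not change the result.
    static Map<String, Long> mergePartials(List<Map<String, Long>> partials) {
        Map<String, Long> total = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, n) -> total.merge(word, n, Long::sum));
        }
        return total;
    }

    // Splits the input into fixed-size bundles, standing in for however
    // a runner happens to partition the data.
    static List<List<String>> split(List<String> input, int bundleSize) {
        List<List<String>> bundles = new ArrayList<>();
        for (int i = 0; i < input.size(); i += bundleSize) {
            bundles.add(input.subList(i, Math.min(i + bundleSize, input.size())));
        }
        return bundles;
    }

    static Map<String, Long> countPerElement(List<String> input, int bundleSize) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> bundle : split(input, bundleSize)) {
            partials.add(countBundle(bundle));
        }
        return mergePartials(partials);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        // Different bundle sizes, same answer.
        System.out.println(countPerElement(words, 1)); // {a=3, b=2, c=1}
        System.out.println(countPerElement(words, 4)); // {a=3, b=2, c=1}
    }
}
```

In a real pipeline you would not write any of this yourself: applying `Count.<String>perElement()` (or `GroupByKey` followed by a per-key combine) to a `PCollection` leaves the splitting and merging to the runner. The point of the sketch is that the result comes only from the elements and the merge function, not from a map that several workers update concurrently.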
On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <[email protected]> wrote:

> Hi Raghu,
> Any opinion on this pls? I appreciate your time...
> Cheers
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Sunday, July 31, 2016 3:05 PM
>
> *Subject:* Re: Any data gets cached?
>
> Hi Raghu,
> Thanks so much for your response. Following is how I am reading unbounded
> records from Kafka through KafkaIO() & processing them in its corresponding
> inner class.
> How do I include "aggregation" in this call?
> Have a great weekend.
>
> Pipeline p = Pipeline.create(options);
> ............................etc...........................
> try {
>   PCollection<KV<String, String>> kafkarecords =
>     p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic)
>       .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(ParDo.named("startBundle").of(
>       new DoFn<KV<byte[], String>, KV<String, String>>() {
>         ...................etc......................
>
>         @Override
>         public void processElement(ProcessContext ctx) throws Exception {
>           ............................etc......................................
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Saturday, July 30, 2016 2:15 PM
> *Subject:* Re: Any data gets cached?
>
> On the surface it looks like you are asking about basic aggregations.
> These are of course provided by Beam too. Almost all Beam examples make use
> of these. See 'Count.<String>perElement()
> <https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L152>'
> in the WordCount.java example. If not, either post your Beam code or roughly
> equivalent SQL here.
>
> On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Raghu,
> Is this the right assumption that if results are not aggregated we may
> have inconsistency in what the final result may look like?
> What would be the best aggregation approach to guarantee consistency? Even
> if there is a perf. cost.
> Thanks
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Friday, July 29, 2016 2:32 PM
> *Subject:* Re: Any data gets cached?
>
> I suggest you try aggregating using Beam primitives (GroupByKey, count
> etc), see if it produced in consistent results.
>
> On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Sorry colleagues,
> I am having a moving target in my Beam app with FlinkRunner in a Flink
> Cluster of 2 nodes.
> Every run produces a different result while we know what the result MUST
> be: it's an expected fixed number.
> I checked and see Kafka is NOT sending any extra records.
> My first suspect was Redis thread-UNsafe hashmap objects.
> I replaced them with Java ConcurrentHashMaps. Wow! It worked for the very
> first time ever PERFECT.
> I then re-ran the exact same thing expecting the exact same previous result.
> But it's different. Ran it again. Another different wrong result. A
> different result each time.
> No code change, nothing different.
>
> I am wondering if some previous data/record gets cached by
> Beam/FlinkRunner/KafkaIO invocation etc. somewhere.
> Sorry for the long email. Am losing my mind catching this moving target :))
> Appreciate your kind feedback on this.
> Cheers+have a great weekend.
> Amir-
