RE: global function over partitions
                }
                //System.out.println("Count per hash is " + count);
            }
        });

        result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
            .apply(new AllWindowFunction<Tuple2<Integer, Integer>, Tuple1<Integer>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window,
                        Iterable<Tuple2<Integer, Integer>> values,
                        Collector<Tuple1<Integer>> out) throws Exception {
                    // Computation
                    int count = 0;
                    for (Tuple2<Integer, Integer> value : values) {
                        count++;
                    }
                    //System.out.println("Count aggregated metrics is "
                    //        + count + " at " + System.currentTimeMillis());
                    out.collect(new Tuple1<Integer>(count));
                }
            }).setParallelism(1)
            .writeAsText("/tmp/testoutput", WriteMode.OVERWRITE);

        env.execute("main stream application");
    }

Regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Friday, January 15, 2016 10:18 AM
To: user@flink.apache.org
Subject: Re: global function over partitions

Hi Radu,

I'm sorry for the delayed response.
I'm not sure what the purpose of DataStream.global() actually is. I've opened a JIRA to document or remove it: https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()" without a ".global()" call before. The timeWindowAll() will run with a parallelism of one, hence it will receive the data from all partitions.

Regards,
Robert

On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

Hi,

I am trying to compute some final statistics over a stream topology. For this I would like to gather all the data from all windows and parallel partitions into a single, global window. Could you suggest a solution for this? I saw that the map function has a ".global()", but I end up with the same number of partitions as I have in the main computation. Below you can find a schema for the program:

    DataStream stream = env.Read...

    end.setParallelism(10);
    //Compute phase
    DataStream result = stream.keyBy(_).window(_).apply();
    //end compute phase

    //get the metrics
    result.map(//extract some of the Tuple fields)
        .global()
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
        .trigger(EventTimeTrigger.create())
        .apply()
        .writeAsText();

For this last function I would expect that, even if I had parallel computation during the compute phase, I can select part of the events from all partitions and gather all of these into one unique window. However, I do not seem to be successful in this.

I also tried applying a keyBy() to the result stream in which I assigned the same hash to every event, but the result remains the same.

    result.map(//extract some of the Tuple fields)
        .keyBy(new KeySelector<Tuple2<Long, Long>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                return 1;
            }

            @Override
            public int hashCode() {
                return 1;
            }
        })
        .timeWindowAll().apply()

Thanks for the help/ideas
Re: global function over partitions
Hi Radu,

I'm sorry for the delayed response. I'm not sure what the purpose of DataStream.global() actually is. I've opened a JIRA to document or remove it: https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()" without a ".global()" call before. The timeWindowAll() will run with a parallelism of one, hence it will receive the data from all partitions.

Regards,
Robert

On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran wrote:
> Hi,
>
> I am trying to compute some final statistics over a stream topology. For
> this I would like to gather all the data from all windows and parallel
> partitions into a single, global window. Could you suggest a solution for
> this? I saw that the map function has a ".global()", but I end up with the
> same number of partitions as I have in the main computation. Below you can
> find a schema for the program:
>
>     DataStream stream = env.Read...
>
>     end.setParallelism(10);
>     //Compute phase
>     DataStream result = stream.keyBy(_).window(_).apply();
>     //end compute phase
>
>     //get the metrics
>     result.map(//extract some of the Tuple fields)
>         .global()
>         .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
>         .trigger(EventTimeTrigger.create())
>         .apply()
>         .writeAsText();
>
> For this last function I would expect that, even if I had parallel
> computation during the compute phase, I can select part of the events from
> all partitions and gather all of these into one unique window. However, I
> do not seem to be successful in this.
>
> I also tried applying a keyBy() to the result stream in which I assigned
> the same hash to every event, but the result remains the same.
>
>     result.map(//extract some of the Tuple fields)
>         .keyBy(new KeySelector<Tuple2<Long, Long>, Integer>() {
>             @Override
>             public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
>                 return 1;
>             }
>
>             @Override
>             public int hashCode() {
>                 return 1;
>             }
>         })
>         .timeWindowAll().apply()
>
> Thanks for the help/ideas
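[Editor's note: Robert's suggestion can be illustrated end to end. The sketch below is not the original job; the source data, tuple fields, and class name are invented for the example, and it assumes the Flink 1.x DataStream API of that era (`Time.of`, `timeWindowAll`, `AllWindowFunction`). It shows the pattern he describes: a keyed, parallel windowed compute phase, followed by a `timeWindowAll()` that runs with parallelism 1 and therefore collects elements from all upstream partitions into one global window, with no `.global()` call needed.]

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class GlobalMetricsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Compute phase: keyed windows run in parallel across partitions.
        DataStream<Tuple2<Integer, Integer>> result = env
                .fromElements(Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(1, 30))
                .keyBy(0)
                .timeWindow(Time.of(2, TimeUnit.SECONDS))
                .sum(1);

        // Metrics phase: timeWindowAll() implicitly has parallelism 1,
        // so this single operator instance sees the output of every
        // parallel partition above -- no .global() required.
        result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
                .apply(new AllWindowFunction<Tuple2<Integer, Integer>,
                        Tuple1<Integer>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window,
                            Iterable<Tuple2<Integer, Integer>> values,
                            Collector<Tuple1<Integer>> out) throws Exception {
                        // Count all elements that reached the global window.
                        int count = 0;
                        for (Tuple2<Integer, Integer> ignored : values) {
                            count++;
                        }
                        out.collect(new Tuple1<Integer>(count));
                    }
                })
                .print();

        env.execute("global metrics sketch");
    }
}
```

Note the design point: the constant-key `keyBy()` workaround from the original question routes every element to one parallel instance but still produces a keyed stream, whereas `timeWindowAll()` expresses the "one global window over everything" intent directly.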
global function over partitions
Hi,

I am trying to compute some final statistics over a stream topology. For this I would like to gather all the data from all windows and parallel partitions into a single, global window. Could you suggest a solution for this? I saw that the map function has a ".global()", but I end up with the same number of partitions as I have in the main computation. Below you can find a schema for the program:

    DataStream stream = env.Read...

    end.setParallelism(10);
    //Compute phase
    DataStream result = stream.keyBy(_).window(_).apply();
    //end compute phase

    //get the metrics
    result.map(//extract some of the Tuple fields)
        .global()
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
        .trigger(EventTimeTrigger.create())
        .apply()
        .writeAsText();

For this last function I would expect that, even if I had parallel computation during the compute phase, I can select part of the events from all partitions and gather all of these into one unique window. However, I do not seem to be successful in this.

I also tried applying a keyBy() to the result stream in which I assigned the same hash to every event, but the result remains the same.

    result.map(//extract some of the Tuple fields)
        .keyBy(new KeySelector<Tuple2<Long, Long>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                return 1;
            }

            @Override
            public int hashCode() {
                return 1;
            }
        })
        .timeWindowAll().apply()

Thanks for the help/ideas