RE: global function over partitions

2016-01-15 Thread Radu Tudoran
            }
            // System.out.println("Count per hash is " + count);
        }
    });

    result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
            .apply(new AllWindowFunction<Tuple2<Integer, Integer>, Tuple1<Integer>, TimeWindow>() {
                @Override
                public void apply(TimeWindow arg0,
                                  Iterable<Tuple2<Integer, Integer>> arg1,
                                  Collector<Tuple1<Integer>> arg2) throws Exception {

                    // Computation: count everything that arrived in this global window
                    int count = 0;
                    for (Tuple2<Integer, Integer> value : arg1) {
                        count++;
                    }
                    // System.out.println("Count aggregated metrics is "
                    //         + count + " at " + System.currentTimeMillis());
                    arg2.collect(new Tuple1<Integer>(count));
                }
            }).setParallelism(1)
            .writeAsText("/tmp/testoutput", WriteMode.OVERWRITE);

    env.execute("main stream application");

}



Regards,


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



Re: global function over partitions

2016-01-15 Thread Aljoscha Krettek
> .apply(new WindowFunction<Tuple2<Integer, String>, Tuple2<Integer, Integer>, Tuple, TimeWindow>() {
>     @Override
>     public void apply(Tuple arg0,
>                       TimeWindow arg1,
>                       java.lang.Iterable<org.apache.flink.api.java.tuple.Tuple2<Integer, String>> arg2,
>                       org.apache.flink.util.Collector<org.apache.flink.api.java.tuple.Tuple2<Integer, Integer>> arg3)
>             throws Exception {
>
>         // Computation: forward (key, string length) and count the elements per hash
>         int count = 0;
>         for (Tuple2<Integer, String> value : arg2) {
>             count++;
>             arg3.collect(new Tuple2<Integer, Integer>(value.f0, value.f1.length()));
>         }
>         // System.out.println("Count per hash is " + count);
>     }
> });

Re: global function over partitions

2016-01-15 Thread Robert Metzger
Hi Radu,

I'm sorry for the delayed response.
I'm not sure what the purpose of DataStream.global() actually is. I've
opened a JIRA to document or remove it:
https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()",
without a ".global()" call before. The timeWindowAll() will run with a
parallelism of one, hence it will receive the data from all partitions.
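
A minimal sketch of that suggestion (not from the original mail; it assumes result is the DataStream<Tuple2<Integer, Integer>> produced by the compute phase in the question below, and the same Flink classes already used in this thread - Time, TimeUnit, AllWindowFunction, TimeWindow, Tuple1/Tuple2, Collector, WriteMode; the output path is just a placeholder):

result.timeWindowAll(Time.of(5, TimeUnit.SECONDS))   // no .global() needed before this
        .apply(new AllWindowFunction<Tuple2<Integer, Integer>, Tuple1<Integer>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window,
                              Iterable<Tuple2<Integer, Integer>> values,
                              Collector<Tuple1<Integer>> out) throws Exception {
                // The all-window operator runs with parallelism 1, so this iterable
                // contains the elements from every upstream partition.
                int count = 0;
                for (Tuple2<Integer, Integer> value : values) {
                    count++;
                }
                out.collect(new Tuple1<Integer>(count));
            }
        })
        .writeAsText("/tmp/global-metrics", WriteMode.OVERWRITE);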

Regards,
Robert







global function over partitions

2016-01-12 Thread Radu Tudoran
Hi,

I am trying to compute some final statistics over a stream topology. For this I 
would like to gather all data from all windows and parallel partitions into a 
single/global window. Could you suggest a solution for this? I saw that the map 
function has a ".global()", but I end up with the same number of partitions as I 
have in the main computation. Below you can find a schema of the program:


DataStream stream = env.Read...

env.setParallelism(10);
// Compute phase
DataStream result = stream.keyBy(_).window(_).apply();
// end compute phase


// Get the metrics
result.map(/* extract some of the Tuple fields */)
        .global()
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
        .trigger(EventTimeTrigger.create())
        .apply()
        .writeAsText();


For this last function, I would expect that, even though the compute phase runs in 
parallel, I can select part of the events from all partitions and gather them into 
one unique window. However, I do not seem to be successful in this.
I also tried applying a keyBy() to the result stream in which I assign the same 
key to every event, but the result remains the same.
result.map(/* extract some of the Tuple fields */).keyBy(
        new KeySelector<Tuple2<Long, Long>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                return 1;
            }

            @Override
            public int hashCode() {
                return 1;
            }
        }).timeWindowAll().apply()


Thanks for the help/ideas