Re: Semantic Properties and Functions with Iterables

2015-03-08 Thread Stephan Ewen
Any other thoughts on this?



Re: Semantic Properties and Functions with Iterables

2015-03-08 Thread Fabian Hueske
I added your comment and an answer to FLINK-1656:

Right, that's a good point.

+1 for limiting to key fields. That's much easier to reason about for users.

However, I am not sure how it is implemented right now.
I guess secondary sort info is already removed by the property filtering,
but I need to verify that.




Re: Semantic Properties and Functions with Iterables

2015-03-06 Thread Stephan Ewen
I think the order in which elements are emitted is not part of the forwarded
field properties, but would rather be a separate property that we do not have
right now.

At the moment, we assume that all group operations destroy secondary
orders.

In that sense, forwarded fields in group operations only make sense for
fields whose values are the same for all records in the group (key fields).
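
To make the key-field argument concrete, here is a minimal plain-Java sketch
(no Flink dependency). KeyFieldSketch and its methods are hypothetical names
used only for illustration; records are modeled as long[] with field 0 as the
key, and the group function is assumed to emit its input in reverse order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration, not Flink code.
final class KeyFieldSketch {

    // Stand-in for a group function that emits the records of one group in reverse order.
    static List<long[]> emitReversed(List<long[]> group) {
        List<long[]> out = new ArrayList<>(group);
        Collections.reverse(out);
        return out;
    }

    // True if the given field has the same value in every record.
    static boolean constantIn(List<long[]> records, int field) {
        for (long[] r : records) {
            if (r[field] != records.get(0)[field]) {
                return false;
            }
        }
        return true;
    }

    // True if the records are in ascending order on the given field.
    static boolean sortedAscendingOn(List<long[]> records, int field) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[field] > records.get(i)[field]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One group with key 7 in field 0, sorted on field 1 (a secondary order).
        List<long[]> group = Arrays.asList(
                new long[] {7, 1}, new long[] {7, 2}, new long[] {7, 3});
        List<long[]> out = emitReversed(group);

        System.out.println(constantIn(out, 0));          // true: the key field survives
        System.out.println(sortedAscendingOn(group, 1)); // true: input had a secondary order
        System.out.println(sortedAscendingOn(out, 1));   // false: the secondary order is gone
    }
}
```

The key field is unaffected by the emit order because it is identical for
every record in the group, while the order on field 1 is lost.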




Re: Semantic Properties and Functions with Iterables

2015-03-06 Thread Fabian Hueske
Hi Timo,

there are several restrictions on forwarded fields of operators with
iterator input:
1) Forwarded fields must be emitted in the order in which they are received
through the iterator.
2) All forwarded fields of a record must stick together, i.e., if your
function builds a record from field 0 of the 1st, 3rd, 5th, ... and field 1
of the 2nd, 4th, ... record coming through the iterator, these are not
valid forwarded fields.
3) It is OK to completely filter out records coming through the iterator.

The reason for these rules is that the optimizer uses forwarded fields to
reason about physical data properties such as order and grouping. If you
mix up the order of records or emit records that are composed from
different input records, you might destroy a (secondary) order or grouping.
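
One way to read the three rules together: the output, projected onto the
forwarded fields, has to be a subsequence of the input projected onto the
same fields. The following is a rough sketch of that reading in plain Java.
It is not something Flink provides; ForwardedFieldsRules, project and
respectsRules are hypothetical names, and records are modeled as long[].

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical checker for the rules above, not part of Flink.
final class ForwardedFieldsRules {

    // Keeps only the field positions declared as forwarded.
    static long[] project(long[] record, int[] forwardedFields) {
        long[] projected = new long[forwardedFields.length];
        for (int i = 0; i < forwardedFields.length; i++) {
            projected[i] = record[forwardedFields[i]];
        }
        return projected;
    }

    // True if the projected output is a subsequence of the projected input:
    // records may be dropped (rule 3), but not reordered (rule 1) and not
    // assembled from different input records (rule 2).
    static boolean respectsRules(List<long[]> input, List<long[]> output, int[] forwardedFields) {
        int in = 0;
        for (long[] out : output) {
            long[] wanted = project(out, forwardedFields);
            // Skip input records until one matches the emitted record.
            while (in < input.size()
                    && !Arrays.equals(project(input.get(in), forwardedFields), wanted)) {
                in++;
            }
            if (in == input.size()) {
                return false;
            }
            in++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<long[]> group = Arrays.asList(
                new long[] {1, 10, 100}, new long[] {1, 20, 200}, new long[] {1, 30, 300});
        int[] all = {0, 1, 2}; // corresponds to forwarding every field

        // Filtering out the second record is fine.
        System.out.println(respectsRules(group,
                Arrays.asList(group.get(0), group.get(2)), all)); // true
        // Emitting in a different order breaks rule 1.
        System.out.println(respectsRules(group,
                Arrays.asList(group.get(2), group.get(0)), all)); // false
        // Mixing fields of the 1st and 2nd record breaks rule 2.
        System.out.println(respectsRules(group,
                Arrays.asList(new long[] {1, 10, 200}), all));    // false
    }
}
```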

Considering these rules, your second example is correct as well.
In the case of the TriadBuilder, the information is also correct (in the
context of the program), because field 0 is used as the key. It is true,
however, that there is a strange dependency between the function and the
context in which it is used within the program. It would be better to remove
the class annotation and add this information through the
.withForwardedFields(0) method in the program, to make that clear.

It is very good that you raised this point.
This is currently not reflected in the documentation and should be made
clear very soon. I will open a JIRA for that.

Thanks, Fabian





Semantic Properties and Functions with Iterables

2015-03-06 Thread Timo Walther

Hey all,

I'm currently working a lot on the UDF static code analyzer, and I have
a general question about Semantic Properties which might also be
interesting for other users.


How is the ForwardedFields annotation interpreted for UDF functions with 
Iterables?


An example can be found in: 
org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder


Does this mean that each call of collect must happen in the same order
as the calls of next? But this is not the case in the example above.
Or does the annotation only refer to the first iterator element?


Other examples:

@ForwardedFields("*") // CORRECT?
public static class GroupReduce1 implements
        GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Emits only the first record of each group.
    @Override
    public void reduce(Iterable<Tuple2<Long, Long>> values,
            Collector<Tuple2<Long, Long>> out) throws Exception {
        out.collect(values.iterator().next());
    }
}

@ForwardedFields("*") // NOT CORRECT?
public static class GroupReduce3 implements
        GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Emits, in iteration order, only the records whose first field is 42.
    @Override
    public void reduce(Iterable<Tuple2<Long, Long>> values,
            Collector<Tuple2<Long, Long>> out) throws Exception {
        Iterator<Tuple2<Long, Long>> it = values.iterator();
        while (it.hasNext()) {
            Tuple2<Long, Long> t = it.next();
            if (t.f0 == 42) {
                out.collect(t);
            }
        }
    }
}

Thanks in advance.

Regards,
Timo