Re: Semantic Properties and Functions with Iterables
Any other thoughts on this?

On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen se...@apache.org wrote:
> I think the order in which elements are emitted is not part of the forwarded field properties, but would rather be a separate property that we do not have right now. At the moment, we would assume that all group operations destroy secondary orders. In that sense, forwarded fields in group operations only make sense for fields whose value is the same for all records in the group (key fields).
Re: Semantic Properties and Functions with Iterables
I added your comment and an answer to FLINK-1656:

Right, that's a good point. +1 for limiting to key fields. That's much easier to reason about for users. However, I am not sure how it is implemented right now. I guess secondary sort info is already removed by the property filtering, but I need to verify that.

2015-03-08 21:53 GMT+01:00 Stephan Ewen se...@apache.org:
> Any other thoughts on this?
Re: Semantic Properties and Functions with Iterables
I think the order in which elements are emitted is not part of the forwarded field properties, but would rather be a separate property that we do not have right now. At the moment, we would assume that all group operations destroy secondary orders. In that sense, forwarded fields in group operations only make sense for fields whose value is the same for all records in the group (key fields).

On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske fhue...@gmail.com wrote:
> Hi Timo,
>
> there are several restrictions for forwarded fields of operators with iterator input.
>
> 1) Forwarded fields must be emitted in the order in which they are received through the iterator.
> 2) All forwarded fields of a record must stick together, i.e., if your function builds records from field 0 of the 1st, 3rd, 5th, ... and field 1 of the 2nd, 4th, ... record coming through the iterator, these are not valid forwarded fields.
> 3) It is OK to completely filter out records coming through the iterator.
>
> The reason for these rules is that the optimizer uses forwarded fields to reason about physical data properties such as order and grouping. If you mix up the order of records or emit records that are composed from different input records, you might destroy a (secondary) order or grouping.
>
> Considering these rules, your second example is correct as well. In the case of the TriadBuilder, the information is correct as well (in the context of the program), because field 0 is used as the key. It is true, however, that there is a strange dependency between the function and the context in which it is used within the program. It would be better to remove the class annotation and add this information through the .withForwardedFields("0") method in the program to make that clear.
>
> It is very good that you raise this point. This is currently not reflected in the documentation and should be made clear very soon. I will open a JIRA for that.
>
> Thanks, Fabian
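The point made in the reply above, that forwarded fields in a group operation only make sense for key fields, can be sketched in plain Java. This is only an illustration under stated assumptions: the class and its methods are hypothetical and not part of Flink, records are modelled as `long[]` arrays, and field 0 is assumed to be the grouping key. The group function reverses its input, which is one way a group operation can destroy a secondary order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical plain-Java illustration, not Flink code. */
final class KeyFieldForwarding {

    /** A group function that destroys any secondary order by emitting the group in reverse. */
    static List<long[]> reverseGroup(List<long[]> group) {
        List<long[]> out = new ArrayList<>(group);
        Collections.reverse(out);
        return out;
    }

    /** True if field 'pos' has the same value in every record (as a key field does within a group). */
    static boolean constantField(List<long[]> records, int pos) {
        for (long[] r : records) {
            if (r[pos] != records.get(0)[pos]) {
                return false;
            }
        }
        return true;
    }

    /** True if the values of field 'pos' are in non-decreasing order. */
    static boolean sortedOnField(List<long[]> records, int pos) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[pos] > records.get(i)[pos]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One group: key (field 0) is 5, secondary order on field 1.
        List<long[]> group = Arrays.asList(
                new long[] {5, 1}, new long[] {5, 2}, new long[] {5, 3});
        List<long[]> out = reverseGroup(group);

        System.out.println("key field still constant: " + constantField(out, 0));
        System.out.println("secondary order on field 1 kept: " + sortedOnField(out, 1));
    }
}
```

Whatever the function does with the order of the records, field 0 is identical across the group, so declaring it as forwarded stays valid. Field 1 carries no such guarantee once the order is changed.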
Re: Semantic Properties and Functions with Iterables
Hi Timo,

there are several restrictions for forwarded fields of operators with iterator input.

1) Forwarded fields must be emitted in the order in which they are received through the iterator.
2) All forwarded fields of a record must stick together, i.e., if your function builds records from field 0 of the 1st, 3rd, 5th, ... and field 1 of the 2nd, 4th, ... record coming through the iterator, these are not valid forwarded fields.
3) It is OK to completely filter out records coming through the iterator.

The reason for these rules is that the optimizer uses forwarded fields to reason about physical data properties such as order and grouping. If you mix up the order of records or emit records that are composed from different input records, you might destroy a (secondary) order or grouping.

Considering these rules, your second example is correct as well. In the case of the TriadBuilder, the information is correct as well (in the context of the program), because field 0 is used as the key. It is true, however, that there is a strange dependency between the function and the context in which it is used within the program. It would be better to remove the class annotation and add this information through the .withForwardedFields("0") method in the program to make that clear.

It is very good that you raise this point. This is currently not reflected in the documentation and should be made clear very soon. I will open a JIRA for that.

Thanks, Fabian

2015-03-06 10:19 GMT+01:00 Timo Walther twal...@apache.org:
> Hey all,
>
> I'm currently working a lot on the UDF static code analyzer. But I have a general question about Semantic Properties which might also be interesting for other users.
>
> How is the ForwardedFields annotation interpreted for UDF functions with Iterables?
>
> An example can be found in: org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>
> Does this mean that each call of collect must happen in the same order as the calls of next? But this is not the case in the example above. Or does the annotation only refer to the first iterator element?
>
> Other examples:
>
>     import java.util.Iterator;
>
>     import org.apache.flink.api.common.functions.GroupReduceFunction;
>     import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.util.Collector;
>
>     // Emits only the first record of each group.
>     @ForwardedFields("*") // CORRECT?
>     public static class GroupReduce1 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>         @Override
>         public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>             out.collect(values.iterator().next());
>         }
>     }
>
>     // Emits, in iteration order, every record whose field 0 equals 42.
>     @ForwardedFields("*") // NOT CORRECT?
>     public static class GroupReduce3 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>         @Override
>         public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>             Iterator<Tuple2<Long, Long>> it = values.iterator();
>             while (it.hasNext()) {
>                 Tuple2<Long, Long> t = it.next();
>                 if (t.f0 == 42) {
>                     out.collect(t);
>                 }
>             }
>         }
>     }
>
> Thanks in advance.
>
> Regards, Timo
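The three rules from the reply above can be sketched as a small plain-Java check. It assumes that all fields are declared as forwarded ("*"), so an emitted record has to be an unchanged input record. The class and method names are hypothetical and not part of Flink, and the check only mirrors the rules as stated in this thread, not how the optimizer works internally.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: checks the three rules on plain lists. */
final class ForwardedFieldRules {

    /**
     * Returns true if the emitted records are unchanged input records (rule 2) that
     * appear in the same relative order as they were received (rule 1).
     * Dropping records is allowed (rule 3).
     */
    static boolean respectsRules(List<List<Long>> received, List<List<Long>> emitted) {
        int next = 0; // position in 'received' from which the next match may be taken
        for (List<Long> record : emitted) {
            // Skip over input records that were filtered out.
            while (next < received.size() && !received.get(next).equals(record)) {
                next++;
            }
            if (next == received.size()) {
                return false; // reordered, or assembled from several input records
            }
            next++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Long>> received = Arrays.asList(
                Arrays.asList(42L, 1L), Arrays.asList(7L, 2L), Arrays.asList(42L, 3L));

        // Filtering on field 0 == 42 keeps whole records in their original order.
        List<List<Long>> filtered = Arrays.asList(received.get(0), received.get(2));
        // Same records, but emitted in reverse order.
        List<List<Long>> reordered = Arrays.asList(received.get(2), received.get(0));
        // Field 0 taken from the 1st record, field 1 from the 2nd record.
        List<List<Long>> mixed = Arrays.asList(Arrays.asList(42L, 2L));

        System.out.println("filtered:  " + respectsRules(received, filtered));
        System.out.println("reordered: " + respectsRules(received, reordered));
        System.out.println("mixed:     " + respectsRules(received, mixed));
    }
}
```

Under these assumptions, a function that emits only the first record of a group and one that filters on field 0 both pass the check, which matches the assessment given for the two examples in the question.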
Semantic Properties and Functions with Iterables
Hey all,

I'm currently working a lot on the UDF static code analyzer. But I have a general question about Semantic Properties which might also be interesting for other users.

How is the ForwardedFields annotation interpreted for UDF functions with Iterables?

An example can be found in: org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder

Does this mean that each call of collect must happen in the same order as the calls of next? But this is not the case in the example above. Or does the annotation only refer to the first iterator element?

Other examples:

    import java.util.Iterator;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Emits only the first record of each group.
    @ForwardedFields("*") // CORRECT?
    public static class GroupReduce1 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
            out.collect(values.iterator().next());
        }
    }

    // Emits, in iteration order, every record whose field 0 equals 42.
    @ForwardedFields("*") // NOT CORRECT?
    public static class GroupReduce3 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
            Iterator<Tuple2<Long, Long>> it = values.iterator();
            while (it.hasNext()) {
                Tuple2<Long, Long> t = it.next();
                if (t.f0 == 42) {
                    out.collect(t);
                }
            }
        }
    }

Thanks in advance.

Regards, Timo