[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399929

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 /**
 * Constructs a PartitionDoFn.
 *
 * @throws IllegalArgumentException if {@code numPartitions <= 0}
 */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(
+int numPartitions,
+Contextful> ctxFn,
+Object originalFnForDisplayData) {

Review comment:
Done, thanks.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399738

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;

Review comment:
Done, thanks.
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399634

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -85,7 +141,14 @@
 * @throws IllegalArgumentException if {@code numPartitions <= 0}
 */
 public static Partition of(int numPartitions, PartitionFn partitionFn) {
-return new Partition<>(new PartitionDoFn(numPartitions, partitionFn));
+
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions),
+Requirements.empty());
+Object aClass = partitionFn;

Review comment:
Hi, I don't follow your suggestion here. I will need to wrap the interface function in Contextful.Fn. Can you elaborate, please?
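To make the wrapping step in the hunk above concrete, here is a minimal, dependency-free sketch of the same adapter idea. It assumes nothing about Beam beyond what the diff shows: `PlainPartitionFn`, `Context`, `ContextualFn`, and `wrap` are hypothetical stand-ins invented for illustration, not the real `PartitionFn`, `Contextful.Fn.Context`, or `Contextful.fn`.

```java
// Stand-in types for illustration only; these are NOT the Beam classes.
final class PartitionWrapSketch {

  /** Hypothetical analogue of a partition function that takes no context. */
  interface PlainPartitionFn<T> {
    int partitionFor(T elem, int numPartitions);
  }

  /** Hypothetical placeholder for whatever a context would expose. */
  interface Context {}

  /** Hypothetical analogue of a function that also receives a context. */
  interface ContextualFn<T> {
    int apply(T elem, Context c);
  }

  /** Adapts a plain function to the contextual shape; the context is ignored. */
  static <T> ContextualFn<T> wrap(PlainPartitionFn<T> fn, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0");
    }
    // numPartitions is captured here, so the wrapped function only needs the element.
    return (elem, c) -> fn.partitionFor(elem, numPartitions);
  }

  public static void main(String[] args) {
    PlainPartitionFn<Integer> byModulo = (elem, n) -> Math.floorMod(elem, n);
    ContextualFn<Integer> wrapped = wrap(byModulo, 3);
    System.out.println(wrapped.apply(7, new Context() {})); // prints 1
  }
}
```

With an adapter of this shape, both overloads can hand the same kind of function to a single internal implementation, which appears to be what the diff is aiming for.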
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399671

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -76,6 +93,45 @@
 int partitionFor(T elem, int numPartitions);
 }
+
+ /**
+ * A function object that chooses an output partition for an element.
+ *
+ * @param <T> the type of the elements being partitioned
+ */
+ public interface PartitionWithSideInputsFn extends Serializable {
+/**
+ * Chooses the partition into which to put the given element.
+ *
+ * @param elem the element to be partitioned
+ * @param numPartitions the total number of partitions ({@code >= 1})
+ * @param c the {@link Contextful.Fn.Context} needed to access sideInputs.
+ * @return index of the selected partition (in the range {@code [0..numPartitions-1]})
+ */
+int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c);
+ }
+
+ /**
+ * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection}
+ * into the given number of partitions, using the given partitioning function.
+ *
+ * @param numPartitions the number of partitions to divide the input {@code PCollection} into
+ * @param partitionFn the function to invoke on each element to choose its output partition
+ * @param requirements the {@link Requirements} needed to run it.
+ * @throws IllegalArgumentException if {@code numPartitions <= 0}
+ */
+ public static Partition of(
+ int numPartitions,
+ PartitionWithSideInputsFn partitionFn,
+ Requirements requirements) {
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions, c),
+requirements);
+Object aClass = partitionFn;

Review comment:
Hi, I don't follow your suggestion here. I will need to wrap the interface function in Contextful.Fn. Can you elaborate, please?
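The Javadoc in this hunk states a contract: the function receives a context, must return an index in `[0..numPartitions-1]`, and `numPartitions <= 0` is rejected with `IllegalArgumentException`. The sketch below illustrates that contract without any Beam dependency. `Context`, `threshold()`, `PartitionWithContextFn`, and `choosePartition` are hypothetical names made up for this example, and throwing `IndexOutOfBoundsException` for an out-of-range index is this sketch's own choice, not something the hunk specifies.

```java
// Stand-in types for illustration only; these are NOT the Beam classes.
final class SideInputPartitionSketch {

  /** Hypothetical context exposing a single side-input-like value. */
  interface Context {
    int threshold();
  }

  /** Hypothetical analogue of a partition function that can read the context. */
  interface PartitionWithContextFn<T> {
    int partitionFor(T elem, int numPartitions, Context c);
  }

  /** Invokes fn and rejects any index outside [0, numPartitions - 1]. */
  static <T> int choosePartition(
      PartitionWithContextFn<T> fn, T elem, int numPartitions, Context c) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0");
    }
    int index = fn.partitionFor(elem, numPartitions, c);
    if (index < 0 || index >= numPartitions) {
      // Assumption of this sketch: treat an out-of-range index as an error.
      throw new IndexOutOfBoundsException("partition index out of range: " + index);
    }
    return index;
  }

  public static void main(String[] args) {
    // Elements at or above the context-supplied threshold go to partition 1, others to 0.
    PartitionWithContextFn<Integer> aboveOrBelow =
        (elem, n, c) -> elem >= c.threshold() ? 1 : 0;
    Context ctx = () -> 50;
    System.out.println(choosePartition(aboveOrBelow, 72, 2, ctx)); // prints 1
  }
}
```

The point of passing the context per call, rather than capturing a value when the function is built, is that the value can come from data that only exists at execution time.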
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399770

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 /**
 * Constructs a PartitionDoFn.
 *
 * @throws IllegalArgumentException if {@code numPartitions <= 0}
 */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(

Review comment:
Done, thanks.

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ##

@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;

Review comment:
Done, thanks.