[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399929



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 
 /**
  * Constructs a PartitionDoFn.
  *
  * @throws IllegalArgumentException if {@code numPartitions <= 0}
  */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(
+int numPartitions,
+Contextful> ctxFn,
+Object originalFnForDisplayData) {

Review comment:
   Done, thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399738



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;

Review comment:
   Done, thanks.











darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399634



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -85,7 +141,14 @@
* @throws IllegalArgumentException if {@code numPartitions <= 0}
*/
   public static  Partition of(int numPartitions, PartitionFn partitionFn) {
-return new Partition<>(new PartitionDoFn(numPartitions, partitionFn));
+
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions),
+Requirements.empty());
+Object aClass = partitionFn;

Review comment:
   Hi, I don't get your suggestion here. I will need to wrap the interface
function in Contextful.Fn. Can you elaborate, please?
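
For readers following the thread, the wrapping mentioned in this comment can be sketched without Beam on the classpath. The types `Context` and `ContextFn` below are hypothetical stand-ins for `Contextful.Fn.Context` and `Contextful.Fn`; only the `partitionFor(elem, numPartitions)` shape is taken from the diff. This illustrates the adapter idea under those assumptions and is not the code in the pull request.

```java
import java.io.Serializable;

public class PartitionAdapterSketch {
  /** Mirrors the shape of PartitionFn shown in the diff. */
  public interface PartitionFn<T> extends Serializable {
    int partitionFor(T elem, int numPartitions);
  }

  /** Hypothetical stand-in for Contextful.Fn.Context; carries nothing here. */
  public interface Context {}

  /** Hypothetical stand-in for Contextful.Fn: maps (element, context) to a result. */
  public interface ContextFn<T, R> extends Serializable {
    R apply(T element, Context c);
  }

  /**
   * Adapts a context-free PartitionFn to the context-aware shape.
   * The context argument is accepted but ignored, and numPartitions is
   * captured by the lambda.
   */
  public static <T> ContextFn<T, Integer> wrap(PartitionFn<T> fn, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0");
    }
    return (element, c) -> fn.partitionFor(element, numPartitions);
  }

  public static void main(String[] args) {
    PartitionFn<String> byLength = (s, n) -> s.length() % n;
    ContextFn<String, Integer> wrapped = wrap(byLength, 3);
    // "beam" has length 4, and 4 % 3 == 1.
    System.out.println(wrapped.apply("beam", new Context() {}));
  }
}
```

With this shape, a single downstream consumer only needs to understand the context-aware function, which appears to be the motivation for the wrapping in the diff.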











darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399671



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -76,6 +93,45 @@
 int partitionFor(T elem, int numPartitions);
   }
 
+  /**
+   * A function object that chooses an output partition for an element.
+   *
+   * @param  the type of the elements being partitioned
+   */
+  public interface PartitionWithSideInputsFn extends Serializable {
+/**
+ * Chooses the partition into which to put the given element.
+ *
+ * @param elem the element to be partitioned
+ * @param numPartitions the total number of partitions ({@code >= 1})
+ * @param c the {@link Contextful.Fn.Context} needed to access sideInputs.
+ * @return index of the selected partition (in the range {@code [0..numPartitions-1]})
+ */
+int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c);
+  }
+
+  /**
+   * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection}
+   * into the given number of partitions, using the given partitioning function.
+   *
+   * @param numPartitions the number of partitions to divide the input {@code PCollection} into
+   * @param partitionFn the function to invoke on each element to choose its output partition
+   * @param requirements the {@link Requirements} needed to run it.
+   * @throws IllegalArgumentException if {@code numPartitions <= 0}
+   */
+  public static  Partition of(
+  int numPartitions,
+  PartitionWithSideInputsFn partitionFn,
+  Requirements requirements) {
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions, c),
+requirements);
+Object aClass = partitionFn;

Review comment:
   Hi, I don't get your suggestion here. I will need to wrap the interface
function in Contextful.Fn. Can you elaborate, please?
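
As a rough illustration of what the new three-argument `partitionFor` allows, here is a Beam-free sketch. The `Context` interface with a name-based `sideInput` lookup is a hypothetical simplification of `Contextful.Fn.Context`, and `choosePartition` is an invented helper that applies the `[0..numPartitions-1]` range stated in the Javadoc above. None of these names come from the pull request.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

public class SideInputPartitionSketch {
  /** Hypothetical, simplified context: side inputs are ints looked up by name. */
  public interface Context {
    int sideInput(String name);
  }

  /** Mirrors the shape of PartitionWithSideInputsFn from the diff, using the stand-in Context. */
  public interface PartitionWithSideInputsFn<T> extends Serializable {
    int partitionFor(T elem, int numPartitions, Context c);
  }

  /** Invented helper: calls the function and checks the documented result range. */
  public static <T> int choosePartition(
      PartitionWithSideInputsFn<T> fn, T elem, int numPartitions, Context c) {
    int partition = fn.partitionFor(elem, numPartitions, c);
    if (partition < 0 || partition >= numPartitions) {
      throw new IndexOutOfBoundsException(
          "partition " + partition + " is outside [0.." + (numPartitions - 1) + "]");
    }
    return partition;
  }

  public static void main(String[] args) {
    Map<String, Integer> sideInputs = Collections.singletonMap("cutoff", 50);
    Context context = sideInputs::get;

    // Scores below the side-input cutoff go to partition 0, the rest to partition 1.
    PartitionWithSideInputsFn<Integer> byCutoff =
        (score, n, ctx) -> score < ctx.sideInput("cutoff") ? 0 : 1;

    System.out.println(choosePartition(byCutoff, 30, 2, context)); // prints 0
    System.out.println(choosePartition(byCutoff, 80, 2, context)); // prints 1
  }
}
```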











darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399770



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 
 /**
  * Constructs a PartitionDoFn.
  *
  * @throws IllegalArgumentException if {@code numPartitions <= 0}
  */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(

Review comment:
   Done, thanks.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;

Review comment:
   Done, thanks.




