[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=152156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152156
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 08/Oct/18 08:12
Start Date: 08/Oct/18 08:12
Worklog Time Spent: 10m 
  Work Description: reuvenlax closed pull request #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 865b2488608..b9e86649124 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -322,8 +322,7 @@ public void registerCoderForType(TypeDescriptor type, Coder coder) {
 if (paramCoderOrNull != null) {
   return paramCoderOrNull;
 } else {
-  throw new CannotProvideCoderException(
-  "Cannot infer coder for type parameter " + param.getName());
+  throw new CannotProvideCoderException("Cannot infer coder for type parameter " + param);
 }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 7e677a51531..bcb87adfcee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -29,6 +29,7 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -116,7 +117,35 @@ public Schema getSchema() {
 
   @Override
   public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {}
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+    verifyDeterministic(schema);
+  }
+
+  private void verifyDeterministic(Schema schema)
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+    for (Field field : schema.getFields()) {
+      verifyDeterministic(field.getType());
+    }
+  }
+
+  private void verifyDeterministic(FieldType fieldType)
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+    switch (fieldType.getTypeName()) {
+      case MAP:
+        throw new NonDeterministicException(
+            this,
+            "Map-valued fields cannot be used in keys as Beam requires deterministic encoding for"
+                + " keys.");
+      case ROW:
+        verifyDeterministic(fieldType.getRowSchema());
+        break;
+      case ARRAY:
+        verifyDeterministic(fieldType.getCollectionElementType());
+        break;
+      default:
+        break;
+    }
+  }
 
   @Override
   public boolean consistentWithEquals() {
@@ -124,8 +153,20 @@ public boolean consistentWithEquals() {
   }
 
   /** Returns the coder used for a given primitive type. */
-  public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
-    return (Coder<T>) CODER_MAP.get(typeName);
+  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case ROW:
+        return (Coder<T>) RowCoder.of(fieldType.getRowSchema());
+      case ARRAY:
+        return (Coder<T>) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
+      case MAP:
+        return (Coder<T>)
+            MapCoder.of(
+                coderForFieldType(fieldType.getMapKeyType()),
+                coderForFieldType(fieldType.getMapValueType()));
+      default:
+        return (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
+    }
   }
 
   /** Return the estimated serialized size of a given row object. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
index 252ea5e1831..39eddb96ba5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
@@ -102,7 +102,7 @@ private static FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {
     return FieldType.array(fieldTypeForJavaType(TypeDescriptor.of(params[0])));
   }
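
The recursive determinism check added to RowCoder in the diff above can be illustrated with a small standalone sketch in plain Java. It does not use Beam: `Kind`, `FieldType`, and `isDeterministic` are hypothetical stand-ins for Beam's `TypeName`, `Schema.FieldType`, and `verifyDeterministic`, and the sketch returns a boolean where the real method throws `NonDeterministicException`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch; every type here is a hypothetical stand-in, not a Beam class.
final class DeterminismCheckSketch {
  enum Kind { STRING, INT64, ARRAY, MAP, ROW }

  static final class FieldType {
    final Kind kind;
    // Element type for ARRAY, key and value types for MAP, field types for ROW.
    final List<FieldType> children;

    private FieldType(Kind kind, List<FieldType> children) {
      this.kind = kind;
      this.children = children;
    }

    static FieldType primitive(Kind kind) {
      return new FieldType(kind, Collections.<FieldType>emptyList());
    }

    static FieldType array(FieldType element) {
      return new FieldType(Kind.ARRAY, Collections.singletonList(element));
    }

    static FieldType map(FieldType key, FieldType value) {
      return new FieldType(Kind.MAP, Arrays.asList(key, value));
    }

    static FieldType row(FieldType... fields) {
      return new FieldType(Kind.ROW, Arrays.asList(fields));
    }
  }

  // Same shape as the diff: a MAP anywhere in the type tree is rejected,
  // ARRAY and ROW recurse into their children, everything else passes.
  static boolean isDeterministic(FieldType type) {
    switch (type.kind) {
      case MAP:
        return false;
      case ARRAY:
      case ROW:
        for (FieldType child : type.children) {
          if (!isDeterministic(child)) {
            return false;
          }
        }
        return true;
      default:
        return true;
    }
  }

  public static void main(String[] args) {
    FieldType string = FieldType.primitive(Kind.STRING);
    FieldType int64 = FieldType.primitive(Kind.INT64);
    FieldType flat = FieldType.row(string, int64);
    FieldType nested = FieldType.row(string, FieldType.array(FieldType.map(string, int64)));
    System.out.println(isDeterministic(flat)); // true
    System.out.println(isDeterministic(nested)); // false
  }
}
```

The point of the recursion is that a map hidden inside an array or a nested row is caught as well, not only a map at the top level of the schema.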
 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=152142&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152142
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 08/Oct/18 06:06
Start Date: 08/Oct/18 06:06
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r223254160
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection<UserPurchase>} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal PCollection<KV<Row, Iterable<UserPurchase>>>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection<KV<Row, Row>>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
 
 Review comment:
   This is not a DataFrames API here. I think we should have one, but it might 
make more sense as a layer on top of Calcite.
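
For readers following the thread without the Beam SDK at hand, the grouping and aggregation semantics described in the quoted Javadoc can be approximated in plain Java. This is only a sketch of what "group by userId and country, then sum cost" computes, not how Group is implemented: the class and method names are hypothetical, and a `List<String>` stands in for the `Row` key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of the semantics only; all names here are hypothetical.
final class GroupSemanticsSketch {
  static final class UserPurchase {
    final String userId;
    final String country;
    final long cost;

    UserPurchase(String userId, String country, long cost) {
      this.userId = userId;
      this.country = country;
      this.cost = cost;
    }
  }

  // Builds the grouping key from the two key fields (a Row in the real transform).
  static List<String> keyOf(UserPurchase purchase) {
    return Arrays.asList(purchase.userId, purchase.country);
  }

  // Analogue of grouping without a combiner: key -> all matching purchases.
  static Map<List<String>, List<UserPurchase>> groupByUserAndCountry(
      List<UserPurchase> purchases) {
    return purchases.stream().collect(Collectors.groupingBy(GroupSemanticsSketch::keyOf));
  }

  // Analogue of aggregating "cost" with a sum: key -> total cost.
  static Map<List<String>, Long> totalCostByUserAndCountry(List<UserPurchase> purchases) {
    return purchases
        .stream()
        .collect(
            Collectors.groupingBy(
                GroupSemanticsSketch::keyOf,
                Collectors.summingLong((UserPurchase p) -> p.cost)));
  }

  public static void main(String[] args) {
    List<UserPurchase> purchases =
        Arrays.asList(
            new UserPurchase("u1", "US", 10L),
            new UserPurchase("u1", "US", 20L),
            new UserPurchase("u2", "DE", 5L));
    System.out.println(totalCostByUserAndCountry(purchases));
  }
}
```

The difference from a DataFrame-style API is that the key fields and each aggregation are declared up front and the result is again a keyed collection with its own schema.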


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 152142)
Time Spent: 12h 50m  (was: 12h 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 12h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=152101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152101
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 07/Oct/18 20:34
Start Date: 07/Oct/18 20:34
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r223226864
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
 ##
 @@ -0,0 +1,317 @@
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineFns;
+import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
+import org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+
+@Experimental(Kind.SCHEMAS)
+class SchemaAggregateFn {
 
 Review comment:
   done




Issue Time Tracking
---

Worklog Id: (was: 152101)
Time Spent: 12h 40m  (was: 12.5h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=151692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151692
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Oct/18 16:59
Start Date: 05/Oct/18 16:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r223074059
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
 ##
 @@ -0,0 +1,317 @@
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineFns;
+import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
+import org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+
+@Experimental(Kind.SCHEMAS)
+class SchemaAggregateFn {
 
 Review comment:
   This is non-trivial, so I think this needs some minimal javadoc even if it 
is package private




Issue Time Tracking
---

Worklog Id: (was: 151692)
Time Spent: 12h 20m  (was: 12h 10m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=151694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151694
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Oct/18 16:59
Start Date: 05/Oct/18 16:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r223075185
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##
 @@ -0,0 +1,646 @@
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Test for {@link Group}. */
+public class GroupTest implements Serializable {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** A simple POJO for testing. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO implements Serializable {
+    public String field1;
+    public long field2;
+    public String field3;
+
+    public POJO(String field1, long field2, String field3) {
+      this.field1 = field1;
+      this.field2 = field2;
+      this.field3 = field3;
+    }
+
+    public POJO() {}
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO pojo = (POJO) o;
+      return field2 == pojo.field2
+          && Objects.equals(field1, pojo.field1)
+          && Objects.equals(field3, pojo.field3);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(field1, field2, field3);
+    }
+
+    @Override
+    public String toString() {
+      return "POJO{"
+          + "field1='"
+          + field1
+          + '\''
+          + ", field2="
+          + field2
+          + ", field3='"
+          + field3
+          + '\''
+          + '}';
+    }
+  }
+
+  private static final Schema POJO_SCHEMA =
+      Schema.builder()
+          .addStringField("field1")
+          .addInt64Field("field2")
+          .addStringField("field3")
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByOneField() {
+    PCollection<KV<Row, Iterable<POJO>>> grouped =
+        pipeline
+            .apply(
+                Create.of(
+                    new POJO("key1", 1, "value1"),
+                    new POJO("key1", 2, "value2"),
+ 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=151693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151693
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Oct/18 16:59
Start Date: 05/Oct/18 16:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r223074665
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
 ##
 @@ -330,6 +330,24 @@ public Schema getSchema() {
     return ((SchemaCoder) getCoder()).getSchema();
   }
 
+  /** Returns the attached schema's toRowFunction. */
+  @Experimental(Kind.SCHEMAS)
+  public SerializableFunction<T, Row> getToRowFunction() {
 
 Review comment:
   I think this doesn't belong on `PCollection`, they're not a property of it. 
I'd rather have something like `T getCoder(Class)` overload to help with 
casting
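
The kind of typed accessor suggested in this comment could look roughly like the sketch below. It is an assumption about the reviewer's intent, written against hypothetical stand-in types (`Coder`, `SchemaCoder`, `StringCoder`, `SketchCollection`) and not against Beam's real `PCollection` or coder classes.

```java
// Hypothetical stand-ins; none of these are Beam's real classes.
final class TypedCoderAccessSketch {
  interface Coder {}

  static final class StringCoder implements Coder {}

  static final class SchemaCoder implements Coder {
    final String schemaName;

    SchemaCoder(String schemaName) {
      this.schemaName = schemaName;
    }
  }

  static final class SketchCollection {
    private final Coder coder;

    SketchCollection(Coder coder) {
      this.coder = coder;
    }

    // Untyped accessor: callers must cast the result themselves.
    Coder getCoder() {
      return coder;
    }

    // Typed overload: the caller names the coder class it expects, so the cast
    // (and any ClassCastException) happens in exactly one place.
    <C extends Coder> C getCoder(Class<C> coderClass) {
      return coderClass.cast(coder);
    }
  }

  public static void main(String[] args) {
    SketchCollection withSchema = new SketchCollection(new SchemaCoder("UserPurchase"));
    SchemaCoder schemaCoder = withSchema.getCoder(SchemaCoder.class);
    System.out.println(schemaCoder.schemaName); // UserPurchase
  }
}
```

With an overload like this, schema-specific helpers can stay on the schema coder itself, and callers reach them through one checked cast.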




Issue Time Tracking
---

Worklog Id: (was: 151693)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-10-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=151124&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151124
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 04/Oct/18 10:53
Start Date: 04/Oct/18 10:53
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-426974021
 
 
   @akedin any more comments?




Issue Time Tracking
---

Worklog Id: (was: 151124)
Time Spent: 12h 10m  (was: 12h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149440
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:44
Start Date: 28/Sep/18 19:44
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-425545386
 
 
   @akedin comments addressed




Issue Time Tracking
---

Worklog Id: (was: 149440)
Time Spent: 12h  (was: 11h 50m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149439
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:43
Start Date: 28/Sep/18 19:43
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r221361758
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection<UserPurchase>} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal PCollection<KV<Row, Iterable<UserPurchase>>>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection<KV<Row, Row>>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *    Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149437&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149437
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:42
Start Date: 28/Sep/18 19:42
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r221361482
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *      Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149435
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:41
Start Date: 28/Sep/18 19:41
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r221361309
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *      Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149434=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149434
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:40
Start Date: 28/Sep/18 19:40
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r221361037
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
 
 Review comment:
   This is closer to the Dataframes model, which I think is what we should be 
aiming at for a programmatic approach. It's also what fits in with Beam, where 
aggregations and groupbykey are intimately related. I don't think these 
transforms have to match SQL - we already have SQL for that scenario.
   
   The output schema is specified by the aggregateField calls (that's why you 
have to specify a field name in those calls). 
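
To illustrate the point about named outputs, here is a minimal sketch in plain Java. It is not Beam code and does not use the Group API from this pull request. `GroupAggregateSketch`, `Purchase`, and the output name `max_cost` are hypothetical names chosen for illustration; only `total_cost` comes from the Javadoc example. The sketch groups by (userId, country) and stores each aggregate under a caller-chosen field name, which is roughly how each aggregateField call contributes one named field to the output schema.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAggregateSketch {
  /** Hypothetical stand-in for the UserPurchase type in the Javadoc. */
  public static final class Purchase {
    final String userId;
    final String country;
    final long cost;

    public Purchase(String userId, String country, long cost) {
      this.userId = userId;
      this.country = country;
      this.cost = cost;
    }
  }

  /**
   * Groups purchases by (userId, country). Each group maps to one output
   * "row", modelled here as a map from output field name to value.
   */
  public static Map<List<String>, Map<String, Long>> aggregate(List<Purchase> purchases) {
    Map<List<String>, Map<String, Long>> out = new LinkedHashMap<>();
    for (Purchase p : purchases) {
      // The grouping key is extracted from the element, not supplied by the caller.
      List<String> key = Arrays.asList(p.userId, p.country);
      Map<String, Long> row = out.computeIfAbsent(key, k -> new LinkedHashMap<>());
      // One named output field per aggregation over the "cost" input field.
      row.merge("total_cost", p.cost, Long::sum);
      row.merge("max_cost", p.cost, Long::max);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Purchase> purchases = Arrays.asList(
        new Purchase("u1", "US", 5L),
        new Purchase("u1", "US", 7L),
        new Purchase("u2", "DE", 3L));
    System.out.println(aggregate(purchases));
  }
}
```

The set of names passed to the aggregations fully determines the shape of each output row, so no separate output schema declaration is needed in this sketch.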


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 149434)
Time Spent: 11h 20m  (was: 11h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=149334=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149334
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 28/Sep/18 19:36
Start Date: 28/Sep/18 19:36
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r221360212
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
 
 Review comment:
   Removed it, let's see if Javadoc passes.




Issue Time Tracking
---

Worklog Id: (was: 149334)
Time Spent: 11h 10m  (was: 11h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 11h 10m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=147561=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147561
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 25/Sep/18 13:37
Start Date: 25/Sep/18 13:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r220186019
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
 ##
 @@ -116,16 +117,53 @@ public Schema getSchema() {
 
   @Override
   public void verifyDeterministic()
-  throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {}
+  throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+verifyDeterministic(schema);
+  }
+
+  private void verifyDeterministic(Schema schema)
+  throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+for (Field field : schema.getFields()) {
+  verifyDeterministic(field.getType());
+}
+  }
+
+  private void verifyDeterministic(FieldType fieldType)
+  throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+switch (fieldType.getTypeName()) {
+  case MAP:
+throw new NonDeterministicException(this, "Map-valued fields cannot be used in keys");
 
 Review comment:
   Why is this? Can you explain more in the message?
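
One likely reason, sketched here as an assumption rather than as the author's stated rationale: grouping compares keys by their encoded bytes, and a map's entries have no guaranteed order, so two equal maps can encode differently. The example below is plain Java with a deliberately naive encoder; `MapEncodingSketch` and `encode` are hypothetical names and this is not how RowCoder or any Beam coder is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapEncodingSketch {
  /** Naive encoder that writes entries in the map's iteration order. */
  public static byte[] encode(Map<String, Long> map) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Long> e : map.entrySet()) {
      sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Same entries, inserted in a different order.
    Map<String, Long> a = new LinkedHashMap<>();
    a.put("x", 1L);
    a.put("y", 2L);
    Map<String, Long> b = new LinkedHashMap<>();
    b.put("y", 2L);
    b.put("x", 1L);

    // Map equality ignores order, but the encoded bytes do not.
    System.out.println("equal as maps:  " + a.equals(b));
    System.out.println("equal as bytes: " + Arrays.equals(encode(a), encode(b)));
  }
}
```

Here the two maps are equal but their encodings differ, so a grouping step that compares encoded keys would place them in different groups. An exception message that names this cause would answer the question above.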




Issue Time Tracking
---

Worklog Id: (was: 147561)
Time Spent: 10h 40m  (was: 10.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=147565=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147565
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 25/Sep/18 13:37
Start Date: 25/Sep/18 13:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r220193345
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *      Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=147563=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147563
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 25/Sep/18 13:37
Start Date: 25/Sep/18 13:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r220192805
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *      Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=147564=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147564
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 25/Sep/18 13:37
Start Date: 25/Sep/18 13:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r220193157
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection<UserPurchase>} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
+ *  .aggregateField("cost", Top.largestLongsFn(10), "top_purchases")
+ *  .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+ *      Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+ * }
+ *
+ * The result will be a new row schema containing the fields total_cost, top_purchases, and
+ * transactionDurations, containing the sum of all purchase costs (for that user and country), the
+ * top ten purchases, and a histogram of transaction durations.
+ *
+ * Note that usually the field type can be automatically inferred from the {@link CombineFn}
+ * passed in. However, sometimes it cannot be inferred, due to Java type erasure, in which case a
+ * {@link Field} object containing the field type must be passed in. This is currently the case for
+ * ApproximateQuantilesCombineFn in the above example.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Group {
+  /**
+   * Returns a transform that groups all elements in the input {@link PCollection}. The returned
+   * transform contains further builder methods to control how the grouping is done.
+   */
+  public static  Global 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=147562=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147562
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 25/Sep/18 13:37
Start Date: 25/Sep/18 13:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6298: 
[BEAM-4461] Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#discussion_r220191831
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 ##
 @@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A generic grouping transform for schema {@link PCollection}s.
+ *
+ * When used without a combiner, this transform simply acts as a {@link GroupByKey} but without
+ * the need for the user to explicitly extract the keys. For example, consider the following input
+ * type:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * public class UserPurchase {
+ *   public String userId;
+ *   public String country;
+ *   public long cost;
+ *   public double transactionDuration;
+ * }
+ *
+ * {@literal PCollection<UserPurchase>} purchases = readUserPurchases();
+ * }
+ *
+ * You can group all purchases by user and country as follows:
+ *
+ * {@code
+ * {@literal @DefaultSchema(JavaFieldSchema.class)}
+ * {@literal PCollection>} byUser =
+ *   purchases.apply(Group.byFieldNames("userId", "country"));
+ * }
+ *
+ * However, often an aggregation of some form is desired. The builder methods inside the Group
+ * class allow building up separate aggregations for every field (or set of fields) on the input
+ * schema, and generating an output schema based on these aggregations. For example:
+ *
+ * {@code
+ * {@literal PCollection>} aggregated = purchases
+ *  .apply(Group.byFieldNames("userId", "country")
+ *  .aggregateField("cost", Sum.ofLongs(), "total_cost")
 
 Review comment:
   I think the syntax is a bit confusing. It mixes up SQL-like `GROUP BY` with 
Beam's `GroupByKey`, so for me it takes a second to figure out what this 
translates to. E.g. if I read this coming from SQL, it's unclear why I am 
grouping by `userId` and `country` but then aggregating only `cost`, but not 
grouping by `cost`.
   
   It's also unclear what the output schema here is. Is it only the fields in 
the `Group.`? How does it work with `Select`?
   
   Would it be better to follow a SQL-like approach? I.e. add aggregations in 
`Select` and use `Group` to only specify the keys?
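
For readers coming from SQL, the call under discussion can be read roughly as `SELECT userId, country, SUM(cost) AS total_cost ... GROUP BY userId, country`. The sketch below shows that reading with plain Java collections. It does not use the Beam API, and the `GroupSketch`, `Purchase`, and `totalCostByKey` names are hypothetical, introduced only for illustration. It also assumes the output holds just the grouping key and the aggregate, which is one of the open questions in the comment above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical illustration only; not part of Beam. */
class GroupSketch {
  /** Stand-in for the UserPurchase example type from the Javadoc. */
  static final class Purchase {
    final String userId;
    final String country;
    final long cost;

    Purchase(String userId, String country, long cost) {
      this.userId = userId;
      this.country = country;
      this.cost = cost;
    }
  }

  /** Groups by (userId, country) and sums cost per group, like GROUP BY with SUM(cost). */
  static Map<List<String>, Long> totalCostByKey(List<Purchase> purchases) {
    return purchases.stream()
        .collect(
            Collectors.groupingBy(
                p -> Arrays.asList(p.userId, p.country), // the grouping key fields
                Collectors.summingLong(p -> p.cost))); // the aggregated field
  }

  public static void main(String[] args) {
    List<Purchase> purchases =
        Arrays.asList(
            new Purchase("alice", "US", 10),
            new Purchase("alice", "US", 5),
            new Purchase("bob", "DE", 7));
    System.out.println(totalCostByKey(purchases));
  }
}
```

In this reading `cost` is aggregated rather than grouped on, which is why it appears only in `aggregateField` and not in `byFieldNames`.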


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 147562)
Time Spent: 10h 50m  (was: 10h 40m)

> Create a library of useful transforms that use schemas
> --
>
> 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=145187=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145187
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 18/Sep/18 03:13
Start Date: 18/Sep/18 03:13
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-422239865
 
 
   @akedin  @apilloud most of the files are now reverted, as they are submitted 
in other PRs. There's a little more I can split off into separate PRs, if you 
think necessary.




Issue Time Tracking
---

Worklog Id: (was: 145187)
Time Spent: 10.5h  (was: 10h 20m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=145181=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145181
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 18/Sep/18 02:34
Start Date: 18/Sep/18 02:34
Worklog Time Spent: 10m 
  Work Description: reuvenlax closed pull request #6316: [BEAM-4461] Add 
Unnest transform.
URL: https://github.com/apache/beam/pull/6316
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 43d7a0f813d..86a0f4653d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -167,6 +167,10 @@ public Schema(List fields) {
 this.fields = fields;
 int index = 0;
 for (Field field : fields) {
+  if (fieldIndices.get(field.getName()) != null) {
+throw new IllegalArgumentException(
+"Duplicate field " + field.getName() + " added to schema");
+  }
   fieldIndices.put(field.getName(), index++);
 }
 this.hashCode = Objects.hash(fieldIndices, fields);
@@ -528,17 +532,18 @@ public int hashCode() {
 
 public abstract Builder toBuilder();
 
+/** Builder for {@link Field}. */
 @AutoValue.Builder
-abstract static class Builder {
-  abstract Builder setName(String name);
+public abstract static class Builder {
+  public abstract Builder setName(String name);
 
-  abstract Builder setDescription(String description);
+  public abstract Builder setDescription(String description);
 
-  abstract Builder setType(FieldType fieldType);
+  public abstract Builder setType(FieldType fieldType);
 
-  abstract Builder setNullable(Boolean nullable);
+  public abstract Builder setNullable(Boolean nullable);
 
-  abstract Field build();
+  public abstract Field build();
 }
 
 /** Return's a field with the give name and type. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
new file mode 100644
index 000..9c3381ef4e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nested schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generate the unnested name, however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=145042=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145042
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 17/Sep/18 19:49
Start Date: 17/Sep/18 19:49
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r218202066
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nested schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generate the unnested name, however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder().setFieldNameFunction(CONCAT_FIELD_NAMES).build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concatenated with . characters.
 
 Review comment:
   replace `"."` with `"_"` in the comment
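
The naming policy being reviewed can be illustrated with a small stand-alone sketch. It is not the Beam implementation: nested rows are modelled here as nested `Map<String, Object>` values, and `UnnestSketch`, `unnest`, and `CONCAT_WITH_UNDERSCORE` are hypothetical names. The `_` separator is an assumption taken from the review comment above, which suggests the code joins names with `_` even though the Javadoc says `.`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration only; nested maps stand in for nested rows. */
class UnnestSketch {
  /** Joins the path to a field with underscores: [location, latitude] -> location_latitude. */
  static final Function<List<String>, String> CONCAT_WITH_UNDERSCORE =
      path -> String.join("_", path);

  /** Flattens nested rows into one level, naming each leaf with fieldNameFn. */
  static Map<String, Object> unnest(
      Map<String, Object> row, Function<List<String>, String> fieldNameFn) {
    Map<String, Object> out = new LinkedHashMap<>();
    unnestInto(row, new ArrayList<>(), fieldNameFn, out);
    return out;
  }

  @SuppressWarnings("unchecked")
  private static void unnestInto(
      Map<String, Object> row,
      List<String> prefix,
      Function<List<String>, String> fieldNameFn,
      Map<String, Object> out) {
    for (Map.Entry<String, Object> entry : row.entrySet()) {
      // Path from the top-level row down to this field.
      List<String> path = new ArrayList<>(prefix);
      path.add(entry.getKey());
      if (entry.getValue() instanceof Map) {
        // A nested row: recurse and keep extending the path.
        unnestInto((Map<String, Object>) entry.getValue(), path, fieldNameFn, out);
      } else {
        String name = fieldNameFn.apply(path);
        if (out.containsKey(name)) {
          throw new IllegalArgumentException("Duplicate field " + name + " after unnesting");
        }
        out.put(name, entry.getValue());
      }
    }
  }
}
```

Passing a different function, for example one that keeps only the last path element, corresponds to the custom naming policy that `withFieldNameFunction` is described as allowing.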




Issue Time Tracking
---

Worklog Id: (was: 145042)
Time Spent: 10h 10m  (was: 10h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=145031=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145031
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 17/Sep/18 19:42
Start Date: 17/Sep/18 19:42
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6316: [BEAM-4461] Add 
Unnest transform.
URL: https://github.com/apache/beam/pull/6316#issuecomment-422144387
 
 
   @akedin this was caused by a bug in SQL (it wasn't using the 
Calcite-generated names, which are disambiguated). this is now fixed.




Issue Time Tracking
---

Worklog Id: (was: 145031)
Time Spent: 10h  (was: 9h 50m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 10h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=144963=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144963
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 17/Sep/18 17:15
Start Date: 17/Sep/18 17:15
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #6316: [BEAM-4461] Add Unnest 
transform.
URL: https://github.com/apache/beam/pull/6316#issuecomment-422097268
 
 
   I am not sure what is the correct thing to do here. I think that we should 
prevent users from explicitly adding duplicate fields (as far as I remember BQ 
SQL disallows duplicate fields, don't know about standard). But I don't know if 
this can be a valid thing to do internally. For example, when implementing a 
join there potentially can exist an intermediary joined schema that combines 
left and right side of the join. Ambiguities should be dealt with by projection 
which is applied after that. Does this sound like something that can happen?
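
The scenario described above can be sketched without Beam. The `indexFields` method mirrors the duplicate check added to the `Schema` constructor in this PR's diff, applied to bare field names. `joinedFieldNames` is a hypothetical helper showing one way an intermediate join schema could stay unambiguous by prefixing each side; the `lhs.` and `rhs.` prefixes are assumptions for illustration, not what Beam SQL or Calcite generate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; field names stand in for full Field objects. */
class SchemaSketch {
  /** Builds a name -> index map and rejects duplicate names. */
  static Map<String, Integer> indexFields(List<String> fieldNames) {
    Map<String, Integer> fieldIndices = new HashMap<>();
    int index = 0;
    for (String name : fieldNames) {
      if (fieldIndices.get(name) != null) {
        throw new IllegalArgumentException("Duplicate field " + name + " added to schema");
      }
      fieldIndices.put(name, index++);
    }
    return fieldIndices;
  }

  /** Prefixes each side of a join so that shared names cannot collide. */
  static List<String> joinedFieldNames(List<String> left, List<String> right) {
    List<String> joined = new ArrayList<>();
    for (String name : left) {
      joined.add("lhs." + name);
    }
    for (String name : right) {
      joined.add("rhs." + name);
    }
    return joined;
  }
}
```

With this check in place, concatenating both sides of a join without disambiguation fails as soon as the two inputs share a field name, which matches the failure discussed in this thread.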




Issue Time Tracking
---

Worklog Id: (was: 144963)
Time Spent: 9h 50m  (was: 9h 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=144596=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144596
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 15/Sep/18 16:41
Start Date: 15/Sep/18 16:41
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6316: [BEAM-4461] Add 
Unnest transform.
URL: https://github.com/apache/beam/pull/6316#issuecomment-421597073
 
 
   @akedin The problem seems to be that SQL tries to add the same field twice 
to a schema when joining. I can remove this check, but it seems to me like a 
bug in SQL.




Issue Time Tracking
---

Worklog Id: (was: 144596)
Time Spent: 9h 40m  (was: 9.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=144534=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144534
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 15/Sep/18 02:56
Start Date: 15/Sep/18 02:56
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #6316: [BEAM-4461] Add Unnest 
transform.
URL: https://github.com/apache/beam/pull/6316#issuecomment-421525759
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 144534)
Time Spent: 9.5h  (was: 9h 20m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=144525=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144525
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 15/Sep/18 01:25
Start Date: 15/Sep/18 01:25
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6316: [BEAM-4461] Add 
Unnest transform.
URL: https://github.com/apache/beam/pull/6316#issuecomment-421520688
 
 
   @akedin All comments addressed.




Issue Time Tracking
---

Worklog Id: (was: 144525)
Time Spent: 9h 20m  (was: 9h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=144520=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144520
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 15/Sep/18 01:06
Start Date: 15/Sep/18 01:06
Worklog Time Spent: 10m 
  Work Description: reuvenlax closed pull request #6317: [BEAM-4461]  Add 
mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
new file mode 100644
index 000..252ea5e1831
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap PRIMITIVE_MAPPING =
+  ImmutableBiMap.builder()
+  .put(TypeName.BYTE, TypeDescriptors.bytes())
+  .put(TypeName.INT16, TypeDescriptors.shorts())
+  .put(TypeName.INT32, TypeDescriptors.integers())
+  .put(TypeName.INT64, TypeDescriptors.longs())
+  .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+  .put(TypeName.FLOAT, TypeDescriptors.floats())
+  .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+  .put(TypeName.STRING, TypeDescriptors.strings())
+  .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+  .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+  .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+  .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+switch (fieldType.getTypeName()) {
+  case ARRAY:
+return TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+  case MAP:
+return TypeDescriptors.maps(
+javaTypeForFieldType(fieldType.getMapKeyType()),
+javaTypeForFieldType(fieldType.getMapValueType()));
+  case ROW:
+return TypeDescriptors.rows();
+  default:
+return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+}
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()
+|| typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+  return getArrayFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+  return getMapFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+  throw new IllegalArgumentException(
+  "Cannot automatically determine a field type from a Row class"
+  + " as we cannot determine the schema. You should set a field type explicitly.");
+} else {
+  TypeName typeName = 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143641=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143641
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 17:11
Start Date: 12/Sep/18 17:11
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217115947
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
 
 Review comment:
   ok, maybe the default should be \ then?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 143641)
Time Spent: 9h  (was: 8h 50m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143640=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143640
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 17:10
Start Date: 12/Sep/18 17:10
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217114951
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
 
 Review comment:
   But "." seems to be the most common way of expressing nested fields (indeed 
even SQL uses it). So it seems to be the best default, even if it causes 
problems in some cases.
   
   This is another argument IMO for keeping the fieldNameFunction. Any name 
format I choose will break some use case, and I don't want 100 options to 
customize the format (e.g. customizing the separator character, etc.). This 
allows for the user to pick a different separator character trivially.
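
   As a rough illustration of the point above (a sketch, not code from the PR): 
a naming policy is just a function from the list of path components to a name, 
so changing the separator is a one-liner. Here `java.util.function.Function` 
stands in for Beam's `SerializableFunction`, and `joinWith` is a hypothetical 
helper name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of a separator-based naming policy; not part of the PR. */
class SeparatorNamingSketch {
  /** Returns a naming function that joins the path components with the given separator. */
  static Function<List<String>, String> joinWith(String separator) {
    return path -> String.join(separator, path);
  }

  public static void main(String[] args) {
    List<String> path = Arrays.asList("location", "latitude");
    System.out.println(joinWith(".").apply(path)); // location.latitude
    System.out.println(joinWith("_").apply(path)); // location_latitude
  }
}
```

   Presumably a function of this shape is what would be handed to 
`withFieldNameFunction`, provided it is also serializable.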




Issue Time Tracking
---

Worklog Id: (was: 143640)
Time Spent: 8h 50m  (was: 8h 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143639=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143639
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 17:10
Start Date: 12/Sep/18 17:10
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217115670
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
 
 Review comment:
   oh, no - I plan on always defaulting to fully-qualified names. If the user 
wants simpler names (e.g. to implement something like select x.y as foo), it's 
up to them to provide the function to do that.
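
   To make that concrete, here is a minimal sketch (an assumption-laden 
illustration, not the PR's implementation) of a user-supplied "simple name" 
policy that keeps only the last path component, together with the duplicate 
check that such a policy needs. `java.util.function.Function` stands in for 
Beam's `SerializableFunction`; `LAST_COMPONENT` and `nameAll` are hypothetical 
names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of a simple-name policy with collision detection. */
class SimpleNameSketch {
  /** Keeps only the last path component, in the spirit of KEEP_NESTED_NAME. */
  static final Function<List<String>, String> LAST_COMPONENT =
      path -> path.get(path.size() - 1);

  /** Applies the naming function to every path and rejects duplicate output names. */
  static List<String> nameAll(List<List<String>> paths, Function<List<String>, String> fn) {
    Set<String> seen = new HashSet<>();
    List<String> names = new ArrayList<>();
    for (List<String> path : paths) {
      String name = fn.apply(path);
      if (!seen.add(name)) {
        throw new IllegalArgumentException("Duplicate unnested field name: " + name);
      }
      names.add(name);
    }
    return names;
  }

  public static void main(String[] args) {
    List<List<String>> paths =
        Arrays.asList(
            Arrays.asList("userid"),
            Arrays.asList("location", "latitude"),
            Arrays.asList("location", "longitude"));
    System.out.println(nameAll(paths, LAST_COMPONENT)); // [userid, latitude, longitude]
  }
}
```

   With paths such as `home.latitude` and `work.latitude` the same call throws, 
which is the failure mode the Javadoc for the simple-name policy warns about.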




Issue Time Tracking
---

Worklog Id: (was: 143639)
Time Spent: 8h 40m  (was: 8.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143634=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143634
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 17:08
Start Date: 12/Sep/18 17:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217114951
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
 
 Review comment:
   But "." seems to be the most common way of expressing nested fields (indeed 
even SQL uses it). So it seems to be the best default, even if it causes 
problems in some cases.
   
   This is another argument IMO for keeping the fieldNameFunction. Any name 
format I choose will break some use case, and I don't want 100 options to 
customize the format (e.g. customizing the separator character, etc.). This 
allows for the user to pick a different separator character trivially.




Issue Time Tracking
---

Worklog Id: (was: 143634)
Time Spent: 8h 20m  (was: 8h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143613=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143613
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 16:26
Start Date: 12/Sep/18 16:26
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217101981
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
+      l -> {
+        return l.get(l.size() - 1);
+      };
+  /** Returns the result of unnesting the given schema. The default naming policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming policy. */
+  static Schema getUnnestedSchema(
+      Schema schema, int maxLevels, SerializableFunction<List<String>, String> fn) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+      Schema schema,
+      List<String> nameComponents,
+      SerializableFunction<List<String>, String> fn,
+      int maxLevel,
+      int currentLevel) {
+    Schema.Builder builder = Schema.builder();
+    for (Field field : schema.getFields()) {
+      nameComponents.add(field.getName());
+      if (field.getType().getTypeName().isCompositeType() && currentLevel < maxLevel) {
 
 Review comment:
   Hm, no reason :) Misunderstood the code, never mind.


This is an automated message from the 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143612=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143612
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 16:24
Start Date: 12/Sep/18 16:24
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217101237
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
 
 Review comment:
   I don't mind having this in the code, especially if you plan to use it. 
Worst case, we can always refactor it later.
   
   I assume (probably incorrectly) that you will use this code to provide 
simple names to the users in basic scenarios of field-access ParDos and allow 
fully qualified names if conflicts happen. In this case I am not sure that 
implicitly using simple names is the correct behavior. E.g. what would be the 
right thing to do if a schema change introduces conflicts, moves a field to 
another branch and changes its semantics, or introduces nested named schemas 
that make dotted name resolution ambiguous? I am inclined to avoid resolving 
these kinds of problems for now, by always using fully qualified names until 
we better understand the use cases and the path forward.




Issue Time Tracking
---

Worklog Id: (was: 143612)
Time Spent: 8h  (was: 7h 50m)

> Create a library of useful 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143599=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143599
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 16:00
Start Date: 12/Sep/18 16:00
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r217092375
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
 
 Review comment:
   It is when SQL also knows how to access the inner fields. I.e. when there's 
a bunch of raw nested rows, Calcite knows what it means to get a field value 
from a row with `"."`.
   
   But in this case, after unnest you will have a flat single field whose name 
contains multiple dots. If such a name gets to the parser, it will be 
interpreted as nested field access, which will not work because there is no 
inner field after unnesting. So if users want to query the unnested 
`PCollection` using the dotted name of the flattened field, it will probably 
fail for this reason (should we have a test for this?).
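
   A small sketch of the concern, using plain nested maps as a stand-in for 
Beam rows (an assumption made only to keep the example self-contained; 
`DottedNameSketch` and `flatten` are hypothetical names). After flattening, 
`location.latitude` is a single top-level key and there is no `location` entry 
left to resolve a nested access against.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: flattening nested maps into dotted top-level names. */
class DottedNameSketch {
  /** Recursively copies leaf values into out, keyed by the dot-joined path. */
  static void flatten(Map<String, Object> row, List<String> path, Map<String, Object> out) {
    for (Map.Entry<String, Object> e : row.entrySet()) {
      path.add(e.getKey());
      if (e.getValue() instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> nested = (Map<String, Object>) e.getValue();
        flatten(nested, path, out);
      } else {
        out.put(String.join(".", path), e.getValue());
      }
      path.remove(path.size() - 1);
    }
  }

  /** Convenience overload that starts from an empty path. */
  static Map<String, Object> flatten(Map<String, Object> row) {
    Map<String, Object> out = new LinkedHashMap<>();
    flatten(row, new ArrayList<>(), out);
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> location = new LinkedHashMap<>();
    location.put("latitude", 47.6);
    location.put("longitude", -122.3);
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("userid", 1L);
    event.put("location", location);

    Map<String, Object> flat = flatten(event);
    System.out.println(flat.keySet()); // [userid, location.latitude, location.longitude]
    System.out.println(flat.containsKey("location")); // false
  }
}
```

   Whether a SQL parser would accept a quoted `location.latitude` as a single 
identifier is a separate question that this sketch does not answer.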




Issue Time Tracking
---

Worklog Id: (was: 143599)
Time Spent: 7h 50m  (was: 7h 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143366=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143366
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:31
Start Date: 12/Sep/18 04:31
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r216893940
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * <p>For example, consider a Row with the following nested schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location: LatLong
+ *
+ * LatLong Schema:
+ *   latitude: DOUBLE
+ *   longitude: DOUBLE
+ * </pre>
+ *
+ * <p>After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location.latitude: DOUBLE
+ *   location.longitude: DOUBLE
+ * </pre>
+ *
+ * <p>By default nested names are concatenated to generate the unnested name; however, {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * <p>Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
+      l -> {
+        return l.get(l.size() - 1);
+      };
+  /** Returns the result of unnesting the given schema. The default naming policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming policy. */
+  static Schema getUnnestedSchema(
+      Schema schema, int maxLevels, SerializableFunction<List<String>, String> fn) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+      Schema schema,
+      List<String> nameComponents,
+      SerializableFunction<List<String>, String> fn,
+      int maxLevel,
+      int currentLevel) {
+    Schema.Builder builder = Schema.builder();
+    for (Field field : schema.getFields()) {
+      nameComponents.add(field.getName());
+      if (field.getType().getTypeName().isCompositeType() && currentLevel < maxLevel) {
 
 Review comment:
   Why? 



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143365=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143365
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:30
Start Date: 12/Sep/18 04:30
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r216893853
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * <p>For example, consider a Row with the following nested schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location: LatLong
+ *
+ * LatLong Schema:
+ *   latitude: DOUBLE
+ *   longitude: DOUBLE
+ * </pre>
+ *
+ * <p>After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location.latitude: DOUBLE
+ *   location.longitude: DOUBLE
+ * </pre>
+ *
+ * <p>By default nested names are concatenated to generate the unnested name; however, {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * <p>Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
+      l -> {
+        return l.get(l.size() - 1);
+      };
+  /** Returns the result of unnesting the given schema. The default naming policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming policy. */
+  static Schema getUnnestedSchema(
+      Schema schema, int maxLevels, SerializableFunction<List<String>, String> fn) {
+    List<String> nameComponents = Lists.newArrayList();
+    return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+      Schema schema,
+      List<String> nameComponents,
+      SerializableFunction<List<String>, String> fn,
+      int maxLevel,
+      int currentLevel) {
+    Schema.Builder builder = Schema.builder();
+    for (Field field : schema.getFields()) {
+      nameComponents.add(field.getName());
+      if (field.getType().getTypeName().isCompositeType() && currentLevel < maxLevel) {
+        Schema nestedSchema =
+            getUnnestedSchema(
+                field.getType().getRowSchema(), nameComponents, fn, maxLevel, currentLevel + 1);
+for 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143364
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:30
Start Date: 12/Sep/18 04:30
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r216893831
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * <p>For example, consider a Row with the following nested schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location: LatLong
+ *
+ * LatLong Schema:
+ *   latitude: DOUBLE
+ *   longitude: DOUBLE
+ * </pre>
+ *
+ * <p>After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location.latitude: DOUBLE
+ *   location.longitude: DOUBLE
+ * </pre>
+ *
+ * <p>By default nested names are concatenated to generate the unnested name; however, {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * <p>Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
+      };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> KEEP_NESTED_NAME =
 
 Review comment:
   I disagree here: I added this because I intend to use it fairly soon. Also,
I think a transform like this that munges names needs to be customizable; a
hard-coded naming strategy always runs into problems. It is worth noting that
this is basically the same as SQL (e.g. select X.Y AS NewName).
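   
   To illustrate why the naming needs to be pluggable, here is a minimal
sketch that uses plain maps in place of `Schema`. Everything in it is an
assumption for illustration: `NamingPolicySketch`, `unnestedNames`, the
map-based schema and the explicit duplicate check are hypothetical and not part
of this PR, and `java.util.function.Function` stands in for
`SerializableFunction`. Only the two naming rules are taken from the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Illustration only: a "schema" is a map from field name to a type name or a nested map. */
class NamingPolicySketch {
  // Same rules as the two policies in the diff.
  static final Function<List<String>, String> CONCAT_FIELD_NAMES = l -> String.join(".", l);
  static final Function<List<String>, String> KEEP_NESTED_NAME = l -> l.get(l.size() - 1);

  /** Returns the flattened field names, failing if the policy yields a duplicate. */
  static List<String> unnestedNames(
      Map<String, Object> schema, Function<List<String>, String> fn) {
    List<String> names = new ArrayList<>();
    collect(schema, new ArrayList<>(), fn, names);
    return names;
  }

  private static void collect(
      Map<String, Object> schema,
      List<String> path,
      Function<List<String>, String> fn,
      List<String> names) {
    for (Map.Entry<String, Object> field : schema.entrySet()) {
      path.add(field.getKey());
      if (field.getValue() instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> nested = (Map<String, Object>) field.getValue();
        collect(nested, path, fn, names);
      } else {
        String name = fn.apply(path);
        if (names.contains(name)) {
          throw new IllegalArgumentException("Duplicate unnested field name: " + name);
        }
        names.add(name);
      }
      path.remove(path.size() - 1); // pop this field before visiting its sibling
    }
  }

  /** Hypothetical example schema: two nested rows that share inner field names. */
  static Map<String, Object> exampleSchema() {
    Map<String, Object> schema = new LinkedHashMap<>();
    schema.put("userid", "INT64");
    schema.put("home", latLong());
    schema.put("work", latLong());
    return schema;
  }

  private static Map<String, Object> latLong() {
    Map<String, Object> latLong = new LinkedHashMap<>();
    latLong.put("latitude", "DOUBLE");
    latLong.put("longitude", "DOUBLE");
    return latLong;
  }

  public static void main(String[] args) {
    System.out.println(unnestedNames(exampleSchema(), CONCAT_FIELD_NAMES));
    // A caller-supplied policy, in the spirit of SQL's "AS NewName".
    System.out.println(unnestedNames(exampleSchema(), l -> String.join("_", l)));
    try {
      unnestedNames(exampleSchema(), KEEP_NESTED_NAME);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   With this example schema the concatenating policies give distinct names,
while keeping only the innermost name collides on `latitude`, which is the
failure case the KEEP_NESTED_NAME javadoc warns about.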




Issue Time Tracking
---

Worklog Id: (was: 143364)
Time Spent: 7h 20m  (was: 7h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.




[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143363
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:25
Start Date: 12/Sep/18 04:25
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r216893261
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * <p>For example, consider a Row with the following nested schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location: LatLong
+ *
+ * LatLong Schema:
+ *   latitude: DOUBLE
+ *   longitude: DOUBLE
+ * </pre>
+ *
+ * <p>After unnesting, all of the rows will be converted to rows satisfying the following schema:
+ *
+ * <pre>
+ * UserEvent Schema:
+ *   userid: INT64
+ *   timestamp: DATETIME
+ *   location.latitude: DOUBLE
+ *   location.longitude: DOUBLE
+ * </pre>
+ *
+ * <p>By default nested names are concatenated to generate the unnested name; however, {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming policy.
+ *
+ * <p>Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static <T> Inner<T> create() {
+    return new AutoValue_Unnest_Inner.Builder<T>()
+        .setMaxLevels(Integer.MAX_VALUE)
+        .setFieldNameFunction(CONCAT_FIELD_NAMES)
+        .build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> CONCAT_FIELD_NAMES =
+      l -> {
+        return String.join(".", l);
 
 Review comment:
   But aren't "."-separated fields exactly what SQL SELECT syntax wants?




Issue Time Tracking
---

Worklog Id: (was: 143363)
Time Spent: 7h 10m  (was: 7h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143362=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143362
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:22
Start Date: 12/Sep/18 04:22
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r216892966
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap<TypeName, TypeDescriptor> PRIMITIVE_MAPPING =
+      ImmutableBiMap.<TypeName, TypeDescriptor>builder()
+          .put(TypeName.BYTE, TypeDescriptors.bytes())
+          .put(TypeName.INT16, TypeDescriptors.shorts())
+          .put(TypeName.INT32, TypeDescriptors.integers())
+          .put(TypeName.INT64, TypeDescriptors.longs())
+          .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+          .put(TypeName.FLOAT, TypeDescriptors.floats())
+          .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+          .put(TypeName.STRING, TypeDescriptors.strings())
+          .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+          .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+          .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+          .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+        return TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+      case MAP:
+        return TypeDescriptors.maps(
+            javaTypeForFieldType(fieldType.getMapKeyType()),
+            javaTypeForFieldType(fieldType.getMapValueType()));
+      case ROW:
+        return TypeDescriptors.rows();
+      default:
+        return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+    }
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+    if (typeDescriptor.isArray()
+        || typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+      return getArrayFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      return getMapFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+      throw new IllegalArgumentException(
+          "Cannot automatically determine a field type from a Row class"
+              + " as we cannot determine the schema. You should set a field type explicitly.");
+    } else {
+      TypeName typeName = PRIMITIVE_MAPPING.inverse().get(typeDescriptor);
+      if (typeName == null) {
+        throw new RuntimeException("Couldn't find field type for " + typeDescriptor);
+      }
+      FieldType fieldType = FieldType.of(typeName);
+      return fieldType;
+    }
+  }
+
+  private static FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {
+    if (typeDescriptor.isArray()) {
+      if (typeDescriptor.getComponentType().getType().equals(byte.class)) {
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143361=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143361
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:20
Start Date: 12/Sep/18 04:20
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r216892734
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap<TypeName, TypeDescriptor> PRIMITIVE_MAPPING =
+      ImmutableBiMap.<TypeName, TypeDescriptor>builder()
+          .put(TypeName.BYTE, TypeDescriptors.bytes())
+          .put(TypeName.INT16, TypeDescriptors.shorts())
+          .put(TypeName.INT32, TypeDescriptors.integers())
+          .put(TypeName.INT64, TypeDescriptors.longs())
+          .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+          .put(TypeName.FLOAT, TypeDescriptors.floats())
+          .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+          .put(TypeName.STRING, TypeDescriptors.strings())
+          .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+          .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+          .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+          .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+        return TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+      case MAP:
+        return TypeDescriptors.maps(
+            javaTypeForFieldType(fieldType.getMapKeyType()),
+            javaTypeForFieldType(fieldType.getMapValueType()));
+      case ROW:
+        return TypeDescriptors.rows();
+      default:
+        return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+    }
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+    if (typeDescriptor.isArray()
+        || typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+      return getArrayFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      return getMapFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+      throw new IllegalArgumentException(
+          "Cannot automatically determine a field type from a Row class"
+              + " as we cannot determine the schema. You should set a field type explicitly.");
+    } else {
+      TypeName typeName = PRIMITIVE_MAPPING.inverse().get(typeDescriptor);
+      if (typeName == null) {
+        throw new RuntimeException("Couldn't find field type for " + typeDescriptor);
+      }
+      FieldType fieldType = FieldType.of(typeName);
+      return fieldType;
+    }
+  }
+
+  private static FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {
+    if (typeDescriptor.isArray()) {
+      if (typeDescriptor.getComponentType().getType().equals(byte.class)) {
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143360=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143360
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:19
Start Date: 12/Sep/18 04:19
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r216892651
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap<TypeName, TypeDescriptor> PRIMITIVE_MAPPING =
+      ImmutableBiMap.<TypeName, TypeDescriptor>builder()
+          .put(TypeName.BYTE, TypeDescriptors.bytes())
+          .put(TypeName.INT16, TypeDescriptors.shorts())
+          .put(TypeName.INT32, TypeDescriptors.integers())
+          .put(TypeName.INT64, TypeDescriptors.longs())
+          .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+          .put(TypeName.FLOAT, TypeDescriptors.floats())
+          .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+          .put(TypeName.STRING, TypeDescriptors.strings())
+          .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+          .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+          .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+          .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+        return TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+      case MAP:
+        return TypeDescriptors.maps(
+            javaTypeForFieldType(fieldType.getMapKeyType()),
+            javaTypeForFieldType(fieldType.getMapValueType()));
+      case ROW:
+        return TypeDescriptors.rows();
+      default:
+        return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+    }
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+    if (typeDescriptor.isArray()
+        || typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+      return getArrayFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      return getMapFieldType(typeDescriptor);
+    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+      throw new IllegalArgumentException(
+          "Cannot automatically determine a field type from a Row class"
+              + " as we cannot determine the schema. You should set a field type explicitly.");
 
 Review comment:
   Of course it's possible - that's essentially what protocol buffers do. 
However, it can be quite tricky to get right (especially when you start 
thinking about compatibility between different versions), so I'm loath to go 
down that path just yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143359
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 12/Sep/18 04:17
Start Date: 12/Sep/18 04:17
Worklog Time Spent: 10m 
  Work Description: reuvenlax closed pull request #6318: [BEAM-4461] Some 
fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index c4f7813fbdb..b7e68d2a896 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -32,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -112,6 +114,16 @@ public static ComposeCombineFnBuilder compose() {
   return new ComposedCombineFn().with(extractInputFn, combineFn, 
outputTag);
 }
 
+/** Like {@link #with(SimpleFunction, CombineFn, TupleTag)} but with an 
explicit input coder. */
+public  ComposedCombineFn with(
+SimpleFunction extractInputFn,
+Coder combineInputCoder,
+CombineFn combineFn,
+TupleTag outputTag) {
+  return new ComposedCombineFn()
+  .with(extractInputFn, combineInputCoder, combineFn, outputTag);
+}
+
 /**
  * Returns a {@link ComposedCombineFnWithContext} that can take additional 
{@link
  * GlobalCombineFn GlobalCombineFns} and apply them as a single combine 
function.
@@ -127,6 +139,16 @@ public static ComposeCombineFnBuilder compose() {
   return new ComposedCombineFnWithContext()
   .with(extractInputFn, combineFnWithContext, outputTag);
 }
+
+/** Like {@link #with(SimpleFunction, CombineFnWithContext, TupleTag)} but 
with input coder. */
+public  ComposedCombineFnWithContext with(
+SimpleFunction extractInputFn,
+Coder combineInputCoder,
+CombineFnWithContext combineFnWithContext,
+TupleTag outputTag) {
+  return new ComposedCombineFnWithContext()
+  .with(extractInputFn, combineInputCoder, combineFnWithContext, 
outputTag);
+}
   }
 
   /
@@ -212,12 +234,14 @@ public int hashCode() {
   public static class ComposedCombineFn extends CombineFn {
 
 private final List> combineFns;
+private final List> combineInputCoders;
 private final List> extractInputFns;
 private final List> outputTags;
 private final int combineFnCount;
 
 private ComposedCombineFn() {
   this.extractInputFns = ImmutableList.of();
+  this.combineInputCoders = ImmutableList.of();
   this.combineFns = ImmutableList.of();
   this.outputTags = ImmutableList.of();
   this.combineFnCount = 0;
@@ -225,11 +249,13 @@ private ComposedCombineFn() {
 
 private ComposedCombineFn(
 ImmutableList> extractInputFns,
+List> combineInputCoders,
 ImmutableList> combineFns,
 ImmutableList> outputTags) {
   @SuppressWarnings({"unchecked", "rawtypes"})
   List> castedExtractInputFns = (List) 
extractInputFns;
   this.extractInputFns = castedExtractInputFns;
+  this.combineInputCoders = combineInputCoders;
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   List> castedCombineFns = (List) 
combineFns;
@@ -250,6 +276,10 @@ private ComposedCombineFn(
   .addAll(extractInputFns)
   .add(extractInputFn)
   .build(),
+  ImmutableList.>builder()
+  .addAll(combineInputCoders)
+  .add(Optional.absent())
+  .build(),
   ImmutableList.>builder().addAll(combineFns).add(combineFn).build(),
   
ImmutableList.>builder().addAll(outputTags).add(outputTag).build());
 }
@@ -272,6 +302,59 @@ private ComposedCombineFn(
   .addAll(extractInputFns)
   .add(extractInputFn)

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=142469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142469
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 08/Sep/18 17:16
Start Date: 08/Sep/18 17:16
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6318: [BEAM-4461] Some 
fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#issuecomment-419658860
 
 
   @akedin comments addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 142469)
Time Spent: 6h 20m  (was: 6h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=142081=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142081
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 07/Sep/18 08:00
Start Date: 07/Sep/18 08:00
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215876628
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
 ##
 @@ -168,7 +168,40 @@ private Top() {
* {@code PCollection} of {@code KV}s and return the top values associated 
with each key.
*/
   public static > Combine.Globally> 
largest(int count) {
-return Combine.globally(new TopCombineFn<>(count, new Natural()));
+return Combine.globally(largestFn(count));
+  }
+
+  /** Returns a {@link TopCombineFn} that aggregates the largest count values. 
*/
+  public static > TopCombineFn> 
largestFn(int count) {
+return new TopCombineFn>(count, new Natural()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the largest count long 
values. */
+  public static TopCombineFn> largestLongsFn(int count) {
+return new TopCombineFn>(count, new Natural()) 
{};
 
 Review comment:
   No, this isn't allowed for anonymous inner classes
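
For context: in Java 8 and earlier, the diamond `<>` cannot be combined with an
anonymous class body (Java 9 relaxed this for denotable types). The sketch
below shows one effect of spelling the type arguments out on an anonymous
subclass: they are recorded in the generic superclass and can be read back
reflectively. Whether this is why the diff adds the trailing `{}` is an
assumption, and `TypedFn` is a hypothetical stand-in for a generic class such
as TopCombineFn, not Beam code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a generic class such as TopCombineFn; not Beam code.
abstract class TypedFn<T> {
  /** Returns the type argument recorded in this object's generic superclass, or null. */
  Type capturedTypeArgument() {
    Type superclass = getClass().getGenericSuperclass();
    if (superclass instanceof ParameterizedType) {
      return ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }
    return null;
  }
}

final class AnonymousSubclassSketch {
  static TypedFn<List<Long>> longsFn() {
    // Explicit type arguments plus a trailing {} create an anonymous subclass
    // whose generic superclass is TypedFn<List<Long>>.
    return new TypedFn<List<Long>>() {};
  }

  public static void main(String[] args) {
    // Prints the type argument captured by the anonymous subclass.
    System.out.println(longsFn().capturedTypeArgument());
  }
}
```

A plain `new TypedFn<List<Long>>()` on a non-abstract class would not record
the argument this way, because the runtime class would be the raw class itself.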




Issue Time Tracking
---

Worklog Id: (was: 142081)
Time Spent: 6h 10m  (was: 6h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=142080=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142080
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 07/Sep/18 07:59
Start Date: 07/Sep/18 07:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215876304
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
 ##
 @@ -470,7 +589,10 @@ public CoCombineResult extractOutput(Object[] 
accumulator, Context c) {
 throws CannotProvideCoderException {
   List> coders = Lists.newArrayList();
   for (int i = 0; i < combineFnCount; ++i) {
-Coder inputCoder = 
registry.getOutputCoder(extractInputFns.get(i), dataCoder);
+Coder inputCoder =
+combineInputCoders.get(i).isPresent()
 
 Review comment:
   Unfortunately the `.or(...)` method won't work here, since its type 
signature can't handle functions that throw checked exceptions (such as 
CannotProvideCoderException).
   
   As to the second point, I don't think the registry is available at the time 
the combiner is built up. The registry is generally not available until later 
in pipeline construction, and the Combiner signature doesn't give it to us 
until getAccumulatorCoder is called.
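
A minimal sketch of the constraint described above, using java.util.Optional as
a stand-in for the Guava Optional in the diff. The names `CannotProvideException`,
`lookUp` and `resolve` are hypothetical and exist only for illustration; coders
are modelled as plain strings.

```java
import java.util.Optional;

final class CoderFallbackSketch {
  /** Hypothetical checked exception standing in for CannotProvideCoderException. */
  static class CannotProvideException extends Exception {
    CannotProvideException(String message) {
      super(message);
    }
  }

  /** Hypothetical registry lookup that may fail with a checked exception. */
  static String lookUp(String type) throws CannotProvideException {
    if (type.isEmpty()) {
      throw new CannotProvideException("no coder for empty type");
    }
    return "registry-coder-for-" + type;
  }

  /** Prefers the explicit coder; consults the registry only when none was given. */
  static String resolve(Optional<String> explicit, String type) throws CannotProvideException {
    // explicit.orElseGet(() -> lookUp(type)) does not compile, because
    // Supplier.get() declares no checked exceptions.
    // explicit.orElse(lookUp(type)) compiles, but runs the lookup eagerly,
    // so it can throw even when an explicit coder is present.
    return explicit.isPresent() ? explicit.get() : lookUp(type);
  }

  public static void main(String[] args) throws CannotProvideException {
    // The lookup for "" would throw, but it is never reached here.
    System.out.println(resolve(Optional.of("explicit-coder"), ""));
    System.out.println(resolve(Optional.empty(), "Long"));
  }
}
```

The ternary keeps the registry lookup lazy while still letting the checked
exception propagate through the enclosing method's `throws` clause.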




Issue Time Tracking
---

Worklog Id: (was: 142080)
Time Spent: 6h  (was: 5h 50m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141488=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141488
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 18:35
Start Date: 05/Sep/18 18:35
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215360597
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
 ##
 @@ -112,6 +113,16 @@ public static ComposeCombineFnBuilder compose() {
   return new ComposedCombineFn().with(extractInputFn, combineFn, 
outputTag);
 }
 
+/** Like {@link #with(SimpleFunction, CombineFn, TupleTag)} bit with an 
explicit input coder. */
 
 Review comment:
   typo *but




Issue Time Tracking
---

Worklog Id: (was: 141488)
Time Spent: 5h 40m  (was: 5.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141489=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141489
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 18:35
Start Date: 05/Sep/18 18:35
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215381299
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
 ##
 @@ -168,7 +168,40 @@ private Top() {
* {@code PCollection} of {@code KV}s and return the top values associated 
with each key.
*/
   public static > Combine.Globally> 
largest(int count) {
-return Combine.globally(new TopCombineFn<>(count, new Natural()));
+return Combine.globally(largestFn(count));
+  }
+
+  /** Returns a {@link TopCombineFn} that aggregates the largest count values. 
*/
+  public static > TopCombineFn> 
largestFn(int count) {
+return new TopCombineFn>(count, new Natural()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the largest count long 
values. */
+  public static TopCombineFn> largestLongsFn(int count) {
+return new TopCombineFn>(count, new Natural()) 
{};
 
 Review comment:
   Can Java infer more of `<>`? E.g. `new TopCombineFn<>(count, new 
Natural()) {};`
   




Issue Time Tracking
---

Worklog Id: (was: 141489)
Time Spent: 5h 50m  (was: 5h 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141490=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141490
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 18:35
Start Date: 05/Sep/18 18:35
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215361562
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
 ##
 @@ -470,7 +589,10 @@ public CoCombineResult extractOutput(Object[] 
accumulator, Context c) {
 throws CannotProvideCoderException {
   List> coders = Lists.newArrayList();
   for (int i = 0; i < combineFnCount; ++i) {
-Coder inputCoder = 
registry.getOutputCoder(extractInputFns.get(i), dataCoder);
+Coder inputCoder =
+combineInputCoders.get(i).isPresent()
 
 Review comment:
   I think something like 
`combineInputCoders.get(i).or(registry.getOutputCoder(extractInputFns.get(i), 
dataCoder))` should work.
   
   Also, I believe this logic should be changed: it would be clearer if the 
coders were not a list of optionals but a list of coders, with the registry 
consulted earlier instead of adding `Optional.absent()`. The end goal is not 
to have optionals but to use the values from the registry when no coder is 
passed, similar to how `CombineFnUtil.toFnWithContext(globalCombineFn)` is 
called.




Issue Time Tracking
---

Worklog Id: (was: 141490)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141487
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 18:35
Start Date: 05/Sep/18 18:35
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6318: 
[BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#discussion_r215378279
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
 ##
 @@ -272,6 +301,60 @@ private ComposedCombineFn(
   .addAll(extractInputFns)
   .add(extractInputFn)
   .build(),
+  ImmutableList.>builder()
+  .addAll(combineInputCoders)
+  .add(Optional.absent())
+  .build(),
+  ImmutableList.>builder()
+  .addAll(fnsWithContext)
+  .add(combineFn)
+  .build(),
+  
ImmutableList.>builder().addAll(outputTags).add(outputTag).build());
+}
+
+/** Returns a {@link ComposedCombineFn} with an additional {@link 
CombineFn}. */
+public  ComposedCombineFn with(
+SimpleFunction extractInputFn,
+Coder combineInputCoder,
+CombineFn combineFn,
+TupleTag outputTag) {
+  checkUniqueness(outputTags, outputTag);
+  return new ComposedCombineFn<>(
+  ImmutableList.>builder()
+  .addAll(extractInputFns)
+  .add(extractInputFn)
+  .build(),
+  ImmutableList.>builder()
+  .addAll(combineInputCoders)
+  .add(Optional.of(combineInputCoder))
+  .build(),
+  ImmutableList.>builder().addAll(combineFns).add(combineFn).build(),
+  
ImmutableList.>builder().addAll(outputTags).add(outputTag).build());
+}
+
+/**
+ * Returns a {@link ComposedCombineFnWithContext} with an additional {@link
+ * CombineFnWithContext}.
+ */
+public  ComposedCombineFnWithContext with(
+SimpleFunction extractInputFn,
+Coder combineInputCoder,
+CombineFnWithContext combineFn,
+TupleTag outputTag) {
+  checkUniqueness(outputTags, outputTag);
+  List> fnsWithContext = 
Lists.newArrayList();
+  for (CombineFn fn : combineFns) {
+fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
 
 Review comment:
   nit: can this be rewritten as 
`combineFns.stream().map(CombineFnUtil::toFnWithContext).collect(toList())`?
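
A minimal sketch of the two equivalent forms, with combine functions modelled
as plain strings. The local `toFnWithContext` is a hypothetical stand-in for
CombineFnUtil.toFnWithContext and only tags its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StreamRewriteSketch {
  /** Hypothetical stand-in for CombineFnUtil.toFnWithContext. */
  static String toFnWithContext(String fn) {
    return fn + "WithContext";
  }

  /** The explicit loop, as written in the diff under review. */
  static List<String> withLoop(List<String> combineFns) {
    List<String> fnsWithContext = new ArrayList<>();
    for (String fn : combineFns) {
      fnsWithContext.add(toFnWithContext(fn));
    }
    return fnsWithContext;
  }

  /** The stream form suggested in the review comment. */
  static List<String> withStream(List<String> combineFns) {
    return combineFns
        .stream()
        .map(StreamRewriteSketch::toFnWithContext)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> fns = Arrays.asList("sum", "max");
    System.out.println(withLoop(fns).equals(withStream(fns)));
  }
}
```

Both forms preserve input order, so the positions still line up with the
parallel lists of extract functions and output tags.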




Issue Time Tracking
---

Worklog Id: (was: 141487)
Time Spent: 5h 40m  (was: 5.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141441
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 17:25
Start Date: 05/Sep/18 17:25
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r215347139
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nested schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generate the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder()
+.setMaxLevels(Integer.MAX_VALUE)
+.setFieldNameFunction(CONCAT_FIELD_NAMES)
+.build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in 
the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction, String> 
CONCAT_FIELD_NAMES =
+  l -> {
+return String.join(".", l);
+  };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested 
fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction, String> 
KEEP_NESTED_NAME =
+  l -> {
+return l.get(l.size() - 1);
+  };
+  /** Returns the result of unnesting the given schema. The default naming 
policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+List nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, 
maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming 
policy. */
+  static Schema getUnnestedSchema(
+  Schema schema, int maxLevels, SerializableFunction, String> 
fn) {
+List nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+  Schema schema,
+  List nameComponents,
+  SerializableFunction, String> fn,
+  int maxLevel,
+  int currentLevel) {
+Schema.Builder builder = Schema.builder();
+for (Field field : schema.getFields()) {
+  nameComponents.add(field.getName());
+  if (field.getType().getTypeName().isCompositeType() && currentLevel < 
maxLevel) {
 
 Review comment:
   throw for arrays/maps?
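
For context on the question above, a minimal sketch of the recursion in the
diff, under heavy simplification: nested rows are modelled as nested
`Map<String, Object>` values, and any other value (including lists) is copied
through unchanged rather than rejected. `UnnestSketch` and its methods are
hypothetical and are not the Beam implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UnnestSketch {
  /** Flattens nested maps (stand-ins for nested rows) into dotted field names. */
  static Map<String, Object> unnest(Map<String, Object> row, int maxLevels) {
    Map<String, Object> out = new LinkedHashMap<>();
    unnest(row, new ArrayList<>(), maxLevels, 0, out);
    return out;
  }

  @SuppressWarnings("unchecked")
  private static void unnest(
      Map<String, Object> row,
      List<String> nameComponents,
      int maxLevels,
      int currentLevel,
      Map<String, Object> out) {
    for (Map.Entry<String, Object> field : row.entrySet()) {
      nameComponents.add(field.getKey());
      if (field.getValue() instanceof Map && currentLevel < maxLevels) {
        // Nested row: recurse one level deeper, keeping the path so far.
        unnest((Map<String, Object>) field.getValue(), nameComponents, maxLevels, currentLevel + 1, out);
      } else {
        // Leaf value, or the level limit was reached: emit under the joined name.
        out.put(String.join(".", nameComponents), field.getValue());
      }
      nameComponents.remove(nameComponents.size() - 1);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> location = new LinkedHashMap<>();
    location.put("latitude", 1.5);
    location.put("longitude", 2.5);
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("userid", 7L);
    event.put("location", location);
    System.out.println(unnest(event, Integer.MAX_VALUE).keySet());
  }
}
```

With `maxLevels` set to 0 the nested `location` value is left in place as a
single field, which mirrors the `currentLevel < maxLevel` guard in the diff.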



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141442
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 17:25
Start Date: 05/Sep/18 17:25
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r215354112
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nested schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generate the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder()
+.setMaxLevels(Integer.MAX_VALUE)
+.setFieldNameFunction(CONCAT_FIELD_NAMES)
+.build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in 
the path to a given
+   * field is concatenated with . characters.
+   */
+  public static final SerializableFunction, String> 
CONCAT_FIELD_NAMES =
+  l -> {
+return String.join(".", l);
 
 Review comment:
   I would probably use some symbol other than `"."` (maybe `"_"` instead) to 
avoid accidental conflicts with the field access operator in weird places 
downstream (e.g. accessing fields in SQL, or parsing JSON with these dotted 
field names). E.g. you cannot do `JSON.parse("{ \"asd.asd\" : \"123123\" 
}").asd.asd`, which some people would probably expect, and `eval("{ asd.asd : 
\"123123\" }")` doesn't work.
   
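
To make the trade-off concrete, a minimal sketch of three naming policies
applied to the same field paths. It uses java.util.function.Function in place
of Beam's SerializableFunction; `CONCAT_WITH_UNDERSCORE` is a hypothetical
variant reflecting the suggestion above and is not part of the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FieldNamingSketch {
  /** Mirrors the default policy in the diff: join the path with dots. */
  static final Function<List<String>, String> CONCAT_WITH_DOT = l -> String.join(".", l);

  /** Hypothetical variant from the review suggestion: join the path with underscores. */
  static final Function<List<String>, String> CONCAT_WITH_UNDERSCORE = l -> String.join("_", l);

  /** Mirrors KEEP_NESTED_NAME in the diff: keep only the innermost name. */
  static final Function<List<String>, String> KEEP_NESTED_NAME = l -> l.get(l.size() - 1);

  public static void main(String[] args) {
    List<String> home = Arrays.asList("home", "latitude");
    List<String> work = Arrays.asList("work", "latitude");
    System.out.println(CONCAT_WITH_DOT.apply(home));
    System.out.println(CONCAT_WITH_UNDERSCORE.apply(home));
    // Both paths collapse to the same name under KEEP_NESTED_NAME, which is
    // the collision case the Javadoc in the diff warns about.
    System.out.println(KEEP_NESTED_NAME.apply(home).equals(KEEP_NESTED_NAME.apply(work)));
  }
}
```

Underscores are not collision-free either: the paths `a_b, c` and `a, b_c`
would both map to `a_b_c`, so any separator is a trade-off rather than a fix.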




Issue Time Tracking
---

Worklog Id: (was: 141442)
Time Spent: 5h 20m  (was: 5h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141443
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 17:25
Start Date: 05/Sep/18 17:25
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r215352115
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder()
+.setMaxLevels(Integer.MAX_VALUE)
+.setFieldNameFunction(CONCAT_FIELD_NAMES)
+.build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in 
the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> 
CONCAT_FIELD_NAMES =
+  l -> {
+return String.join(".", l);
+  };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested 
fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> 
KEEP_NESTED_NAME =
+  l -> {
+return l.get(l.size() - 1);
+  };
+  /** Returns the result of unnesting the given schema. The default naming 
policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+List<String> nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, 
maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming 
policy. */
+  static Schema getUnnestedSchema(
+  Schema schema, int maxLevels, SerializableFunction<List<String>, String> 
fn) {
+List<String> nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+  Schema schema,
+  List<String> nameComponents,
+  SerializableFunction<List<String>, String> fn,
+  int maxLevel,
+  int currentLevel) {
+Schema.Builder builder = Schema.builder();
+for (Field field : schema.getFields()) {
+  nameComponents.add(field.getName());
+  if (field.getType().getTypeName().isCompositeType() && currentLevel < 
maxLevel) {
+Schema nestedSchema =
+getUnnestedSchema(
+field.getType().getRowSchema(), nameComponents, fn, maxLevel, 
currentLevel + 1);
+for 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141439
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 17:25
Start Date: 05/Sep/18 17:25
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r215348371
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder()
+.setMaxLevels(Integer.MAX_VALUE)
+.setFieldNameFunction(CONCAT_FIELD_NAMES)
+.build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in 
the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> 
CONCAT_FIELD_NAMES =
+  l -> {
+return String.join(".", l);
+  };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested 
fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> 
KEEP_NESTED_NAME =
 
 Review comment:
   Do we really need to customize naming in the first iteration? I suggest 
removing `CONCAT_FIELD_NAMES` and `KEEP_NESTED_NAME`; they have a 
[YAGNI](https://martinfowler.com/bliki/Yagni.html) smell. In my opinion, 
documenting how naming behaves should be enough to start with, and we could 
add policies later if needed. Alternatively, we could expose the two behaviors 
as a flag for now, e.g. `concatFieldNames()` vs `useLeafFieldName()`, which I 
think would be clearer to users.
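
For illustration only, the two naming behaviors discussed in this comment could be sketched as below. This is standalone Java that does not depend on Beam; the class name `NamingPolicySketch` is hypothetical, and the method names `concatFieldNames` and `useLeafFieldName` are borrowed from the suggestion as assumptions, not an existing API.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the two naming behaviors; not part of Beam. */
class NamingPolicySketch {
  /** Joins every name on the path to a field with '.', as CONCAT_FIELD_NAMES does. */
  static String concatFieldNames(List<String> path) {
    return String.join(".", path);
  }

  /** Keeps only the last name on the path, as KEEP_NESTED_NAME does. */
  static String useLeafFieldName(List<String> path) {
    return path.get(path.size() - 1);
  }

  public static void main(String[] args) {
    // Path to the nested field "latitude" inside "location".
    List<String> path = Arrays.asList("location", "latitude");
    System.out.println(concatFieldNames(path)); // prints location.latitude
    System.out.println(useLeafFieldName(path)); // prints latitude
  }
}
```

With the leaf-name behavior, two differently nested fields that share a name would map to the same unnested name, which is the failure case the Javadoc for `KEEP_NESTED_NAME` mentions.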


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 141439)
Time Spent: 5h  (was: 4h 50m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
> 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141440
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 17:25
Start Date: 05/Sep/18 17:25
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6316: 
[BEAM-4461] Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316#discussion_r215349643
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * For example, consider a Row with the following nestedschema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * After unnesting, all of the rows will be converted to rows satisfying 
the following schema:
+ *
+ * UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude: 
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * By default nested names are concatenated to generated the unnested name, 
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming 
policy.
+ *
+ * Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+  public static  Inner create() {
+return new AutoValue_Unnest_Inner.Builder()
+.setMaxLevels(Integer.MAX_VALUE)
+.setFieldNameFunction(CONCAT_FIELD_NAMES)
+.build();
+  }
+  /**
+   * This is the default naming policy for naming fields. Every field name in 
the path to a given
+   * field is concated with . characters.
+   */
+  public static final SerializableFunction<List<String>, String> 
CONCAT_FIELD_NAMES =
+  l -> {
+return String.join(".", l);
+  };
+  /**
+   * This policy keeps the raw nested field name. If two differently-nested 
fields have the same
+   * name, unnesting will fail with this policy.
+   */
+  public static final SerializableFunction<List<String>, String> 
KEEP_NESTED_NAME =
+  l -> {
+return l.get(l.size() - 1);
+  };
+  /** Returns the result of unnesting the given schema. The default naming 
policy is used. */
+  static Schema getUnnestedSchema(Schema schema, int maxLevels) {
+List<String> nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES, 
maxLevels, 0);
+  }
+  /** Returns the result of unnesting the given schema with the given naming 
policy. */
+  static Schema getUnnestedSchema(
+  Schema schema, int maxLevels, SerializableFunction<List<String>, String> 
fn) {
+List<String> nameComponents = Lists.newArrayList();
+return getUnnestedSchema(schema, nameComponents, fn, maxLevels, 0);
+  }
+
+  private static Schema getUnnestedSchema(
+  Schema schema,
+  List<String> nameComponents,
+  SerializableFunction<List<String>, String> fn,
+  int maxLevel,
+  int currentLevel) {
+Schema.Builder builder = Schema.builder();
+for (Field field : schema.getFields()) {
+  nameComponents.add(field.getName());
+  if (field.getType().getTypeName().isCompositeType() && currentLevel < 
maxLevel) {
+Schema nestedSchema =
+getUnnestedSchema(
+field.getType().getRowSchema(), nameComponents, fn, maxLevel, 
currentLevel + 1);
+for 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141407=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141407
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 16:38
Start Date: 05/Sep/18 16:38
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r215340974
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link 
TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap PRIMITIVE_MAPPING =
+  ImmutableBiMap.builder()
+  .put(TypeName.BYTE, TypeDescriptors.bytes())
+  .put(TypeName.INT16, TypeDescriptors.shorts())
+  .put(TypeName.INT32, TypeDescriptors.integers())
+  .put(TypeName.INT64, TypeDescriptors.longs())
+  .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+  .put(TypeName.FLOAT, TypeDescriptors.floats())
+  .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+  .put(TypeName.STRING, TypeDescriptors.strings())
+  .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+  .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+  .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+  .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+switch (fieldType.getTypeName()) {
+  case ARRAY:
+return 
TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+  case MAP:
+return TypeDescriptors.maps(
+javaTypeForFieldType(fieldType.getMapKeyType()),
+javaTypeForFieldType(fieldType.getMapValueType()));
+  case ROW:
+return TypeDescriptors.rows();
+  default:
+return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+}
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()
+|| typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+  return getArrayFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+  return getMapFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+  throw new IllegalArgumentException(
+  "Cannot automatically determine a field type from a Row class"
+  + " as we cannot determine the schema. You should set a field 
type explicitly.");
+} else {
+  TypeName typeName = PRIMITIVE_MAPPING.inverse().get(typeDescriptor);
+  if (typeName == null) {
+throw new RuntimeException("Couldn't find field type for " + 
typeDescriptor);
+  }
+  FieldType fieldType = FieldType.of(typeName);
+  return fieldType;
+}
+  }
+
+  private static FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()) {
+  if (typeDescriptor.getComponentType().getType().equals(byte.class)) {
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141408=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141408
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 16:38
Start Date: 05/Sep/18 16:38
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r215342039
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link 
TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap PRIMITIVE_MAPPING =
+  ImmutableBiMap.builder()
+  .put(TypeName.BYTE, TypeDescriptors.bytes())
+  .put(TypeName.INT16, TypeDescriptors.shorts())
+  .put(TypeName.INT32, TypeDescriptors.integers())
+  .put(TypeName.INT64, TypeDescriptors.longs())
+  .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+  .put(TypeName.FLOAT, TypeDescriptors.floats())
+  .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+  .put(TypeName.STRING, TypeDescriptors.strings())
+  .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+  .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+  .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+  .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+switch (fieldType.getTypeName()) {
+  case ARRAY:
+return 
TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+  case MAP:
+return TypeDescriptors.maps(
+javaTypeForFieldType(fieldType.getMapKeyType()),
+javaTypeForFieldType(fieldType.getMapValueType()));
+  case ROW:
+return TypeDescriptors.rows();
+  default:
+return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+}
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()
+|| typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+  return getArrayFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+  return getMapFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+  throw new IllegalArgumentException(
+  "Cannot automatically determine a field type from a Row class"
+  + " as we cannot determine the schema. You should set a field 
type explicitly.");
+} else {
+  TypeName typeName = PRIMITIVE_MAPPING.inverse().get(typeDescriptor);
+  if (typeName == null) {
+throw new RuntimeException("Couldn't find field type for " + 
typeDescriptor);
+  }
+  FieldType fieldType = FieldType.of(typeName);
+  return fieldType;
+}
+  }
+
+  private static FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()) {
+  if (typeDescriptor.getComponentType().getType().equals(byte.class)) {
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141409
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 16:38
Start Date: 05/Sep/18 16:38
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r215338969
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+/**
+ * Utilities for converting between {@link Schema} field types and {@link 
TypeDescriptor}s that
+ * define Java objects which can represent these field types.
+ */
+public class FieldTypeDescriptors {
+  private static final BiMap PRIMITIVE_MAPPING =
+  ImmutableBiMap.builder()
+  .put(TypeName.BYTE, TypeDescriptors.bytes())
+  .put(TypeName.INT16, TypeDescriptors.shorts())
+  .put(TypeName.INT32, TypeDescriptors.integers())
+  .put(TypeName.INT64, TypeDescriptors.longs())
+  .put(TypeName.DECIMAL, TypeDescriptors.bigdecimals())
+  .put(TypeName.FLOAT, TypeDescriptors.floats())
+  .put(TypeName.DOUBLE, TypeDescriptors.doubles())
+  .put(TypeName.STRING, TypeDescriptors.strings())
+  .put(TypeName.DATETIME, TypeDescriptor.of(Instant.class))
+  .put(TypeName.BOOLEAN, TypeDescriptors.booleans())
+  .put(TypeName.BYTES, TypeDescriptor.of(byte[].class))
+  .build();
+  /** Get a {@link TypeDescriptor} from a {@link FieldType}. */
+  public static TypeDescriptor javaTypeForFieldType(FieldType fieldType) {
+switch (fieldType.getTypeName()) {
+  case ARRAY:
+return 
TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));
+  case MAP:
+return TypeDescriptors.maps(
+javaTypeForFieldType(fieldType.getMapKeyType()),
+javaTypeForFieldType(fieldType.getMapValueType()));
+  case ROW:
+return TypeDescriptors.rows();
+  default:
+return PRIMITIVE_MAPPING.get(fieldType.getTypeName());
+}
+  }
+  /** Get a {@link FieldType} from a {@link TypeDescriptor}. */
+  public static FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {
+if (typeDescriptor.isArray()
+|| typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+  return getArrayFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+  return getMapFieldType(typeDescriptor);
+} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
+  throw new IllegalArgumentException(
+  "Cannot automatically determine a field type from a Row class"
+  + " as we cannot determine the schema. You should set a field 
type explicitly.");
 
 Review comment:
   Just an idea: would it be possible (and practical) to generate row 
subclasses for all schemas, so that the schema can be accessed in places like 
this one? E.g.
   
   
   ```
   class GeneratedRowSubclassWithSchema$1 extends Row {
     public static final Schema ROW_SCHEMA = ...;
   }
   
   if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {
     // do reflection magic to extract ROW_SCHEMA static field
   }
   ```
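
As a rough sketch of the "reflection magic" step, assuming a generated subclass exposes its schema through a public static field named `ROW_SCHEMA`: the code below is standalone Java with stand-in classes. `RowSchemaReflectionSketch`, its nested `Row` and `GeneratedRowSubclassWithSchema`, and `extractRowSchema` are all hypothetical names, and a `String` stands in for a real `Schema`; none of this is Beam API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Hypothetical sketch; the nested classes are stand-ins, not Beam types. */
class RowSchemaReflectionSketch {
  /** Stand-in for a row base class. */
  static class Row {}

  /** Stand-in for a generated subclass that carries its schema in a static field. */
  static class GeneratedRowSubclassWithSchema extends Row {
    public static final String ROW_SCHEMA = "userid: INT64";
  }

  /** Returns the value of the static ROW_SCHEMA field, or null if the class declares none. */
  static Object extractRowSchema(Class<?> rowClass) {
    if (!Row.class.isAssignableFrom(rowClass)) {
      throw new IllegalArgumentException(rowClass + " is not a Row subclass");
    }
    try {
      Field field = rowClass.getDeclaredField("ROW_SCHEMA");
      if (!Modifier.isStatic(field.getModifiers())) {
        return null;
      }
      // A static field is read by passing null as the instance.
      return field.get(null);
    } catch (NoSuchFieldException e) {
      // Plain Row (or any subclass without the field) carries no schema.
      return null;
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(extractRowSchema(GeneratedRowSubclassWithSchema.class));
    System.out.println(extractRowSchema(Row.class));
  }
}
```

The lookup returns null for the plain base class, so a caller would still need the existing "set a field type explicitly" error path for rows that were not generated this way.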


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141410=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141410
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 16:38
Start Date: 05/Sep/18 16:38
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #6317: 
[BEAM-4461]  Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317#discussion_r215343386
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+/** test for {@link FieldTypeDescriptors}. */
+public class FieldTypeDescriptorsTest {
+  @Test
+  public void testPrimitiveTypeToJavaType() {
+assertEquals(
+TypeDescriptors.bytes(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.BYTE));
+assertEquals(
+TypeDescriptors.shorts(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.INT16));
+assertEquals(
+TypeDescriptors.integers(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.INT32));
+assertEquals(
+TypeDescriptors.longs(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.INT64));
+assertEquals(
+TypeDescriptors.bigdecimals(),
+FieldTypeDescriptors.javaTypeForFieldType(FieldType.DECIMAL));
+assertEquals(
+TypeDescriptors.floats(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.FLOAT));
+assertEquals(
+TypeDescriptors.doubles(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.DOUBLE));
+assertEquals(
+TypeDescriptors.strings(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.STRING));
+assertEquals(
+TypeDescriptor.of(Instant.class),
+FieldTypeDescriptors.javaTypeForFieldType(FieldType.DATETIME));
+assertEquals(
+TypeDescriptors.booleans(), 
FieldTypeDescriptors.javaTypeForFieldType(FieldType.BOOLEAN));
+assertEquals(
+TypeDescriptor.of(byte[].class),
+FieldTypeDescriptors.javaTypeForFieldType(FieldType.BYTES));
+  }
+
+  @Test
+  public void testRowTypeToJavaType() {
+assertEquals(
+TypeDescriptors.lists(TypeDescriptors.rows()),
+FieldTypeDescriptors.javaTypeForFieldType(
+FieldType.array(FieldType.row(Schema.builder().build()))));
+  }
+
+  @Test
+  public void testArrayTypeToJavaType() {
+assertEquals(
+TypeDescriptors.lists(TypeDescriptors.longs()),
+
FieldTypeDescriptors.javaTypeForFieldType(FieldType.array(FieldType.INT64)));
+assertEquals(
+TypeDescriptors.lists(TypeDescriptors.lists(TypeDescriptors.longs())),
+FieldTypeDescriptors.javaTypeForFieldType(
+FieldType.array(FieldType.array(FieldType.INT64))));
+  }
+
+  @Test
+  public void testMapTypeToJavaType() {
+assertEquals(
+TypeDescriptors.maps(TypeDescriptors.strings(), 
TypeDescriptors.longs()),
+FieldTypeDescriptors.javaTypeForFieldType(
+FieldType.map(FieldType.STRING, FieldType.INT64)));
+assertEquals(
+TypeDescriptors.maps(
+TypeDescriptors.strings(), 
TypeDescriptors.lists(TypeDescriptors.longs())),
+FieldTypeDescriptors.javaTypeForFieldType(
+FieldType.map(FieldType.STRING, 
FieldType.array(FieldType.INT64))));
+  }
+
+  @Test
+  public void testPrimitiveTypeToFieldType() {
+assertEquals(
+FieldType.BYTE, 
FieldTypeDescriptors.fieldTypeForJavaType(TypeDescriptors.bytes()));
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=141379=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141379
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 05/Sep/18 16:13
Start Date: 05/Sep/18 16:13
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #6298: [BEAM-4461] Introduce 
Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-418789669
 
 
   Thank you, looking at those


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 141379)
Time Spent: 4h 20m  (was: 4h 10m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=140366=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140366
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 01/Sep/18 23:58
Start Date: 01/Sep/18 23:58
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-417895432
 
 
   Several parts are now split into separate PRs.
   
   On Thu, Aug 30, 2018 at 11:04 PM Anton Kedin 
   wrote:
   
   > Can you split this PR? E.g. infrastructure work into one,
   > SchemaAggregateFn into another, and the Group into a separate one?
   > Otherwise it is hard to review all at once
   




Issue Time Tracking
---

Worklog Id: (was: 140366)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-09-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=140364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140364
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 01/Sep/18 23:28
Start Date: 01/Sep/18 23:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6318: [BEAM-4461] Some 
fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318#issuecomment-417894245
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 140364)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=140132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140132
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 31/Aug/18 16:00
Start Date: 31/Aug/18 16:00
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #6318: [BEAM-4461] 
Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318
 
 
   




Issue Time Tracking
---

Worklog Id: (was: 140132)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=140121=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140121
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 31/Aug/18 14:59
Start Date: 31/Aug/18 14:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #6317: [BEAM-4461]  
Add mapping between FieldType and Java types.
URL: https://github.com/apache/beam/pull/6317
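
For illustration only (this is not taken from the pull request): a minimal
sketch of a two-way mapping between schema field types and Java types, in the
spirit of the pairs exercised by the FieldTypeDescriptors tests quoted earlier
in this thread (INT64 and Long, BYTE and Byte, BYTES and byte[], and so on).
The class FieldTypeMappingSketch, the enum SketchFieldType, and both method
names are hypothetical; the real code works with Beam's FieldType and
TypeDescriptor. java.time.Instant stands in for the Instant class used by the
tests, purely to keep the sketch free of dependencies.

```java
import java.time.Instant;
import java.util.EnumMap;
import java.util.Map;

public class FieldTypeMappingSketch {
  /** Hypothetical stand-in for a few schema field types; not Beam's FieldType. */
  public enum SketchFieldType { BYTE, INT64, STRING, BOOLEAN, BYTES, DATETIME }

  private static final Map<SketchFieldType, Class<?>> JAVA_TYPES =
      new EnumMap<>(SketchFieldType.class);

  static {
    JAVA_TYPES.put(SketchFieldType.BYTE, Byte.class);
    JAVA_TYPES.put(SketchFieldType.INT64, Long.class);
    JAVA_TYPES.put(SketchFieldType.STRING, String.class);
    JAVA_TYPES.put(SketchFieldType.BOOLEAN, Boolean.class);
    JAVA_TYPES.put(SketchFieldType.BYTES, byte[].class);
    // Assumption: java.time.Instant is used here only to avoid extra dependencies.
    JAVA_TYPES.put(SketchFieldType.DATETIME, Instant.class);
  }

  /** Forward direction: field type to Java class. */
  public static Class<?> javaTypeFor(SketchFieldType fieldType) {
    return JAVA_TYPES.get(fieldType);
  }

  /** Reverse direction: Java class to field type, or null if the class is unmapped. */
  public static SketchFieldType fieldTypeFor(Class<?> javaType) {
    for (Map.Entry<SketchFieldType, Class<?>> entry : JAVA_TYPES.entrySet()) {
      if (entry.getValue().equals(javaType)) {
        return entry.getKey();
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(javaTypeFor(SketchFieldType.INT64));
    System.out.println(fieldTypeFor(Byte.class));
  }
}
```

A flat lookup table like this cannot express the nested cases the tests cover
(arrays of arrays, maps of arrays, rows); those need a recursive type
description, which is what a TypeDescriptor provides.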
 
 
   




Issue Time Tracking
---

Worklog Id: (was: 140121)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=140120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140120
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 31/Aug/18 14:55
Start Date: 31/Aug/18 14:55
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #6316: [BEAM-4461] 
Add Unnest transform.
URL: https://github.com/apache/beam/pull/6316
 
 
   




Issue Time Tracking
---

Worklog Id: (was: 140120)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=139955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139955
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 31/Aug/18 06:04
Start Date: 31/Aug/18 06:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #6298: [BEAM-4461] Introduce 
Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-417561294
 
 
   Can you split this PR? E.g. infrastructure work into one, SchemaAggregateFn 
into another, and the Group into a separate one? Otherwise it is hard to review 
all at once




Issue Time Tracking
---

Worklog Id: (was: 139955)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=139586=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139586
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 30/Aug/18 07:55
Start Date: 30/Aug/18 07:55
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-417225567
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 139586)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=139469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139469
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 30/Aug/18 00:49
Start Date: 30/Aug/18 00:49
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-417153801
 
 
   run Java Precommit




Issue Time Tracking
---

Worklog Id: (was: 139469)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=139470=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139470
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 30/Aug/18 00:49
Start Date: 30/Aug/18 00:49
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298#issuecomment-417153865
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 139470)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=139389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139389
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 29/Aug/18 19:42
Start Date: 29/Aug/18 19:42
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #6298: [BEAM-4461] 
Introduce Group transform.
URL: https://github.com/apache/beam/pull/6298
 
 
   




Issue Time Tracking
---

Worklog Id: (was: 139389)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131078=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131078
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 21:26
Start Date: 03/Aug/18 21:26
Worklog Time Spent: 10m 
  Work Description: reuvenlax closed pull request #6133: [BEAM-4461] Add 
Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index 637091105e7..ed33673a91c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -106,7 +106,7 @@ public static FieldAccessDescriptor 
withFieldNames(String... names) {
* in a recursive {@link FieldAccessDescriptor}.
*/
   public static FieldAccessDescriptor withFieldNames(Iterable 
fieldNames) {
-return 
builder().setFieldNamesAccessed(Sets.newHashSet(fieldNames)).build();
+return 
builder().setFieldNamesAccessed(Sets.newTreeSet(fieldNames)).build();
   }
 
   /**
@@ -128,7 +128,12 @@ public static FieldAccessDescriptor 
withFieldIds(Integer... ids) {
* in a recursive {@link FieldAccessDescriptor}.
*/
   public static FieldAccessDescriptor withFieldIds(Iterable ids) {
-return builder().setFieldIdsAccessed(Sets.newHashSet(ids)).build();
+return builder().setFieldIdsAccessed(Sets.newTreeSet(ids)).build();
+  }
+
+  /** Return an empty {@link FieldAccessDescriptor}. */
+  public static FieldAccessDescriptor create() {
+return builder().build();
   }
 
   /**
@@ -194,7 +199,7 @@ public FieldAccessDescriptor resolve(Schema schema) {
   }
 
   private Set resolveFieldIdsAccessed(Schema schema) {
-Set fieldIds = 
Sets.newHashSetWithExpectedSize(getFieldIdsAccessed().size());
+Set fieldIds = Sets.newTreeSet();
 for (int fieldId : getFieldIdsAccessed()) {
   fieldIds.add(validateFieldId(schema, fieldId));
 }
@@ -229,7 +234,7 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper(
   }
 
   private Map 
resolveNestedFieldsAccessed(Schema schema) {
-Map nestedFields = Maps.newHashMap();
+Map nestedFields = Maps.newTreeMap();
 
 nestedFields.putAll(
 getNestedFieldsAccessedByName()
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
new file mode 100644
index 000..4f3da61d700
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and 
the result is allowed
+ * to pass if all predicates return true. The output type is the same as the 
input type.
+ *
+ * For example, consider the 

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131070=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131070
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 20:57
Start Date: 03/Aug/18 20:57
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6133: [BEAM-4461] Add 
Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#issuecomment-410374415
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 131070)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131055=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131055
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 19:56
Start Date: 03/Aug/18 19:56
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6133: [BEAM-4461] Add 
Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#issuecomment-410360139
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 131055)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131034=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131034
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 19:33
Start Date: 03/Aug/18 19:33
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207646193
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and 
the result is allowed
+ * to pass if all predicates return true. The output type is the same as the 
input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection<Location> locations = readLocations();
+ * locations.apply(Filter.<Location>create()
+ *.whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *.whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also 
supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whose total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection<UserAccount> users = readUsers();
+ * users.apply(Filter.<UserAccount>create()
+ *.whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static  Inner create() {
+return new Inner();
+  }
+
+  /** Implementation of the filter. */
+  public static class Inner extends PTransform, 
PCollection> {
+Map> fieldNameFilters = 
Maps.newHashMap();
+Map> fieldIdFilters = 
Maps.newHashMap();
+Map, SerializableFunction> fieldNamesFilters = 
Maps.newHashMap();
+Map, SerializableFunction> fieldIdsFilters = 
Maps.newHashMap();
+
+/** Set a predicate based on the value of a field, where the field is 
specified by name. */
+public Inner whereFieldName(String fieldName, SerializableFunction predicate) {
+  fieldNameFilters.put(fieldName, predicate);
+  return this;
+}
+
+/** Set a predicate based on the value of a field, where the field is 
specified by id. */
+public Inner whereFieldId(int fieldId, SerializableFunction 
predicate) {
+  fieldIdFilters.put(fieldId, predicate);
+  return this;
+}
+
+/** Set a predicate based on the values of multiple fields, specified by name. */
+public Inner whereFieldNames(
+List fieldNames, SerializableFunction predicate) 
{
+  fieldNamesFilters.put(fieldNames, predicate);
+  return this;
+}
+
+/** Set a predicate based on the values of multiple fields, specified by id. */
+
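
For illustration only (this is not part of the quoted diff): a minimal sketch
of the semantics the Javadoc above describes, where one predicate is registered
per field name and an element passes only if every registered predicate returns
true. It uses plain Java collections instead of Beam's PCollection and Row. The
class FieldFilterSketch and its methods accepts, apply and location are
hypothetical names, and rows are modelled as maps from field name to double,
which is an assumption made to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FieldFilterSketch {
  // One predicate per field name; registering a name twice replaces the earlier predicate.
  private final Map<String, Predicate<Double>> fieldPredicates = new LinkedHashMap<>();

  /** Registers a predicate on a single named field and returns this for chaining. */
  public FieldFilterSketch whereFieldName(String fieldName, Predicate<Double> predicate) {
    fieldPredicates.put(fieldName, predicate);
    return this;
  }

  /** Returns true only if every registered predicate accepts its field's value. */
  public boolean accepts(Map<String, Double> row) {
    for (Map.Entry<String, Predicate<Double>> entry : fieldPredicates.entrySet()) {
      Double value = row.get(entry.getKey());
      // A row that lacks a filtered field is rejected in this sketch.
      if (value == null || !entry.getValue().test(value)) {
        return false;
      }
    }
    return true;
  }

  /** Keeps the rows that pass all predicates; the element type is unchanged. */
  public List<Map<String, Double>> apply(List<Map<String, Double>> rows) {
    List<Map<String, Double>> kept = new ArrayList<>();
    for (Map<String, Double> row : rows) {
      if (accepts(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  /** Helper that builds a row with latitude and longitude fields. */
  public static Map<String, Double> location(double latitude, double longitude) {
    Map<String, Double> row = new HashMap<>();
    row.put("latitude", latitude);
    row.put("longitude", longitude);
    return row;
  }

  public static void main(String[] args) {
    FieldFilterSketch filter = new FieldFilterSketch()
        .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
        .whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747);

    List<Map<String, Double>> rows = new ArrayList<>();
    rows.add(location(40.710, -74.000)); // inside both ranges
    rows.add(location(41.000, -74.000)); // latitude out of range
    System.out.println(filter.apply(rows).size());
  }
}
```

The lambda parameter is named lon rather than long because long is a reserved
word in Java and cannot be used as an identifier.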

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131035=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131035
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 19:33
Start Date: 03/Aug/18 19:33
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207646200
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and 
the result is allowed
+ * to pass if all predicates return true. The output type is the same as the 
input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection<Location> locations = readLocations();
+ * locations.apply(Filter.<Location>create()
+ *.whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *.whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also 
supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whose total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection<UserAccount> users = readUsers();
+ * users.apply(Filter.<UserAccount>create()
+ *.whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static  Inner create() {
+return new Inner();
+  }
+
+  /** Implementation of the filter. */
+  public static class Inner extends PTransform, 
PCollection> {
+Map> fieldNameFilters = 
Maps.newHashMap();
+Map> fieldIdFilters = 
Maps.newHashMap();
+Map, SerializableFunction> fieldNamesFilters = 
Maps.newHashMap();
+Map, SerializableFunction> fieldIdsFilters = 
Maps.newHashMap();
+
+/** Set a predicate based on the value of a field, where the field is 
specified by name. */
+public Inner whereFieldName(String fieldName, SerializableFunction predicate) {
+  fieldNameFilters.put(fieldName, predicate);
+  return this;
+}
+
+/** Set a predicate based on the value of a field, where the field is 
specified by id. */
+public Inner whereFieldId(int fieldId, SerializableFunction 
predicate) {
+  fieldIdFilters.put(fieldId, predicate);
+  return this;
+}
+
+/** Set a predicate based on the value of multipled fields, specified by 
name. */
+public Inner whereFieldNames(
+List fieldNames, SerializableFunction predicate) 
{
+  fieldNamesFilters.put(fieldNames, predicate);
+  return this;
+}
+
+/** Set a predicate based on the value of multipled fields, specified by 
id. */
+
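
The filtering semantics described in the Javadoc above (one predicate per field, a row passes only if all predicates return true, output type equals input type) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Beam implementation: `FieldFilterSketch`, `accepts`, `apply`, and `location` are hypothetical names, rows are modelled as `Map<String, Double>` instead of Beam `Row`s, and the multi-field predicate is assumed to see only the fields it names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.DoublePredicate;
import java.util.function.Predicate;

/** Hypothetical stand-in for the schema Filter; rows are plain maps of double-valued fields. */
public class FieldFilterSketch {
  private final Map<String, DoublePredicate> fieldNameFilters = new LinkedHashMap<>();
  private final Map<List<String>, Predicate<Map<String, Double>>> fieldNamesFilters =
      new LinkedHashMap<>();

  /** Registers a predicate on the value of a single named field. */
  public FieldFilterSketch whereFieldName(String fieldName, DoublePredicate predicate) {
    fieldNameFilters.put(fieldName, predicate);
    return this;
  }

  /** Registers a predicate over several named fields; it sees only those fields (assumption). */
  public FieldFilterSketch whereFieldNames(
      List<String> fieldNames, Predicate<Map<String, Double>> predicate) {
    fieldNamesFilters.put(fieldNames, predicate);
    return this;
  }

  /** A row passes only if every registered predicate returns true. */
  public boolean accepts(Map<String, Double> row) {
    for (Map.Entry<String, DoublePredicate> e : fieldNameFilters.entrySet()) {
      // Assumes the field is present; a missing field would fail on unboxing.
      if (!e.getValue().test(row.get(e.getKey()))) {
        return false;
      }
    }
    for (Map.Entry<List<String>, Predicate<Map<String, Double>>> e :
        fieldNamesFilters.entrySet()) {
      Map<String, Double> selected = new HashMap<>();
      for (String name : e.getKey()) {
        selected.put(name, row.get(name));
      }
      if (!e.getValue().test(selected)) {
        return false;
      }
    }
    return true;
  }

  /** Keeps the rows that pass; the element type is unchanged. */
  public List<Map<String, Double>> apply(List<Map<String, Double>> rows) {
    List<Map<String, Double>> kept = new ArrayList<>();
    for (Map<String, Double> row : rows) {
      if (accepts(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  /** Helper for building a Location-like row. */
  public static Map<String, Double> location(double latitude, double longitude) {
    Map<String, Double> row = new HashMap<>();
    row.put("latitude", latitude);
    row.put("longitude", longitude);
    return row;
  }

  public static void main(String[] args) {
    FieldFilterSketch southManhattan = new FieldFilterSketch()
        .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
        // "lng" rather than "long": long is a reserved word in Java.
        .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747);

    List<Map<String, Double>> kept = southManhattan.apply(Arrays.asList(
        location(40.706, -74.009),   // inside both ranges
        location(40.758, -73.985))); // latitude out of range
    System.out.println(kept.size()); // prints 1
  }
}
```

The multi-field case works the same way: `whereFieldNames(Arrays.asList("spendOnBooks", "spendOnMovies"), row -> row.get("spendOnBooks") + row.get("spendOnMovies") > 100.00)` uses an expression lambda, so no `return` keyword is needed.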

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131032
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 19:33
Start Date: 03/Aug/18 19:33
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207646178
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 131032)
Time Spent: 1.5h  (was: 1h 20m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131033&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131033
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 19:33
Start Date: 03/Aug/18 19:33
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207646185
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    Map<String, SerializableFunction<?, Boolean>> fieldNameFilters = Maps.newHashMap();
 
 Review comment:
   done




Issue Time Tracking
---

Worklog Id: (was: 131033)
Time Spent: 1h 40m  (was: 1.5h)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130971
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 17:34
Start Date: 03/Aug/18 17:34
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207612650
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    Map<String, SerializableFunction<?, Boolean>> fieldNameFilters = Maps.newHashMap();
+    Map<Integer, SerializableFunction<?, Boolean>> fieldIdFilters = Maps.newHashMap();
+    Map<List<String>, SerializableFunction<Row, Boolean>> fieldNamesFilters = Maps.newHashMap();
+    Map<List<Integer>, SerializableFunction<Row, Boolean>> fieldIdsFilters = Maps.newHashMap();
+
+    /** Set a predicate based on the value of a field, where the field is specified by name. */
+    public Inner<T> whereFieldName(String fieldName, SerializableFunction<?, Boolean> predicate) {
+      fieldNameFilters.put(fieldName, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of a field, where the field is specified by id. */
+    public Inner<T> whereFieldId(int fieldId, SerializableFunction<?, Boolean> predicate) {
+      fieldIdFilters.put(fieldId, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of multipled fields, specified by name. */
+    public Inner<T> whereFieldNames(
+        List<String> fieldNames, SerializableFunction<Row, Boolean> predicate) {
+      fieldNamesFilters.put(fieldNames, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of multipled fields, specified by id. */
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130968&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130968
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 17:34
Start Date: 03/Aug/18 17:34
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207609080
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
 
 Review comment:
   Extra `*` at the end of this comment?




Issue Time Tracking
---

Worklog Id: (was: 130968)
Time Spent: 1h  (was: 50m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130970
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 17:34
Start Date: 03/Aug/18 17:34
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207611822
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    Map<String, SerializableFunction<?, Boolean>> fieldNameFilters = Maps.newHashMap();
+    Map<Integer, SerializableFunction<?, Boolean>> fieldIdFilters = Maps.newHashMap();
+    Map<List<String>, SerializableFunction<Row, Boolean>> fieldNamesFilters = Maps.newHashMap();
+    Map<List<Integer>, SerializableFunction<Row, Boolean>> fieldIdsFilters = Maps.newHashMap();
+
+    /** Set a predicate based on the value of a field, where the field is specified by name. */
+    public Inner<T> whereFieldName(String fieldName, SerializableFunction<?, Boolean> predicate) {
+      fieldNameFilters.put(fieldName, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of a field, where the field is specified by id. */
+    public Inner<T> whereFieldId(int fieldId, SerializableFunction<?, Boolean> predicate) {
+      fieldIdFilters.put(fieldId, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of multipled fields, specified by name. */
+    public Inner<T> whereFieldNames(
+        List<String> fieldNames, SerializableFunction<Row, Boolean> predicate) {
+      fieldNamesFilters.put(fieldNames, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of multipled fields, specified by id. */
+

[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130969&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130969
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 17:34
Start Date: 03/Aug/18 17:34
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207609370
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate Predicates can be registered for different schema fields, and the result is allowed
+ * to pass if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whos total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> return row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. * */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    Map<String, SerializableFunction<?, Boolean>> fieldNameFilters = Maps.newHashMap();
 
 Review comment:
   These should probably all be `private final`.
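
   A minimal sketch of what this suggestion amounts to, using plain `java.util` types rather than the Beam classes. `FinalFieldsSketch` and `predicateCount` are hypothetical names, and `DoublePredicate` stands in for the predicate type used in the PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.DoublePredicate;

/** Hypothetical builder illustrating private final filter maps. */
public class FinalFieldsSketch {
  // private: callers can only add predicates through the builder methods.
  // final: the map reference is assigned once and never replaced.
  private final Map<String, DoublePredicate> fieldNameFilters = new HashMap<>();
  private final Map<Integer, DoublePredicate> fieldIdFilters = new HashMap<>();

  public FinalFieldsSketch whereFieldName(String fieldName, DoublePredicate predicate) {
    // final does not freeze the contents, so put() is still allowed.
    fieldNameFilters.put(fieldName, predicate);
    return this;
  }

  public FinalFieldsSketch whereFieldId(int fieldId, DoublePredicate predicate) {
    fieldIdFilters.put(fieldId, predicate);
    return this;
  }

  /** Number of predicates registered so far. */
  public int predicateCount() {
    return fieldNameFilters.size() + fieldIdFilters.size();
  }

  public static void main(String[] args) {
    FinalFieldsSketch filter = new FinalFieldsSketch()
        .whereFieldName("latitude", lat -> lat > 40.699)
        .whereFieldId(1, lng -> lng < -73.969);
    System.out.println(filter.predicateCount()); // prints 2
  }
}
```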




Issue Time Tracking
---

Worklog Id: (was: 130969)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130920
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 15:56
Start Date: 03/Aug/18 15:56
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6133: [BEAM-4461] Add 
Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#issuecomment-410297508
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 130920)
Time Spent: 50m  (was: 40m)

> Create a library of useful transforms that use schemas
> --
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130888
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 15:01
Start Date: 03/Aug/18 15:01
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207573511
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate predicates can be registered for different schema fields, and a row is allowed
+ * to pass only if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection<Location> locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whose total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection<UserAccount> users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+public class Filter {
 
 Review comment:
   Good catch! fixed.
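
The Javadoc quoted in the hunk above says a row passes only when every registered predicate returns true, and that a predicate may also look at several fields at once. The following is a minimal plain-Java sketch of that rule, not the Beam implementation: the class FieldFilterSketch, its row(...) helper, and the use of Map<String, Double> as a stand-in for a schema row are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for a schema filter; a "row" is just field name -> double value. */
final class FieldFilterSketch {
  // One predicate per field name; registering a field twice ANDs the predicates together.
  private final Map<String, Predicate<Double>> fieldPredicates = new LinkedHashMap<>();
  // Predicates that need several fields at once receive the whole row.
  private final List<Predicate<Map<String, Double>>> rowPredicates = new ArrayList<>();

  FieldFilterSketch whereFieldName(String field, Predicate<Double> predicate) {
    fieldPredicates.merge(field, predicate, (a, b) -> a.and(b));
    return this;
  }

  FieldFilterSketch whereFieldNames(List<String> fields, Predicate<Map<String, Double>> predicate) {
    // A row that lacks any of the named fields is rejected before the predicate runs.
    rowPredicates.add(row -> row.keySet().containsAll(fields) && predicate.test(row));
    return this;
  }

  /** Returns true only if every registered predicate accepts the row. */
  boolean accepts(Map<String, Double> row) {
    for (Map.Entry<String, Predicate<Double>> entry : fieldPredicates.entrySet()) {
      Double value = row.get(entry.getKey());
      if (value == null || !entry.getValue().test(value)) {
        return false;
      }
    }
    for (Predicate<Map<String, Double>> predicate : rowPredicates) {
      if (!predicate.test(row)) {
        return false;
      }
    }
    return true;
  }

  /** Keeps the accepted rows; the output element type equals the input element type. */
  List<Map<String, Double>> apply(List<Map<String, Double>> rows) {
    List<Map<String, Double>> kept = new ArrayList<>();
    for (Map<String, Double> row : rows) {
      if (accepts(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  /** Convenience builder for a two-field row. */
  static Map<String, Double> row(String k1, double v1, String k2, double v2) {
    Map<String, Double> row = new HashMap<>();
    row.put(k1, v1);
    row.put(k2, v2);
    return row;
  }

  public static void main(String[] args) {
    FieldFilterSketch southManhattan = new FieldFilterSketch()
        .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
        .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747);
    List<Map<String, Double>> locations = new ArrayList<>();
    locations.add(row("latitude", 40.705, "longitude", -74.010));
    locations.add(row("latitude", 40.800, "longitude", -73.950));
    System.out.println("locations kept: " + southManhattan.apply(locations).size());

    List<String> spendFields = new ArrayList<>();
    spendFields.add("spendOnBooks");
    spendFields.add("spendOnMovies");
    FieldFilterSketch bigSpenders = new FieldFilterSketch()
        .whereFieldNames(spendFields,
            r -> r.get("spendOnBooks") + r.get("spendOnMovies") > 100.00);
    List<Map<String, Double>> users = new ArrayList<>();
    users.add(row("spendOnBooks", 60.0, "spendOnMovies", 50.0));
    users.add(row("spendOnBooks", 30.0, "spendOnMovies", 20.0));
    System.out.println("users kept: " + bigSpenders.apply(users).size());
  }
}
```

The lambda parameter is named lng rather than long because long is a reserved word in Java and cannot be used as an identifier.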



Issue Time Tracking
---

Worklog Id: (was: 130888)
Time Spent: 40m  (was: 0.5h)

> Create a library of useful transforms that use schemas
> --
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130765
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 08:35
Start Date: 03/Aug/18 08:35
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6133: 
[BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#discussion_r207477419
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 ##
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * Separate predicates can be registered for different schema fields, and a row is allowed
+ * to pass only if all predicates return true. The output type is the same as the input type.
+ *
+ * For example, consider the following schema type:
+ *
+ * {@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }
+ *
+ * In order to examine only locations in south Manhattan, you would write:
+ *
+ * {@code
+ * PCollection<Location> locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
+ * }
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * {@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *   ...
+ * }
+ * }
+ *
+ * Say you want to examine only users whose total spend is above $100. You could write:
+ *
+ * {@code
+ * PCollection<UserAccount> users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }
+ */
+public class Filter {
 
 Review comment:
   @Experimental too, no ?



Issue Time Tracking
---

Worklog Id: (was: 130765)
Time Spent: 0.5h  (was: 20m)

> Create a library of useful transforms that use schemas
> --
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130762
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 07:58
Start Date: 03/Aug/18 07:58
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #6133: [BEAM-4461] Add 
Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133#issuecomment-410176657
 
 
   R:@apilloud 



Issue Time Tracking
---

Worklog Id: (was: 130762)
Time Spent: 20m  (was: 10m)

> Create a library of useful transforms that use schemas
> --
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.





[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas

2018-08-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=130761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130761
 ]

ASF GitHub Bot logged work on BEAM-4461:


Author: ASF GitHub Bot
Created on: 03/Aug/18 07:57
Start Date: 03/Aug/18 07:57
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #6133: [BEAM-4461] 
Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133
 
 
   



Issue Time Tracking
---

Worklog Id: (was: 130761)
Time Spent: 10m
Remaining Estimate: 0h

> Create a library of useful transforms that use schemas
> --
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.


