Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 2a1377e1c -> f37a7a19c


update JavaDoc for BeamRecord, BeamRecordType.
Also only create new UDF class instances for SerializableFunction UDFs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd27036a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd27036a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd27036a

Branch: refs/heads/DSL_SQL
Commit: cd27036a1c537a9059f13bbcdaec0264481ebd0b
Parents: 2a1377e
Author: mingmxu <ming...@ebay.com>
Authored: Wed Aug 9 13:30:16 2017 -0700
Committer: Tyler Akidau <taki...@apache.org>
Committed: Thu Aug 10 13:58:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/values/BeamRecord.java  | 118 ++++++++++++++++++-
 .../apache/beam/sdk/values/BeamRecordType.java  |  60 ++++++----
 .../operator/BeamSqlUdfExpression.java          |   5 +-
 3 files changed, 159 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index fa3b574..fd26f46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -25,11 +25,17 @@ import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
 
 /**
- * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
- * {@link BeamRecordType}, represents one element in a
- * {@link org.apache.beam.sdk.values.PCollection}.
+ * {@link BeamRecord} is an immutable tuple-like type to represent one element 
in a
+ * {@link PCollection}. The fields are described with a {@link BeamRecordType}.
+ *
+ * <p>By default, {@link BeamRecordType} only contains the name for each field. It
+ * can be extended to support more sophisticated validation by overriding
+ * {@link BeamRecordType#validateValueType(int, Object)}.
+ *
+ * <p>A Coder {@link BeamRecordCoder} is provided, which wraps the Coder for 
each data field.
  */
 @Experimental
 public class BeamRecord implements Serializable {
@@ -63,6 +69,9 @@ public class BeamRecord implements Serializable {
     }
   }
 
+  /**
+   * See {@link #BeamRecord(BeamRecordType, List)}.
+   */
   public BeamRecord(BeamRecordType dataType, Object... rawdataValues) {
     this(dataType, Arrays.asList(rawdataValues));
   }
@@ -72,110 +81,213 @@ public class BeamRecord implements Serializable {
     dataValues.set(index, fieldValue);
   }
 
+  /**
+   * Get value by field name.
+   */
   public Object getFieldValue(String fieldName) {
     return getFieldValue(dataType.getFieldNames().indexOf(fieldName));
   }
 
+  /**
+   * Get a {@link Byte} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Byte getByte(String fieldName) {
     return (Byte) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Short} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Short getShort(String fieldName) {
     return (Short) getFieldValue(fieldName);
   }
 
+  /**
+   * Get an {@link Integer} value by field name, {@link ClassCastException} is thrown
+   * if type doesn't match.
+   */
   public Integer getInteger(String fieldName) {
     return (Integer) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Float} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Float getFloat(String fieldName) {
     return (Float) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Double} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Double getDouble(String fieldName) {
     return (Double) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Long} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Long getLong(String fieldName) {
     return (Long) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link String} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public String getString(String fieldName) {
     return (String) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Date} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Date getDate(String fieldName) {
     return (Date) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link GregorianCalendar} value by field name, {@link 
ClassCastException} is thrown
+   * if type doesn't match.
+   */
   public GregorianCalendar getGregorianCalendar(String fieldName) {
     return (GregorianCalendar) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link BigDecimal} value by field name, {@link ClassCastException} 
is thrown
+   * if type doesn't match.
+   */
   public BigDecimal getBigDecimal(String fieldName) {
     return (BigDecimal) getFieldValue(fieldName);
   }
 
+  /**
+   * Get a {@link Boolean} value by field name, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Boolean getBoolean(String fieldName) {
     return (Boolean) getFieldValue(fieldName);
   }
 
+  /**
+   * Get value by field index.
+   */
   public Object getFieldValue(int fieldIdx) {
     return dataValues.get(fieldIdx);
   }
 
+  /**
+   * Get a {@link Byte} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Byte getByte(int idx) {
     return (Byte) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Short} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Short getShort(int idx) {
     return (Short) getFieldValue(idx);
   }
 
+  /**
+   * Get an {@link Integer} value by field index, {@link ClassCastException} is thrown
+   * if type doesn't match.
+   */
   public Integer getInteger(int idx) {
     return (Integer) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Float} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Float getFloat(int idx) {
     return (Float) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Double} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Double getDouble(int idx) {
     return (Double) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Long} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Long getLong(int idx) {
     return (Long) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link String} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public String getString(int idx) {
     return (String) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Date} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Date getDate(int idx) {
     return (Date) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link GregorianCalendar} value by field index, {@link 
ClassCastException} is thrown
+   * if type doesn't match.
+   */
   public GregorianCalendar getGregorianCalendar(int idx) {
     return (GregorianCalendar) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link BigDecimal} value by field index, {@link ClassCastException} 
is thrown
+   * if type doesn't match.
+   */
   public BigDecimal getBigDecimal(int idx) {
     return (BigDecimal) getFieldValue(idx);
   }
 
+  /**
+   * Get a {@link Boolean} value by field index, {@link ClassCastException} is 
thrown
+   * if type doesn't match.
+   */
   public Boolean getBoolean(int idx) {
     return (Boolean) getFieldValue(idx);
   }
 
+  /**
+   * Return the size of data fields.
+   */
   public int getFieldCount() {
     return dataValues.size();
   }
 
+  /**
+   * Return the list of data values.
+   */
   public List<Object> getDataValues() {
     return dataValues;
   }
 
+  /**
+   * Return {@link BeamRecordType} which describes the fields.
+   */
   public BeamRecordType getDataType() {
     return dataType;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 29cc80d..620361c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -25,14 +25,22 @@ import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.coders.Coder;
 
 /**
- * The default type provider used in {@link BeamRecord}.
+ * {@link BeamRecordType} describes the fields in {@link BeamRecord}; extra checking can be added
+ * by overriding {@link BeamRecordType#validateValueType(int, Object)}.
  */
 @Experimental
 public class BeamRecordType implements Serializable{
   private List<String> fieldNames;
   private List<Coder> fieldCoders;
 
+  /**
+   * Create a {@link BeamRecordType} with a name and Coder for each field.
+   */
   public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) {
+    if (fieldNames.size() != fieldCoders.size()) {
+      throw new IllegalStateException(
+          "the size of fieldNames and fieldCoders need to be the same.");
+    }
     this.fieldNames = fieldNames;
     this.fieldCoders = fieldCoders;
   }
@@ -41,30 +49,42 @@ public class BeamRecordType implements Serializable{
    * Validate input fieldValue for a field.
    * @throws IllegalArgumentException throw exception when the validation 
fails.
    */
-   public void validateValueType(int index, Object fieldValue)
-      throws IllegalArgumentException{
-     //do nothing by default.
-   }
+  public void validateValueType(int index, Object fieldValue)
+     throws IllegalArgumentException{
+    //do nothing by default.
+  }
 
-   /**
-    * Get the coder for {@link BeamRecordCoder}.
-    */
-   public BeamRecordCoder getRecordCoder(){
-     return BeamRecordCoder.of(this, fieldCoders);
-   }
+  /**
+   * Return the coder for {@link BeamRecord}, which wraps {@link #fieldCoders} 
for each field.
+   */
+  public BeamRecordCoder getRecordCoder(){
+    return BeamRecordCoder.of(this, fieldCoders);
+  }
 
-   public List<String> getFieldNames(){
-     return ImmutableList.copyOf(fieldNames);
-   }
+  /**
+   * Returns an immutable list of field names.
+   */
+  public List<String> getFieldNames(){
+    return ImmutableList.copyOf(fieldNames);
+  }
 
-   public String getFieldNameByIndex(int index){
-     return fieldNames.get(index);
-   }
+  /**
+   * Return the name of field by index.
+   */
+  public String getFieldNameByIndex(int index){
+    return fieldNames.get(index);
+  }
 
-   public int findIndexOfField(String fieldName){
-     return fieldNames.indexOf(fieldName);
-   }
+  /**
+   * Find the index of a given field.
+   */
+  public int findIndexOfField(String fieldName){
+    return fieldNames.indexOf(fieldName);
+  }
 
+  /**
+   * Return the count of fields.
+   */
   public int getFieldCount(){
     return fieldNames.size();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index 123e6a0..625de2c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -79,8 +80,10 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
       for (String pc : paraClassName) {
         paraClass.add(Class.forName(pc));
       }
-      udfIns = Class.forName(className).newInstance();
       method = Class.forName(className).getMethod(methodName, 
paraClass.toArray(new Class<?>[] {}));
+      if (!Modifier.isStatic(method.getModifiers())) {
+        udfIns = Class.forName(className).newInstance();
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

Reply via email to