Repository: beam Updated Branches: refs/heads/DSL_SQL 2a1377e1c -> f37a7a19c
update JavaDoc for BeamRecord, BeamRecordType. Also only create new UDF class instances for SerializableFunction UDFs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd27036a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd27036a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd27036a Branch: refs/heads/DSL_SQL Commit: cd27036a1c537a9059f13bbcdaec0264481ebd0b Parents: 2a1377e Author: mingmxu <ming...@ebay.com> Authored: Wed Aug 9 13:30:16 2017 -0700 Committer: Tyler Akidau <taki...@apache.org> Committed: Thu Aug 10 13:58:59 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/values/BeamRecord.java | 118 ++++++++++++++++++- .../apache/beam/sdk/values/BeamRecordType.java | 60 ++++++---- .../operator/BeamSqlUdfExpression.java | 5 +- 3 files changed, 159 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index fa3b574..fd26f46 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -25,11 +25,17 @@ import java.util.Date; import java.util.GregorianCalendar; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.BeamRecordCoder; /** - * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with - * {@link BeamRecordType}, represents one element in a - * {@link org.apache.beam.sdk.values.PCollection}. + * {@link BeamRecord} is an immutable tuple-like type to represent one element in a + * {@link PCollection}. The fields are described with a {@link BeamRecordType}. + * + * <p>By default, {@link BeamRecordType} only contains the name for each field. It + * can be extended to support more sophisticated validation by overwriting + * {@link BeamRecordType#validateValueType(int, Object)}. + * + * <p>A Coder {@link BeamRecordCoder} is provided, which wraps the Coder for each data field. */ @Experimental public class BeamRecord implements Serializable { @@ -63,6 +69,9 @@ public class BeamRecord implements Serializable { } } + /** + * see {@link #BeamRecord(BeamRecordType, List)}. + */ public BeamRecord(BeamRecordType dataType, Object... rawdataValues) { this(dataType, Arrays.asList(rawdataValues)); } @@ -72,110 +81,213 @@ public class BeamRecord implements Serializable { dataValues.set(index, fieldValue); } + /** + * Get value by field name. + */ public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldNames().indexOf(fieldName)); } + /** + * Get a {@link Byte} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Byte getByte(String fieldName) { return (Byte) getFieldValue(fieldName); } + /** + * Get a {@link Short} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Short getShort(String fieldName) { return (Short) getFieldValue(fieldName); } + /** + * Get a {@link Integer} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Integer getInteger(String fieldName) { return (Integer) getFieldValue(fieldName); } + /** + * Get a {@link Float} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Float getFloat(String fieldName) { return (Float) getFieldValue(fieldName); } + /** + * Get a {@link Double} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Double getDouble(String fieldName) { return (Double) getFieldValue(fieldName); } + /** + * Get a {@link Long} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Long getLong(String fieldName) { return (Long) getFieldValue(fieldName); } + /** + * Get a {@link String} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public String getString(String fieldName) { return (String) getFieldValue(fieldName); } + /** + * Get a {@link Date} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Date getDate(String fieldName) { return (Date) getFieldValue(fieldName); } + /** + * Get a {@link GregorianCalendar} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public GregorianCalendar getGregorianCalendar(String fieldName) { return (GregorianCalendar) getFieldValue(fieldName); } + /** + * Get a {@link BigDecimal} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public BigDecimal getBigDecimal(String fieldName) { return (BigDecimal) getFieldValue(fieldName); } + /** + * Get a {@link Boolean} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Boolean getBoolean(String fieldName) { return (Boolean) getFieldValue(fieldName); } + /** + * Get value by field index. + */ public Object getFieldValue(int fieldIdx) { return dataValues.get(fieldIdx); } + /** + * Get a {@link Byte} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Byte getByte(int idx) { return (Byte) getFieldValue(idx); } + /** + * Get a {@link Short} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Short getShort(int idx) { return (Short) getFieldValue(idx); } + /** + * Get a {@link Integer} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Integer getInteger(int idx) { return (Integer) getFieldValue(idx); } + /** + * Get a {@link Float} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Float getFloat(int idx) { return (Float) getFieldValue(idx); } + /** + * Get a {@link Double} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Double getDouble(int idx) { return (Double) getFieldValue(idx); } + /** + * Get a {@link Long} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Long getLong(int idx) { return (Long) getFieldValue(idx); } + /** + * Get a {@link String} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public String getString(int idx) { return (String) getFieldValue(idx); } + /** + * Get a {@link Date} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Date getDate(int idx) { return (Date) getFieldValue(idx); } + /** + * Get a {@link GregorianCalendar} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public GregorianCalendar getGregorianCalendar(int idx) { return (GregorianCalendar) getFieldValue(idx); } + /** + * Get a {@link BigDecimal} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public BigDecimal getBigDecimal(int idx) { return (BigDecimal) getFieldValue(idx); } + /** + * Get a {@link Boolean} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Boolean getBoolean(int idx) { return (Boolean) getFieldValue(idx); } + /** + * Return the size of data fields. + */ public int getFieldCount() { return dataValues.size(); } + /** + * Return the list of data values. + */ public List<Object> getDataValues() { return dataValues; } + /** + * Return {@link BeamRecordType} which describes the fields. + */ public BeamRecordType getDataType() { return dataType; } http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java index 29cc80d..620361c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java @@ -25,14 +25,22 @@ import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.coders.Coder; /** - * The default type provider used in {@link BeamRecord}. + * {@link BeamRecordType} describes the fields in {@link BeamRecord}, extra checking can be added + * by overwriting {@link BeamRecordType#validateValueType(int, Object)}. */ @Experimental public class BeamRecordType implements Serializable{ private List<String> fieldNames; private List<Coder> fieldCoders; + /** + * Create a {@link BeamRecordType} with a name and Coder for each field. + */ public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) { + if (fieldNames.size() != fieldCoders.size()) { + throw new IllegalStateException( + "the size of fieldNames and fieldCoders need to be the same."); + } this.fieldNames = fieldNames; this.fieldCoders = fieldCoders; } @@ -41,30 +49,42 @@ public class BeamRecordType implements Serializable{ * Validate input fieldValue for a field. * @throws IllegalArgumentException throw exception when the validation fails. */ - public void validateValueType(int index, Object fieldValue) - throws IllegalArgumentException{ - //do nothing by default. - } + public void validateValueType(int index, Object fieldValue) + throws IllegalArgumentException{ + //do nothing by default. + } - /** - * Get the coder for {@link BeamRecordCoder}. - */ - public BeamRecordCoder getRecordCoder(){ - return BeamRecordCoder.of(this, fieldCoders); - } + /** + * Return the coder for {@link BeamRecord}, which wraps {@link #fieldCoders} for each field. + */ + public BeamRecordCoder getRecordCoder(){ + return BeamRecordCoder.of(this, fieldCoders); + } - public List<String> getFieldNames(){ - return ImmutableList.copyOf(fieldNames); - } + /** + * Returns an immutable list of field names. + */ + public List<String> getFieldNames(){ + return ImmutableList.copyOf(fieldNames); + } - public String getFieldNameByIndex(int index){ - return fieldNames.get(index); - } + /** + * Return the name of field by index. + */ + public String getFieldNameByIndex(int index){ + return fieldNames.get(index); + } - public int findIndexOfField(String fieldName){ - return fieldNames.indexOf(fieldName); - } + /** + * Find the index of a given field. + */ + public int findIndexOfField(String fieldName){ + return fieldNames.indexOf(fieldName); + } + /** + * Return the count of fields. + */ public int getFieldCount(){ return fieldNames.size(); } http://git-wip-us.apache.org/repos/asf/beam/blob/cd27036a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java index 123e6a0..625de2c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -79,8 +80,10 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { for (String pc : paraClassName) { paraClass.add(Class.forName(pc)); } - udfIns = Class.forName(className).newInstance(); method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); + if (!Modifier.isStatic(method.getModifiers())) { + udfIns = Class.forName(className).newInstance(); + } } catch (Exception e) { throw new RuntimeException(e); }