Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 79880b6ab -> 880531aa2


[rebased] remove nullFields in BeamRecord


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4871edbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4871edbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4871edbc

Branch: refs/heads/DSL_SQL
Commit: 4871edbc430a2e360ba59f10eeafbe2205d47ec1
Parents: 79880b6
Author: mingmxu <ming...@ebay.com>
Authored: Mon Aug 7 14:36:36 2017 -0700
Committer: mingmxu <ming...@ebay.com>
Committed: Mon Aug 7 14:36:36 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java | 15 ++++++++++++-
 .../org/apache/beam/sdk/values/BeamRecord.java  | 23 +++-----------------
 .../extensions/sql/impl/rel/BeamSortRel.java    |  4 ----
 .../sql/schema/BeamSqlRecordType.java           |  4 ++++
 4 files changed, 21 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index fe9c295..40b9f3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -55,7 +55,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   @Override
   public void encode(BeamRecord value, OutputStream outStream)
       throws CoderException, IOException {
-    nullListCoder.encode(value.getNullFields(), outStream);
+    nullListCoder.encode(scanNullFields(value), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.isNull(idx)) {
         continue;
@@ -81,6 +81,19 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
     return record;
   }
 
+  /**
+   * Scan {@link BeamRecord} to find fields with a NULL value.
+   */
+  private BitSet scanNullFields(BeamRecord record){
+    BitSet nullFields = new BitSet(record.size());
+    for (int idx = 0; idx < record.size(); ++idx) {
+      if (record.isNull(idx)) {
+        nullFields.set(idx);
+      }
+    }
+    return nullFields;
+  }
+
   @Override
   public void verifyDeterministic()
       throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {

http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 8d0aa42..6cbd11b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.values;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
@@ -34,17 +33,13 @@ import org.apache.beam.sdk.annotations.Experimental;
 @Experimental
 public class BeamRecord implements Serializable {
   private List<Object> dataValues;
-  //null values are indexed here, to handle properly in Coder.
-  private BitSet nullFields;
   private BeamRecordType dataType;
 
   public BeamRecord(BeamRecordType dataType) {
     this.dataType = dataType;
-    this.nullFields = new BitSet(dataType.size());
     this.dataValues = new ArrayList<>();
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
-      nullFields.set(idx);
     }
   }
 
@@ -60,12 +55,6 @@ public class BeamRecord implements Serializable {
   }
 
   public void addField(int index, Object fieldValue) {
-    if (fieldValue == null) {
-      return;
-    } else {
-      nullFields.clear(index);
-    }
-
     dataType.validateValueType(index, fieldValue);
     dataValues.set(index, fieldValue);
   }
@@ -182,21 +171,16 @@ public class BeamRecord implements Serializable {
     return dataType;
   }
 
-  public BitSet getNullFields() {
-    return nullFields;
-  }
-
   /**
    * is the specified field NULL?
    */
   public boolean isNull(int idx) {
-    return nullFields.get(idx);
+    return null ==  getFieldValue(idx);
   }
 
   @Override
   public String toString() {
-    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
-        + dataType + "]";
+    return "BeamSqlRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
   }
 
   /**
@@ -227,7 +211,6 @@ public class BeamRecord implements Serializable {
   }
 
   @Override public int hashCode() {
-    return 31 * (31 * getDataType().hashCode() + getDataValues().hashCode())
-        + getNullFields().hashCode();
+    return 31 * getDataType().hashCode() + getDataValues().hashCode();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 0cbea5c..e98ead1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -229,8 +229,4 @@ public class BeamSortRel extends Sort implements BeamRelNode {
       return 0;
     }
   }
-
-  public static <T extends Comparable> int compare(T a, T b) {
-    return a.compareTo(b);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
index b295049..fe82834 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -133,6 +133,10 @@ public class BeamSqlRecordType extends BeamRecordType {
 
   @Override
   public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    if (null == fieldValue) {// no need to do type check for NULL value
+      return;
+    }
+
     int fieldType = fieldsType.get(index);
     Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
     if (javaClazz == null) {

Reply via email to