This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 439ac0d  [BEAM-5092] Prevent hash-lookup of schema on every record 
(#6268)
439ac0d is described below

commit 439ac0dcd13fbfcd77ae9ecd584f0e7e260fb778
Author: reuvenlax <re...@google.com>
AuthorDate: Fri Aug 24 18:16:12 2018 -0700

    [BEAM-5092] Prevent hash-lookup of schema on every record (#6268)
    
    * The hash lookup of the schema on every record is slowing down 
SchemaCoder. Better cache the getter list so this isn't necessary.
    
    * Properly handle nested schemas.
    
    * Make threadsafe.
    
    * Cache setter factory as well.
    
    * Fix unneeded validation.
    
    * Fix FindBugs warning.
    
    * Skip scanNullFields when not needed.
    
    * Switch coders to variable-length ones.
---
 .../java/org/apache/beam/sdk/coders/RowCoder.java  |   4 +-
 .../apache/beam/sdk/coders/RowCoderGenerator.java  |  26 +++--
 .../sdk/schemas/GetterBasedSchemaProvider.java     | 111 ++++++++++++++++-----
 .../main/java/org/apache/beam/sdk/values/Row.java  |   5 +-
 4 files changed, 112 insertions(+), 34 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 47ced37..7e677a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -43,8 +43,8 @@ public class RowCoder extends CustomCoder<Row> {
           .put(TypeName.BYTE, ByteCoder.of())
           .put(TypeName.BYTES, ByteArrayCoder.of())
           .put(TypeName.INT16, BigEndianShortCoder.of())
-          .put(TypeName.INT32, BigEndianIntegerCoder.of())
-          .put(TypeName.INT64, BigEndianLongCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
           .put(TypeName.DECIMAL, BigDecimalCoder.of())
           .put(TypeName.FLOAT, FloatCoder.of())
           .put(TypeName.DOUBLE, DoubleCoder.of())
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index b8fcb96..73430be 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -51,6 +51,7 @@ import 
net.bytebuddy.implementation.bytecode.member.MethodReturn;
 import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
 
@@ -147,9 +148,12 @@ public abstract class RowCoderGenerator {
 
   private static DynamicType.Builder<Coder> implementMethods(
       Schema schema, DynamicType.Builder<Coder> builder) {
+    boolean hasNullableFields = 
schema.getFields().stream().anyMatch(Field::getNullable);
     return builder
         .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, 
Ownership.STATIC)
         .intercept(FixedValue.reference(schema))
+        .defineMethod("hasNullableFields", boolean.class, Visibility.PRIVATE, 
Ownership.STATIC)
+        .intercept(FixedValue.reference(hasNullableFields))
         .method(ElementMatchers.named("encode"))
         .intercept(new EncodeInstruction())
         .method(ElementMatchers.named("decode"))
@@ -176,6 +180,13 @@ public abstract class RowCoderGenerator {
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // OutputStream.
                 MethodVariableAccess.REFERENCE.loadFrom(2),
+                // hasNullableFields
+                MethodInvocation.invoke(
+                    implementationContext
+                        .getInstrumentedType()
+                        .getDeclaredMethods()
+                        .filter(ElementMatchers.named("hasNullableFields"))
+                        .getOnly()),
                 // Call EncodeInstruction.encodeDelegate
                 MethodInvocation.invoke(
                     LOADED_TYPE
@@ -197,9 +208,10 @@ public abstract class RowCoderGenerator {
     // The encode method of the generated Coder delegates to this method to 
evaluate all of the
     // per-field Coders.
     @SuppressWarnings("unchecked")
-    static void encodeDelegate(Coder[] coders, Row value, OutputStream 
outputStream)
+    static void encodeDelegate(
+        Coder[] coders, Row value, OutputStream outputStream, boolean 
hasNullableFields)
         throws IOException {
-      NULL_LIST_CODER.encode(scanNullFields(value), outputStream);
+      NULL_LIST_CODER.encode(scanNullFields(value, hasNullableFields), 
outputStream);
       for (int idx = 0; idx < value.getFieldCount(); ++idx) {
         Object fieldValue = value.getValue(idx);
         if (value.getValue(idx) != null) {
@@ -210,11 +222,13 @@ public abstract class RowCoderGenerator {
 
     // Figure out which fields of the Row are null, and returns a BitSet. This 
allows us to save
     // on encoding each null field separately.
-    private static BitSet scanNullFields(Row row) {
+    private static BitSet scanNullFields(Row row, boolean hasNullableFields) {
       BitSet nullFields = new BitSet(row.getFieldCount());
-      for (int idx = 0; idx < row.getFieldCount(); ++idx) {
-        if (row.getValue(idx) == null) {
-          nullFields.set(idx);
+      if (hasNullableFields) {
+        for (int idx = 0; idx < row.getFieldCount(); ++idx) {
+          if (row.getValue(idx) == null) {
+            nullFields.set(idx);
+          }
         }
       }
       return nullFields;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index 7171878..4879fa9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -54,27 +55,74 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
     // why is that Java reflection does not guarantee the order in which it 
returns fields and
     // methods, and these schemas are often based on reflective analysis of 
classes. Therefore it's
     // important to capture the schema once here, so all invocations of the 
toRowFunction see the
-    // same version of the schema. If schemaFor were to be called inside the 
function, different
+    // same version of the schema. If schemaFor were to be called inside the 
lambda below, different
     // workers would see different versions of the schema.
     Schema schema = schemaFor(typeDescriptor);
-    return o -> 
Row.withSchema(schema).withFieldValueGetters(fieldValueGetterFactory(), 
o).build();
+
+    // Since we know that this factory is always called from inside the lambda 
with the same schema,
+    // return a caching factory that caches the first value seen for each 
class. This prevents
+    // having to lookup the getter list each time createGetters is called.
+    FieldValueGetterFactory getterFactory =
+        new FieldValueGetterFactory() {
+          @Nullable
+          private transient ConcurrentHashMap<Class, List<FieldValueGetter>> 
gettersMap = null;
+
+          private final FieldValueGetterFactory innerFactory = 
fieldValueGetterFactory();
+
+          @Override
+          public List<FieldValueGetter> createGetters(Class<?> targetClass, 
Schema schema) {
+            if (gettersMap == null) {
+              gettersMap = new ConcurrentHashMap<>();
+            }
+            List<FieldValueGetter> getters = gettersMap.get(targetClass);
+            if (getters != null) {
+              return getters;
+            }
+            getters = innerFactory.createGetters(targetClass, schema);
+            gettersMap.put(targetClass, getters);
+            return getters;
+          }
+        };
+    return o -> Row.withSchema(schema).withFieldValueGetters(getterFactory, 
o).build();
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> 
typeDescriptor) {
+    FieldValueSetterFactory setterFactory =
+        new FieldValueSetterFactory() {
+          @Nullable
+          private volatile ConcurrentHashMap<Class, List<FieldValueSetter>> 
settersMap = null;
+
+          private final FieldValueSetterFactory innerFactory = 
fieldValueSetterFactory();
+
+          @Override
+          public List<FieldValueSetter> createSetters(Class<?> targetClass, 
Schema schema) {
+            if (settersMap == null) {
+              settersMap = new ConcurrentHashMap<>();
+            }
+            List<FieldValueSetter> setters = settersMap.get(targetClass);
+            if (setters != null) {
+              return setters;
+            }
+            setters = innerFactory.createSetters(targetClass, schema);
+            settersMap.put(targetClass, setters);
+            return setters;
+          }
+        };
+
     return r -> {
       if (r instanceof RowWithGetters) {
         // Efficient path: simply extract the underlying POJO instead of 
creating a new one.
         return (T) ((RowWithGetters) r).getGetterTarget();
       } else {
         // Use the setters to copy values from the Row to a new instance of 
the class.
-        return fromRow(r, (Class<T>) typeDescriptor.getType());
+        return fromRow(r, (Class<T>) typeDescriptor.getType(), setterFactory);
       }
     };
   }
 
-  private <T> T fromRow(Row row, Class<T> clazz) {
+  private <T> T fromRow(Row row, Class<T> clazz, FieldValueSetterFactory 
setterFactory) {
     T object;
     try {
       object = clazz.getDeclaredConstructor().newInstance();
@@ -86,7 +134,7 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
     }
 
     Schema schema = row.getSchema();
-    List<FieldValueSetter> setters = 
fieldValueSetterFactory().createSetters(clazz, schema);
+    List<FieldValueSetter> setters = setterFactory.createSetters(clazz, 
schema);
     checkState(
         setters.size() == row.getFieldCount(),
         "Did not have a matching number of setters and fields.");
@@ -96,15 +144,6 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
     for (int i = 0; i < row.getFieldCount(); ++i) {
       FieldType type = schema.getField(i).getType();
       FieldValueSetter setter = setters.get(i);
-      if (setter == null) {
-        throw new RuntimeException(
-            "NULL SETTER FOR "
-                + clazz.getSimpleName()
-                + " field name "
-                + schema.getField(i).getName()
-                + " schema "
-                + schema);
-      }
       setter.set(
           object,
           fromValue(
@@ -113,7 +152,8 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
               setter.type(),
               setter.elementType(),
               setter.mapKeyType(),
-              setter.mapValueType()));
+              setter.mapValueType(),
+              setterFactory));
     }
     return object;
   }
@@ -121,39 +161,62 @@ public abstract class GetterBasedSchemaProvider 
implements SchemaProvider {
   @SuppressWarnings("unchecked")
   @Nullable
   private <T> T fromValue(
-      FieldType type, T value, Type fieldType, Type elemenentType, Type 
keyType, Type valueType) {
+      FieldType type,
+      T value,
+      Type fieldType,
+      Type elemenentType,
+      Type keyType,
+      Type valueType,
+      FieldValueSetterFactory setterFactory) {
     if (value == null) {
       return null;
     }
     if (TypeName.ROW.equals(type.getTypeName())) {
-      return (T) fromRow((Row) value, (Class) fieldType);
+      return (T) fromRow((Row) value, (Class) fieldType, setterFactory);
     } else if (TypeName.ARRAY.equals(type.getTypeName())) {
-      return (T) fromListValue(type.getCollectionElementType(), (List) value, 
elemenentType);
+      return (T)
+          fromListValue(
+              type.getCollectionElementType(), (List) value, elemenentType, 
setterFactory);
     } else if (TypeName.MAP.equals(type.getTypeName())) {
       return (T)
           fromMapValue(
-              type.getMapKeyType(), type.getMapValueType(), (Map) value, 
keyType, valueType);
+              type.getMapKeyType(),
+              type.getMapValueType(),
+              (Map) value,
+              keyType,
+              valueType,
+              setterFactory);
     } else {
       return value;
     }
   }
 
   @SuppressWarnings("unchecked")
-  private <T> List fromListValue(FieldType elementType, List<T> rowList, Type 
elementClass) {
+  private <T> List fromListValue(
+      FieldType elementType,
+      List<T> rowList,
+      Type elementClass,
+      FieldValueSetterFactory setterFactory) {
     List list = Lists.newArrayList();
     for (T element : rowList) {
-      list.add(fromValue(elementType, element, elementClass, null, null, 
null));
+      list.add(fromValue(elementType, element, elementClass, null, null, null, 
setterFactory));
     }
     return list;
   }
 
   @SuppressWarnings("unchecked")
   private Map<?, ?> fromMapValue(
-      FieldType keyType, FieldType valueType, Map<?, ?> map, Type keyClass, 
Type valueClass) {
+      FieldType keyType,
+      FieldType valueType,
+      Map<?, ?> map,
+      Type keyClass,
+      Type valueClass,
+      FieldValueSetterFactory setterFactory) {
     Map newMap = Maps.newHashMap();
     for (Map.Entry<?, ?> entry : map.entrySet()) {
-      Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, 
null);
-      Object value = fromValue(valueType, entry.getValue(), valueClass, null, 
null, null);
+      Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, 
null, setterFactory);
+      Object value =
+          fromValue(valueType, entry.getValue(), valueClass, null, null, null, 
setterFactory);
       newMap.put(key, value);
     }
     return newMap;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index d745df7..2c12a7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -378,7 +378,8 @@ public abstract class Row implements Serializable {
 
     public Builder attachValues(List<Object> values) {
       this.attached = true;
-      return addValues(values);
+      this.values = values;
+      return this;
     }
 
     public Builder withFieldValueGetters(
@@ -564,7 +565,7 @@ public abstract class Row implements Serializable {
       if (!this.values.isEmpty()) {
         List<Object> storageValues = attached ? this.values : verify(schema, 
this.values);
         checkState(getterTarget == null, "withGetterTarget requires getters.");
-        return new RowWithStorage(schema, verify(schema, storageValues));
+        return new RowWithStorage(schema, storageValues);
       } else if (fieldValueGetterFactory != null) {
         checkState(getterTarget != null, "getters require withGetterTarget.");
         return new RowWithGetters(schema, fieldValueGetterFactory, 
getterTarget);

Reply via email to