Repository: incubator-beam
Updated Branches:
  refs/heads/master 6d686288e -> 7f562cc10


"Intern" schemas and schema strings to prevent out of memory issues when 
dealing with a very large number of files in Avro sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ebc62025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ebc62025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ebc62025

Branch: refs/heads/master
Commit: ebc62025d26af5b5b3e2568e6e0e9e9df9d72546
Parents: 6d68628
Author: Luke Cwik <lc...@google.com>
Authored: Fri Oct 14 11:46:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 18 12:50:56 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 97 +++++++++++++++++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  | 43 +++++++++
 2 files changed, 125 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebc62025/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index f7ce3c2..aaf72ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -21,10 +21,13 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InvalidObjectException;
+import java.io.ObjectStreamException;
 import java.io.PushbackInputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -32,6 +35,8 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 import javax.annotation.concurrent.GuardedBy;
@@ -164,7 +169,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * to read records of the given type from a file pattern.
    */
   public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, 
Class<T> clazz) {
-    return Read.from(new AvroSource<T>(filePattern, DEFAULT_MIN_BUNDLE_SIZE,
+    return Read.from(new AvroSource<>(filePattern, DEFAULT_MIN_BUNDLE_SIZE,
         ReflectData.get().getSchema(clazz).toString(), clazz, null, null));
   }
 
@@ -218,14 +223,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * <p>Does not modify this object.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<T>(
+    return new AvroSource<>(
         getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, 
syncMarker);
   }
 
   private AvroSource(String fileNameOrPattern, long minBundleSize, String 
schema, Class<T> type,
       String codec, byte[] syncMarker) {
     super(fileNameOrPattern, minBundleSize);
-    this.readSchemaString = schema;
+    this.readSchemaString = internSchemaString(schema);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
@@ -235,11 +240,11 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   private AvroSource(String fileName, long minBundleSize, long startOffset, 
long endOffset,
       String schema, Class<T> type, String codec, byte[] syncMarker, String 
fileSchema) {
     super(fileName, minBundleSize, startOffset, endOffset);
-    this.readSchemaString = schema;
+    this.readSchemaString = internSchemaString(schema);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
-    this.fileSchemaString = fileSchema;
+    this.fileSchemaString = internSchemaString(fileSchema);
   }
 
   @Override
@@ -277,13 +282,18 @@ public class AvroSource<T> extends BlockBasedSource<T> {
         readSchemaString = metadata.getSchemaString();
       }
     }
-    return new AvroSource<T>(fileName, getMinBundleSize(), start, end, 
readSchemaString, type,
+    // Note that if the fileSchemaString is equivalent to the 
readSchemaString, "intern"ing
+    // the string will occur within the constructor and return the same 
reference as the
+    // readSchemaString. This allows for Java to have an efficient 
serialization since it
+    // will only encode the schema once while just storing pointers to the 
encoded version
+    // within this source.
+    return new AvroSource<>(fileName, getMinBundleSize(), start, end, 
readSchemaString, type,
         codec, syncMarker, fileSchemaString);
   }
 
   @Override
   protected BlockBasedReader<T> createSingleFileReader(PipelineOptions 
options) {
-    return new AvroReader<T>(this);
+    return new AvroReader<>(this);
   }
 
   @Override
@@ -294,8 +304,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   @Override
   public AvroCoder<T> getDefaultOutputCoder() {
     if (coder == null) {
-      Schema.Parser parser = new Schema.Parser();
-      coder = AvroCoder.of(type, parser.parse(readSchemaString));
+      coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString));
     }
     return coder;
   }
@@ -304,28 +313,28 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return readSchemaString;
   }
 
-  private Schema getReadSchema() {
+  @VisibleForTesting
+  Schema getReadSchema() {
     if (readSchemaString == null) {
       return null;
     }
 
     // If the schema has not been parsed, parse it.
     if (readSchema == null) {
-      Schema.Parser parser = new Schema.Parser();
-      readSchema = parser.parse(readSchemaString);
+      readSchema = internOrParseSchemaString(readSchemaString);
     }
     return readSchema;
   }
 
-  private Schema getFileSchema() {
+  @VisibleForTesting
+  Schema getFileSchema() {
     if (fileSchemaString == null) {
       return null;
     }
 
     // If the schema has not been parsed, parse it.
     if (fileSchema == null) {
-      Schema.Parser parser = new Schema.Parser();
-      fileSchema = parser.parse(fileSchemaString);
+      fileSchema = internOrParseSchemaString(fileSchemaString);
     }
     return fileSchema;
   }
@@ -350,6 +359,64 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     }
   }
 
+  // A logical reference cache used to store schemas and schema strings to 
allow us to
+  // "intern" values and reduce the number of copies of equivalent objects.
+  private static final Map<String, Schema> schemaLogicalReferenceCache = new 
WeakHashMap<>();
+  private static final Map<String, String> schemaStringLogicalReferenceCache = 
new WeakHashMap<>();
+
+  // We avoid String.intern() because depending on the JVM, these may be added 
to the PermGenSpace
+  // which we want to avoid otherwise we could run out of PermGenSpace.
+  private static synchronized String internSchemaString(String schema) {
+    String internSchema = schemaStringLogicalReferenceCache.get(schema);
+    if (internSchema != null) {
+      return internSchema;
+    }
+    schemaStringLogicalReferenceCache.put(schema, schema);
+    return schema;
+  }
+
+  private static synchronized Schema internOrParseSchemaString(String 
schemaString) {
+    Schema schema = schemaLogicalReferenceCache.get(schemaString);
+    if (schema != null) {
+      return schema;
+    }
+    Schema.Parser parser = new Schema.Parser();
+    schema = parser.parse(schemaString);
+    schemaLogicalReferenceCache.put(schemaString, schema);
+    return schema;
+  }
+
+  // Reading the object from Java serialization typically does not go through 
the constructor,
+  // we use readResolve to replace the constructed instance with one which 
uses the constructor
+  // allowing us to intern any schemas.
+  @SuppressWarnings("unused")
+  private Object readResolve() throws ObjectStreamException {
+    switch (getMode()) {
+      case SINGLE_FILE_OR_SUBRANGE:
+        return new AvroSource<>(
+            getFileOrPatternSpec(),
+            getMinBundleSize(),
+            getStartOffset(),
+            getEndOffset(),
+            readSchemaString,
+            type,
+            codec,
+            syncMarker,
+            fileSchemaString);
+      case FILEPATTERN:
+        return new AvroSource<>(
+            getFileOrPatternSpec(),
+            getMinBundleSize(),
+            readSchemaString,
+            type,
+            codec,
+            syncMarker);
+        default:
+          throw new InvalidObjectException(
+              String.format("Unknown mode %s for AvroSource %s", getMode(), 
this));
+    }
+  }
+
   /**
    * A {@link BlockBasedSource.Block} of Avro records.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebc62025/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index c1b532f..fb7b27d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -21,6 +21,8 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -55,6 +57,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.AvroUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -424,6 +428,45 @@ public class AvroSourceTest {
     assertThat(actual, containsInAnyOrder(expected.toArray()));
   }
 
+  @Test
+  public void testSchemaStringIsInterned() throws Exception {
+    List<Bird> birds = createRandomRecords(100);
+    String filename = generateTestFile("tmp.avro", birds, 
SyncBehavior.SYNC_DEFAULT, 0,
+        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+    String schemaA = 
AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    String schemaB = 
AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    assertNotSame(schemaA, schemaB);
+
+    AvroSource<GenericRecord> sourceA = 
AvroSource.from(filename).withSchema(schemaA);
+    AvroSource<GenericRecord> sourceB = 
AvroSource.from(filename).withSchema(schemaB);
+    assertSame(sourceA.getSchema(), sourceB.getSchema());
+
+    // Ensure that deserialization still goes through interning
+    AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
+    assertSame(sourceA.getSchema(), sourceC.getSchema());
+  }
+
+  @Test
+  public void testSchemaIsInterned() throws Exception {
+    List<Bird> birds = createRandomRecords(100);
+    String filename = generateTestFile("tmp.avro", birds, 
SyncBehavior.SYNC_DEFAULT, 0,
+        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+    String schemaA = 
AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    String schemaB = 
AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    assertNotSame(schemaA, schemaB);
+
+    AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) 
AvroSource.from(filename)
+        .withSchema(schemaA).createForSubrangeOfFile(filename, 0L, 0L);
+    AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) 
AvroSource.from(filename)
+        .withSchema(schemaB).createForSubrangeOfFile(filename, 0L, 0L);
+    assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
+    assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
+    assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
+
+    // Schemas are transient and not serialized thus we don't need to worry 
about interning
+    // after deserialization.
+  }
+
   private void assertEqualsWithGeneric(List<Bird> expected, 
List<GenericRecord> actual) {
     assertEquals(expected.size(), actual.size());
     for (int i = 0; i < expected.size(); i++) {

Reply via email to