Repository: incubator-beam Updated Branches: refs/heads/master 6d686288e -> 7f562cc10
"Intern" schemas and schema strings to prevent out of memory issues when dealing with many many files in Avro sources. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ebc62025 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ebc62025 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ebc62025 Branch: refs/heads/master Commit: ebc62025d26af5b5b3e2568e6e0e9e9df9d72546 Parents: 6d68628 Author: Luke Cwik <lc...@google.com> Authored: Fri Oct 14 11:46:37 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Oct 18 12:50:56 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroSource.java | 97 +++++++++++++++++--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 43 +++++++++ 2 files changed, 125 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebc62025/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index f7ce3c2..aaf72ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -21,10 +21,13 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.InvalidObjectException; +import java.io.ObjectStreamException; import java.io.PushbackInputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -32,6 +35,8 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.util.Arrays; import java.util.Collection; +import java.util.Map; +import java.util.WeakHashMap; import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import javax.annotation.concurrent.GuardedBy; @@ -164,7 +169,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { * to read records of the given type from a file pattern. */ public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, Class<T> clazz) { - return Read.from(new AvroSource<T>(filePattern, DEFAULT_MIN_BUNDLE_SIZE, + return Read.from(new AvroSource<>(filePattern, DEFAULT_MIN_BUNDLE_SIZE, ReflectData.get().getSchema(clazz).toString(), clazz, null, null)); } @@ -218,14 +223,14 @@ public class AvroSource<T> extends BlockBasedSource<T> { * <p>Does not modify this object. */ public AvroSource<T> withMinBundleSize(long minBundleSize) { - return new AvroSource<T>( + return new AvroSource<>( getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker); } private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type, String codec, byte[] syncMarker) { super(fileNameOrPattern, minBundleSize); - this.readSchemaString = schema; + this.readSchemaString = internSchemaString(schema); this.codec = codec; this.syncMarker = syncMarker; this.type = type; @@ -235,11 +240,11 @@ public class AvroSource<T> extends BlockBasedSource<T> { private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset, String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) { super(fileName, minBundleSize, startOffset, endOffset); - this.readSchemaString = schema; + this.readSchemaString = internSchemaString(schema); this.codec = codec; this.syncMarker = syncMarker; this.type = type; - this.fileSchemaString = fileSchema; + this.fileSchemaString = internSchemaString(fileSchema); } @Override @@ -277,13 +282,18 @@ public class AvroSource<T> extends BlockBasedSource<T> { readSchemaString = metadata.getSchemaString(); } } - return new AvroSource<T>(fileName, getMinBundleSize(), start, end, readSchemaString, type, + // Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing + // the string will occur within the constructor and return the same reference as the + // readSchemaString. This allows for Java to have an efficient serialization since it + // will only encode the schema once while just storing pointers to the encoded version + // within this source. + return new AvroSource<>(fileName, getMinBundleSize(), start, end, readSchemaString, type, codec, syncMarker, fileSchemaString); } @Override protected BlockBasedReader<T> createSingleFileReader(PipelineOptions options) { - return new AvroReader<T>(this); + return new AvroReader<>(this); } @Override @@ -294,8 +304,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { @Override public AvroCoder<T> getDefaultOutputCoder() { if (coder == null) { - Schema.Parser parser = new Schema.Parser(); - coder = AvroCoder.of(type, parser.parse(readSchemaString)); + coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString)); } return coder; } @@ -304,28 +313,28 @@ public class AvroSource<T> extends BlockBasedSource<T> { return readSchemaString; } - private Schema getReadSchema() { + @VisibleForTesting + Schema getReadSchema() { if (readSchemaString == null) { return null; } // If the schema has not been parsed, parse it. if (readSchema == null) { - Schema.Parser parser = new Schema.Parser(); - readSchema = parser.parse(readSchemaString); + readSchema = internOrParseSchemaString(readSchemaString); } return readSchema; } - private Schema getFileSchema() { + @VisibleForTesting + Schema getFileSchema() { if (fileSchemaString == null) { return null; } // If the schema has not been parsed, parse it. if (fileSchema == null) { - Schema.Parser parser = new Schema.Parser(); - fileSchema = parser.parse(fileSchemaString); + fileSchema = internOrParseSchemaString(fileSchemaString); } return fileSchema; } @@ -350,6 +359,64 @@ public class AvroSource<T> extends BlockBasedSource<T> { } } + // A logical reference cache used to store schemas and schema strings to allow us to + // "intern" values and reduce the number of copies of equivalent objects. + private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>(); + private static final Map<String, String> schemaStringLogicalReferenceCache = new WeakHashMap<>(); + + // We avoid String.intern() because depending on the JVM, these may be added to the PermGenSpace + // which we want to avoid otherwise we could run out of PermGenSpace. + private static synchronized String internSchemaString(String schema) { + String internSchema = schemaStringLogicalReferenceCache.get(schema); + if (internSchema != null) { + return internSchema; + } + schemaStringLogicalReferenceCache.put(schema, schema); + return schema; + } + + private static synchronized Schema internOrParseSchemaString(String schemaString) { + Schema schema = schemaLogicalReferenceCache.get(schemaString); + if (schema != null) { + return schema; + } + Schema.Parser parser = new Schema.Parser(); + schema = parser.parse(schemaString); + schemaLogicalReferenceCache.put(schemaString, schema); + return schema; + } + + // Reading the object from Java serialization typically does not go through the constructor, + // we use readResolve to replace the constructed instance with one which uses the constructor + // allowing us to intern any schemas. + @SuppressWarnings("unused") + private Object readResolve() throws ObjectStreamException { + switch (getMode()) { + case SINGLE_FILE_OR_SUBRANGE: + return new AvroSource<>( + getFileOrPatternSpec(), + getMinBundleSize(), + getStartOffset(), + getEndOffset(), + readSchemaString, + type, + codec, + syncMarker, + fileSchemaString); + case FILEPATTERN: + return new AvroSource<>( + getFileOrPatternSpec(), + getMinBundleSize(), + readSchemaString, + type, + codec, + syncMarker); + default: + throw new InvalidObjectException( + String.format("Unknown mode %s for AvroSource %s", getMode(), this)); + } + } + /** * A {@link BlockBasedSource.Block} of Avro records. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebc62025/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index c1b532f..fb7b27d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -21,6 +21,8 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -55,6 +57,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.AvroUtils; +import org.apache.beam.sdk.util.SerializableUtils; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -424,6 +428,45 @@ public class AvroSourceTest { assertThat(actual, containsInAnyOrder(expected.toArray())); } + @Test + public void testSchemaStringIsInterned() throws Exception { + List<Bird> birds = createRandomRecords(100); + String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0, + AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC); + String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString(); + String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString(); + assertNotSame(schemaA, schemaB); + + AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA); + AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB); + assertSame(sourceA.getSchema(), sourceB.getSchema()); + + // Ensure that deserialization still goes through interning + AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB); + assertSame(sourceA.getSchema(), sourceC.getSchema()); + } + + @Test + public void testSchemaIsInterned() throws Exception { + List<Bird> birds = createRandomRecords(100); + String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0, + AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC); + String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString(); + String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString(); + assertNotSame(schemaA, schemaB); + + AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename) + .withSchema(schemaA).createForSubrangeOfFile(filename, 0L, 0L); + AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename) + .withSchema(schemaB).createForSubrangeOfFile(filename, 0L, 0L); + assertSame(sourceA.getReadSchema(), sourceA.getFileSchema()); + assertSame(sourceA.getReadSchema(), sourceB.getReadSchema()); + assertSame(sourceA.getReadSchema(), sourceB.getFileSchema()); + + // Schemas are transient and not serialized thus we don't need to worry about interning + // after deserialization. + } + private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) { assertEquals(expected.size(), actual.size()); for (int i = 0; i < expected.size(); i++) {