Repository: incubator-beam Updated Branches: refs/heads/master 4f991fd82 -> 142229e37
Add compression codec for AvroIO.Write BEHAVIOUR CHANGE: prior to this change Avro output would not use compression. Starting from this commit, by default Avro output is compressed using deflate codec (level 6). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b22d003 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b22d003 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b22d003 Branch: refs/heads/master Commit: 2b22d003dabb7fddabdc8aaea872478fe13d407a Parents: 4f991fd Author: Rafal Wojdyla <r...@spotify.com> Authored: Mon Oct 3 14:02:59 2016 -0400 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Oct 12 09:35:23 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 158 ++++++++++++++++--- .../sdk/io/SerializableAvroCodecFactory.java | 112 +++++++++++++ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 ++++++++++++- .../io/SerializableAvroCodecFactoryTest.java | 100 ++++++++++++ 4 files changed, 458 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 267265d..eeb4bb7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -27,6 +27,7 @@ import java.nio.channels.WritableByteChannel; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import 
org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; @@ -443,6 +444,13 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) using specified codec. + */ + public static Bound<GenericRecord> withCodec(CodecFactory codec) { + return new Bound<>(GenericRecord.class).withCodec(codec); + } + + /** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -450,6 +458,8 @@ public class AvroIO { */ public static class Bound<T> extends PTransform<PCollection<T>, PDone> { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + private static final SerializableAvroCodecFactory DEFAULT_CODEC = + new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); /** The filename to write to. */ @Nullable @@ -467,9 +477,23 @@ public class AvroIO { final Schema schema; /** An option to indicate if output validation is desired. Default is true. */ final boolean validate; + /** + * The codec used to encode the blocks in the Avro file. 
String value drawn from those in + * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html + */ + final SerializableAvroCodecFactory codec; Bound(Class<T> type) { - this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true); + this( + null, + null, + "", + 0, + DEFAULT_SHARD_TEMPLATE, + type, + null, + true, + DEFAULT_CODEC); } Bound( @@ -480,7 +504,8 @@ public class AvroIO { String shardTemplate, Class<T> type, Schema schema, - boolean validate) { + boolean validate, + SerializableAvroCodecFactory codec) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -489,6 +514,7 @@ public class AvroIO { this.type = type; this.schema = schema; this.validate = validate; + this.codec = codec; } /** @@ -503,7 +529,15 @@ public class AvroIO { public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec); } /** @@ -517,7 +551,15 @@ public class AvroIO { public Bound<T> withSuffix(String filenameSuffix) { validateOutputComponent(filenameSuffix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec); } /** @@ -537,7 +579,15 @@ public class AvroIO { public Bound<T> withNumShards(int numShards) { checkArgument(numShards >= 0); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec); } /** @@ -550,7 +600,15 @@ public class AvroIO { */ public Bound<T> withShardNameTemplate(String shardTemplate) { return new Bound<>( - name, 
filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec); } /** @@ -563,7 +621,16 @@ public class AvroIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", type, schema, validate); + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + 1, + "", + type, + schema, + validate, + codec); } /** @@ -584,7 +651,8 @@ public class AvroIO { shardTemplate, type, ReflectData.get().getSchema(type), - validate); + validate, + codec); } /** @@ -603,7 +671,8 @@ public class AvroIO { shardTemplate, GenericRecord.class, schema, - validate); + validate, + codec); } /** @@ -629,7 +698,34 @@ public class AvroIO { */ public Bound<T> withoutValidation() { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, false); + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + false, + codec); + } + + /** + * Returns a new {@link PTransform} that's like this one but + * that writes to Avro file(s) compressed using specified codec. + * + * <p>Does not modify this object. 
+ */ + public Bound<T> withCodec(CodecFactory codec) { + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + new SerializableAvroCodecFactory(codec)); } @Override @@ -645,7 +741,11 @@ public class AvroIO { org.apache.beam.sdk.io.Write.Bound<T> write = org.apache.beam.sdk.io.Write.to( new AvroSink<>( - filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))); + filenamePrefix, + filenameSuffix, + shardTemplate, + AvroCoder.of(type, schema), + codec)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -671,7 +771,10 @@ public class AvroIO { 0) .addIfNotDefault(DisplayData.item("validation", validate) .withLabel("Validation Enabled"), - true); + true) + .addIfNotDefault(DisplayData.item("codec", codec.toString()) + .withLabel("Avro Compression Codec"), + DEFAULT_CODEC.toString()); } /** @@ -713,6 +816,10 @@ public class AvroIO { public boolean needsValidation() { return validate; } + + public CodecFactory getCodec() { + return codec.getCodec(); + } } /** Disallow construction of utility class. 
*/ @@ -741,17 +848,24 @@ public class AvroIO { @VisibleForTesting static class AvroSink<T> extends FileBasedSink<T> { private final AvroCoder<T> coder; + private final SerializableAvroCodecFactory codec; @VisibleForTesting AvroSink( - String baseOutputFilename, String extension, String fileNameTemplate, AvroCoder<T> coder) { + String baseOutputFilename, + String extension, + String fileNameTemplate, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec) { super(baseOutputFilename, extension, fileNameTemplate); this.coder = coder; + this.codec = codec; + } @Override public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new AvroWriteOperation<>(this, coder); + return new AvroWriteOperation<>(this, coder, codec); } /** @@ -760,15 +874,19 @@ public class AvroIO { */ private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { private final AvroCoder<T> coder; + private final SerializableAvroCodecFactory codec; - private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) { + private AvroWriteOperation(AvroSink<T> sink, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec) { super(sink); this.coder = coder; + this.codec = codec; } @Override public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new AvroWriter<>(this, coder); + return new AvroWriter<>(this, coder, codec); } } @@ -779,17 +897,21 @@ public class AvroIO { private static class AvroWriter<T> extends FileBasedWriter<T> { private final AvroCoder<T> coder; private DataFileWriter<T> dataFileWriter; + private SerializableAvroCodecFactory codec; - public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder) { + public AvroWriter(FileBasedWriteOperation<T> writeOperation, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec) { super(writeOperation); this.mimeType = MimeTypes.BINARY; this.coder = coder; + this.codec = codec; } @SuppressWarnings("deprecation") // uses 
internal test functionality. @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { - dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()); + dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec()); dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java new file mode 100644 index 0000000..ce52e99 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC; +import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; +import static org.apache.avro.file.DataFileConstants.NULL_CODEC; +import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; +import static org.apache.avro.file.DataFileConstants.XZ_CODEC; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.avro.file.CodecFactory; + +/** + * A wrapper that allows {@link org.apache.avro.file.CodecFactory}s to be serialized using Java's + * standard serialization mechanisms. + * + * <p>{@code writeExternal} stores only the codec's {@code toString()} value and + * {@code readExternal} rebuilds the codec from that string. The constructor accepts a codec + * whose string form is {@code NULL_CODEC}, {@code SNAPPY_CODEC} or {@code BZIP2_CODEC}, or + * {@code DEFLATE_CODEC}/{@code XZ_CODEC} followed by {@code -} and a one-digit level (deflate + * also allows a leading minus sign on the level). + * + * <p>NOTE(review): the constructor validates with {@code Matcher.matches()} but + * {@code readExternal} uses {@code Matcher.find()}; using {@code matches()} in both would keep + * the two checks identical. The constructor also rejects unsupported codecs via + * {@code checkState}, i.e. presumably {@code IllegalStateException} for what is an argument + * error; confirm whether {@code checkArgument} was intended. + */ +class SerializableAvroCodecFactory implements Externalizable { + private static final long serialVersionUID = 7445324844109564303L; + private static final List<String> noOptAvroCodecs = Arrays.asList(NULL_CODEC, + SNAPPY_CODEC, + BZIP2_CODEC); + private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?<level>-?\\d)"); + private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?<level>\\d)"); + + private CodecFactory codecFactory; + + // Required by java.io.Externalizable; codecFactory stays null until readExternal() runs. + public SerializableAvroCodecFactory() {} + + public SerializableAvroCodecFactory(CodecFactory codecFactory) { + checkNotNull(codecFactory, "Codec can't be null"); + checkState(checkIsSupportedCodec(codecFactory), "%s is not supported", codecFactory); + this.codecFactory = codecFactory; + } + + private boolean checkIsSupportedCodec(CodecFactory codecFactory) { + final String codecStr = codecFactory.toString(); + return noOptAvroCodecs.contains(codecStr) + || deflatePattern.matcher(codecStr).matches() + || xzPattern.matcher(codecStr).matches(); + } + 
+ @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(codecFactory.toString()); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final String codecStr = in.readUTF(); + + switch (codecStr) { + case NULL_CODEC: + case SNAPPY_CODEC: + case BZIP2_CODEC: + codecFactory = CodecFactory.fromString(codecStr); + return; + } + + Matcher deflateMatcher = deflatePattern.matcher(codecStr); + if (deflateMatcher.find()) { + codecFactory = CodecFactory.deflateCodec( + Integer.parseInt(deflateMatcher.group("level"))); + return; + } + + Matcher xzMatcher = xzPattern.matcher(codecStr); + if (xzMatcher.find()) { + codecFactory = CodecFactory.xzCodec( + Integer.parseInt(xzMatcher.group("level"))); + return; + } + + throw new IllegalStateException(codecStr + " is not supported"); + } + + public CodecFactory getCodec() { + return codecFactory; + } + + @Override + public String toString() { + checkNotNull(codecFactory, "Inner CodecFactory is null, please use non default constructor"); + return codecFactory.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 81f05d7..1b1b1fa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; @@ -29,13 +30,17 @@ import 
com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Set; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.apache.beam.sdk.coders.AvroCoder; @@ -51,6 +56,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Ignore; @@ -146,6 +152,64 @@ public class AvroIOTest { p.run(); } + @Test + @SuppressWarnings("unchecked") + @Category(NeedsRunner.class) + public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable { + TestPipeline p = TestPipeline.create(); + List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), + new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + p.apply(Create.of(values)) + .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .withoutSharding() + .withCodec(CodecFactory.deflateCodec(9)) + .withSchema(GenericClass.class)); + p.run(); + + p = TestPipeline.create(); + PCollection<GenericClass> input = p + .apply(AvroIO.Read + .from(outputFile.getAbsolutePath()) + .withSchema(GenericClass.class)); + + PAssert.that(input).containsInAnyOrder(values); + p.run(); + DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), + new GenericDatumReader()); + 
assertEquals(dataFileStream.getMetaString("avro.codec"), "deflate"); + } + + @Test + @SuppressWarnings("unchecked") + @Category(NeedsRunner.class) + public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable { + TestPipeline p = TestPipeline.create(); + List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), + new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + p.apply(Create.of(values)) + .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .withoutSharding() + .withSchema(GenericClass.class) + .withCodec(CodecFactory.nullCodec())); + p.run(); + + p = TestPipeline.create(); + PCollection<GenericClass> input = p + .apply(AvroIO.Read + .from(outputFile.getAbsolutePath()) + .withSchema(GenericClass.class)); + + PAssert.that(input).containsInAnyOrder(values); + p.run(); + DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), + new GenericDatumReader()); + assertEquals(dataFileStream.getMetaString("avro.codec"), "null"); + } + @DefaultCoder(AvroCoder.class) static class GenericClassV2 { int intField; @@ -212,6 +276,45 @@ public class AvroIOTest { p.run(); } + @Test + public void testWriteWithDefaultCodec() throws Exception { + AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + .to("gs://bucket/foo/baz"); + assertEquals(write.getCodec().toString(), CodecFactory.deflateCodec(6).toString()); + } + + @Test + public void testWriteWithCustomCodec() throws Exception { + AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + .to("gs://bucket/foo/baz") + .withCodec(CodecFactory.snappyCodec()); + assertEquals(write.getCodec().toString(), SNAPPY_CODEC); + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteWithSerDeCustomDeflateCodec() throws Exception { + AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + .to("gs://bucket/foo/baz") + .withCodec(CodecFactory.deflateCodec(9)); + + AvroIO.Write.Bound<GenericRecord> serdeWrite = 
SerializableUtils.clone(write); + + assertEquals(serdeWrite.getCodec().toString(), CodecFactory.deflateCodec(9).toString()); + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteWithSerDeCustomXZCodec() throws Exception { + AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + .to("gs://bucket/foo/baz") + .withCodec(CodecFactory.xzCodec(9)); + + AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write); + + assertEquals(serdeWrite.getCodec().toString(), CodecFactory.xzCodec(9).toString()); + } + @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. private void runTestWrite(String[] expectedElements, int numShards) throws IOException { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); @@ -304,7 +407,8 @@ public class AvroIOTest { .withSuffix("bar") .withSchema(GenericClass.class) .withNumShards(100) - .withoutValidation(); + .withoutValidation() + .withCodec(CodecFactory.snappyCodec()); DisplayData displayData = DisplayData.from(write); @@ -314,6 +418,7 @@ public class AvroIOTest { assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); + assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java new file mode 100644 index 0000000..3fe8740 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io; + +import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC; +import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; +import static org.apache.avro.file.DataFileConstants.NULL_CODEC; +import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; +import static org.apache.avro.file.DataFileConstants.XZ_CODEC; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.file.CodecFactory; +import org.apache.beam.sdk.util.SerializableUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests of SerializableAvroCodecFactory. 
+ */ +@RunWith(JUnit4.class) +public class SerializableAvroCodecFactoryTest { + private final List<String> avroCodecs = Arrays.asList(NULL_CODEC, + SNAPPY_CODEC, + DEFLATE_CODEC, + XZ_CODEC, + BZIP2_CODEC); + + @Test + public void testDefaultCodecsIn() throws Exception { + for (String codec : avroCodecs) { + SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory( + CodecFactory.fromString(codec)); + + assertEquals(codecFactory.getCodec().toString(), (CodecFactory.fromString(codec).toString())); + } + } + + @Test + public void testDefaultCodecsSerDe() throws Exception { + for (String codec : avroCodecs) { + SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory( + CodecFactory.fromString(codec)); + + SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory); + + assertEquals(serdeC.getCodec().toString(), CodecFactory.fromString(codec).toString()); + } + } + + @Test + public void testDeflateCodecSerDeWithLevels() throws Exception { + for (int i = 0; i < 10; ++i) { + SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory( + CodecFactory.deflateCodec(i)); + + SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory); + + assertEquals(serdeC.getCodec().toString(), CodecFactory.deflateCodec(i).toString()); + } + } + + @Test + public void testXZCodecSerDeWithLevels() throws Exception { + for (int i = 0; i < 10; ++i) { + SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory( + CodecFactory.xzCodec(i)); + + SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory); + + assertEquals(serdeC.getCodec().toString(), CodecFactory.xzCodec(i).toString()); + } + } + + @Test(expected = NullPointerException.class) + public void testNullCodecToString() throws Exception { + // Use the no-arg constructor (present only for deserialization), which leaves the codec null. + SerializableAvroCodecFactory codec = new SerializableAvroCodecFactory(); + codec.toString(); + } +} +