[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2612 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user qingdao81 commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83372815 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodec() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(Codec.SNAPPY); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testSerialization() throws Exception { + +serializeAndDeserialize(null); +for (final Codec codec : Codec.values()) { +serializeAndDeserialize(codec); +} +} + +private void serializeAndDeserialize(final Codec codec) throws IOException, ClassNotFoundException { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); +if (codec != null) { +outputFormat.setCodec(codec); +} 
+outputFormat.setSchema(User.SCHEMA$); + +final File serialized = File.createTempFile("avro-output-format", ".serialized"); + +// when +try (final ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(serialized))) { +oos.writeObject(outputFormat); +} +try (final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(serialized))) { +// then +assertTrue(ois.readObject() instanceof AvroOutputFormat); --- End diff -- Cool didn't know about this class. Makes life easier. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user qingdao81 commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83307880 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodec() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(Codec.SNAPPY); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testSerialization() throws Exception { + +serializeAndDeserialize(null); +for (final Codec codec : Codec.values()) { +serializeAndDeserialize(codec); +} +} + +private void serializeAndDeserialize(final Codec codec) throws IOException, ClassNotFoundException { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); +if (codec != null) { +outputFormat.setCodec(codec); +} 
+outputFormat.setSchema(User.SCHEMA$); + +final File serialized = File.createTempFile("avro-output-format", ".serialized"); + +// when +try (final ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(serialized))) { +oos.writeObject(outputFormat); +} +try (final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(serialized))) { +// then +assertTrue(ois.readObject() instanceof AvroOutputFormat); --- End diff -- You are right this should be checked but I'm not sure what is the best way to do so ... I could change the visibility for userDefinedSchema and codec to package private or provide a getter for both properties. But I would increase the visibility of params just for test purposes. Some people are ok with this, others not ;-) or do you have something else in mind? - I could also write and read some data after the serialization to verify schema and compression but in my opinion this would be out of the scope of a serialization test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83290431 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodec() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(Codec.SNAPPY); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testSerialization() throws Exception { + +serializeAndDeserialize(null); +for (final Codec codec : Codec.values()) { +serializeAndDeserialize(codec); +} +} + +private void serializeAndDeserialize(final Codec codec) throws IOException, ClassNotFoundException { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); +if (codec != null) { +outputFormat.setCodec(codec); +} 
+outputFormat.setSchema(User.SCHEMA$); + +final File serialized = File.createTempFile("avro-output-format", ".serialized"); + +// when +try (final ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(serialized))) { +oos.writeObject(outputFormat); +} +try (final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(serialized))) { +// then +assertTrue(ois.readObject() instanceof AvroOutputFormat); --- End diff -- Please check that the relevant properties (schema + codec) of the deserialized object are correctly restored. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83289172 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodec() throws Exception { --- End diff -- Please change space indention to tabs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83288193 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; --- End diff -- Please add the Apache license header to this file. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r83290286 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,126 @@ +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodec() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(Codec.SNAPPY); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); + +// when +try { +outputFormat.setCodec(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testSerialization() throws Exception { + +serializeAndDeserialize(null); +for (final Codec codec : Codec.values()) { +serializeAndDeserialize(codec); +} +} + +private void serializeAndDeserialize(final Codec codec) throws IOException, ClassNotFoundException { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(User.class); +if (codec != null) { +outputFormat.setCodec(codec); +} 
+outputFormat.setSchema(User.SCHEMA$); + +final File serialized = File.createTempFile("avro-output-format", ".serialized"); + +// when +try (final ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(serialized))) { --- End diff -- Please use a `ByteArrayOutputStream` instead of a `FileOutputStream`. That way the test is not touching the disk and therefore faster and more reliable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r82594969 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,106 @@ +package org.apache.flink.api.java.io; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.avro.file.CodecFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodecFactory() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(DummyAvroType.class); + +// when +try { +outputFormat.setCodecFactory(CodecFactory.snappyCodec()); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecFactoryError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(DummyAvroType.class); + +// when +try { +outputFormat.setCodecFactory(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testCompression() throws Exception { +// given +final Path outputPath = path("avro-output-file.avro"); +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(outputPath, DummyAvroType.class); +outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + +final Path compressedOutputPath = path("avro-output-file-compressed.avro"); +final AvroOutputFormat compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, DummyAvroType.class); + compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); 
+compressedOutputFormat.setCodecFactory(CodecFactory.snappyCodec()); + +// when +output(outputFormat); +output(compressedOutputFormat); + +// then +assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath)); +} + +private long fileSize(Path path) throws IOException { +return Files.size(Paths.get(path.getPath())); +} + +private void output(final AvroOutputFormat outputFormat) throws IOException { +outputFormat.configure(new Configuration()); +outputFormat.open(1,1); +for (int i = 0; i < 100; i++) { +outputFormat.writeRecord(new DummyAvroType(1)); +} +outputFormat.close(); +} + +private Path path(final String virtualPath) throws URISyntaxException { +return new Path(Paths.get(getClass().getResource("/").toURI()).toString() + "/" + virtualPath); --- End diff -- Please use `File.createTempFile()` to create a file in the default temp space. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r82591567 --- Diff: flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java --- @@ -36,6 +38,8 @@ private final Class avroValueType; private transient Schema userDefinedSchema = null; + + private transient CodecFactory codecFactory = null; --- End diff -- One way to do it is to have a map of supported codecs and serialize a byte id. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r82591316 --- Diff: flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java --- @@ -36,6 +38,8 @@ private final Class avroValueType; private transient Schema userDefinedSchema = null; + + private transient CodecFactory codecFactory = null; --- End diff -- Flink uses Java Serialization to ship code from the client to the cluster. `CodecFactory` does not implement `Serializable`. Therefore, the `codecFactory` field must be declared as `transient`. However, the `writeObject` and `readObject` methods must be adapted to manually serialize / deserialize the information. Otherwise, the `CodecFactory` is lost when the object is deserialized at the client. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r82590833 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,106 @@ +package org.apache.flink.api.java.io; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.avro.file.CodecFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { --- End diff -- Please add a test that checks that AvroOutputFormat is correctly serialized and deserialized. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2612#discussion_r82590712 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java --- @@ -0,0 +1,106 @@ +package org.apache.flink.api.java.io; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.avro.file.CodecFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + +@Test +public void testSetCodecFactory() throws Exception { +// given +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(DummyAvroType.class); + +// when +try { +outputFormat.setCodecFactory(CodecFactory.snappyCodec()); +} catch (Exception ex) { +// then +fail("unexpected exception"); +} +} + +@Test +public void testSetCodecFactoryError() throws Exception { +// given +boolean error = false; +final AvroOutputFormat outputFormat = new AvroOutputFormat<>(DummyAvroType.class); + +// when +try { +outputFormat.setCodecFactory(null); +} catch (Exception ex) { +error = true; +} + +// then +assertTrue(error); +} + +@Test +public void testCompression() throws Exception { +// given +final Path outputPath = path("avro-output-file.avro"); --- End diff -- Please remove the files after the test was executed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2612: FLINK-4771: Compression for AvroOutputFormat
GitHub user qingdao81 opened a pull request: https://github.com/apache/flink/pull/2612 FLINK-4771: Compression for AvroOutputFormat Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/qingdao81/flink feature/FLINK-4771 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2612.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2612 commit 71edd2d022991787a0549fd32e9bf90ca52f9853 Author: larsbachmann Date: 2016-10-08T11:30:14Z FLINK-4771: Compression for AvroOutputFormat --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---