Repository: nifi Updated Branches: refs/heads/master 2c907c63a -> 75d0c74d2
NIFI-1840 Added compression type property in Kite processors This closes #409 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75d0c74d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75d0c74d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75d0c74d Branch: refs/heads/master Commit: 75d0c74d273600629d7a6e7027196c39b66513bb Parents: 2c907c6 Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Tue May 3 17:00:49 2016 +0200 Committer: Andre F de Miranda <trix...@users.noreply.github.com> Committed: Thu Oct 6 00:20:52 2016 +1100 ---------------------------------------------------------------------- .../kite/AbstractKiteConvertProcessor.java | 62 ++++++++++++++++++ .../nifi/processors/kite/ConvertAvroSchema.java | 22 +++++-- .../nifi/processors/kite/ConvertCSVToAvro.java | 6 +- .../nifi/processors/kite/ConvertJSONToAvro.java | 12 +++- .../processors/kite/TestCSVToAvroProcessor.java | 31 ++++++++- .../processors/kite/TestConvertAvroSchema.java | 68 ++++++++++++++++++++ .../kite/TestJSONToAvroProcessor.java | 30 +++++++++ 7 files changed, 219 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java new file mode 100644 index 0000000..561bf46 --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java @@ 
-0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import org.apache.avro.file.CodecFactory; +import org.apache.nifi.components.PropertyDescriptor; + +import com.google.common.annotations.VisibleForTesting; + +abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor { + + @VisibleForTesting + static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name("kite-compression-type") + .displayName("Compression type") + .description("Compression type to use when writing Avro files. 
Default is Snappy.") + .allowableValues(CodecType.values()) + .defaultValue(CodecType.SNAPPY.toString()) + .build(); + + public enum CodecType { + BZIP2, + DEFLATE, + NONE, + SNAPPY, + LZO + } + + protected CodecFactory getCodecFactory(String property) { + CodecType type = CodecType.valueOf(property); + switch (type) { + case BZIP2: + return CodecFactory.bzip2Codec(); + case DEFLATE: + return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL); + case NONE: + return CodecFactory.nullCodec(); + case LZO: + return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL); + case SNAPPY: + default: + return CodecFactory.snappyCodec(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java index a8244d2..a3fffc3 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java @@ -30,7 +30,6 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; @@ -70,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong; @DynamicProperty(name = "Field name from input schema", value = "Field name for output schema", description = "Explicit mappings from input schema to output schema, which supports 
renaming fields and stepping into nested records on the input schema using notation like parent.id") -public class ConvertAvroSchema extends AbstractKiteProcessor { +public class ConvertAvroSchema extends AbstractKiteConvertProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") @@ -180,7 +179,9 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { .<PropertyDescriptor> builder() .add(INPUT_SCHEMA) .add(OUTPUT_SCHEMA) - .add(LOCALE).build(); + .add(LOCALE) + .add(COMPRESSION_TYPE) + .build(); private static final Set<Relationship> RELATIONSHIPS = ImmutableSet .<Relationship> builder().add(SUCCESS).add(FAILURE).build(); @@ -284,11 +285,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { final DataFileWriter<Record> writer = new DataFileWriter<>( AvroUtil.newDatumWriter(outputSchema, Record.class)); - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); final DataFileWriter<Record> failureWriter = new DataFileWriter<>( AvroUtil.newDatumWriter(outputSchema, Record.class)); - failureWriter.setCodec(CodecFactory.snappyCodec()); + failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); @@ -376,6 +377,17 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); session.transfer(incomingAvro, FAILURE); + } finally { + try { + writer.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } + try { + failureWriter.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index de4130f..22244ee 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Set; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.commons.lang3.StringEscapeUtils; @@ -63,7 +62,7 @@ import java.util.concurrent.atomic.AtomicLong; @Tags({"kite", "csv", "avro"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts CSV files to Avro according to an Avro Schema") -public class ConvertCSVToAvro extends AbstractKiteProcessor { +public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build(); @@ -164,6 +163,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { .add(ESCAPE) .add(HAS_HEADER) .add(LINES_TO_SKIP) + .add(COMPRESSION_TYPE) .build(); private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder() @@ -221,7 +221,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { } try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) { - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index 6245362..1127a2d 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Set; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -54,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; @Tags({"kite", "json", "avro"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts JSON files to Avro according to an Avro Schema") -public class ConvertJSONToAvro extends AbstractKiteProcessor { +public class ConvertJSONToAvro extends AbstractKiteConvertProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") @@ -85,6 +84,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { = ImmutableList.<PropertyDescriptor>builder() .addAll(AbstractKiteProcessor.getProperties()) .add(SCHEMA) + .add(COMPRESSION_TYPE) .build(); private static final Set<Relationship> RELATIONSHIPS @@ -129,7 +129,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { final DataFileWriter<Record> writer = new DataFileWriter<>( AvroUtil.newDatumWriter(schema, 
Record.class)); - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); @@ -200,6 +200,12 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); session.transfer(incomingJSON, FAILURE); + } finally { + try { + writer.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java index 902ec79..9252e81 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -89,7 +90,6 @@ public class TestCSVToAvroProcessor { FAILURE_SUMMARY, incompatible.getAttribute("errors")); } - @Test public void testBasicConversion() throws IOException { TestRunner runner = 
TestRunners.newTestRunner(ConvertCSVToAvro.class); @@ -119,6 +119,35 @@ public class TestCSVToAvroProcessor { } @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(CSV_CONTENT)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 row", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + CSV_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test public void testAlternateCharset() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java index 2da0513..7a62ac5 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java @@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; import org.apache.commons.lang.LocaleUtils; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -125,6 +126,73 @@ public class TestConvertAvroSchema { } @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString()); + Locale locale = Locale.getDefault(); + runner.setProperty("primaryColor", "color"); + runner.assertValid(); + + NumberFormat format = NumberFormat.getInstance(locale); + + // Two valid rows, and one invalid because "free" is not a double. 
+ Record goodRecord1 = dataBasic("1", "blue", null, null); + Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5)); + Record badRecord = dataBasic("3", "red", "yellow", "free"); + List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2, + badRecord); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 rows", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship( + "failure").get(0); + GenericDatumReader<Record> reader = new GenericDatumReader<Record>( + INPUT_SCHEMA); + DataFileStream<Record> stream = new DataFileStream<Record>( + new ByteArrayInputStream( + runner.getContentAsByteArray(incompatible)), reader); + int count = 0; + for (Record r : stream) { + Assert.assertEquals(badRecord, r); + count++; + } + stream.close(); + Assert.assertEquals(1, count); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + + GenericDatumReader<Record> successReader = new GenericDatumReader<Record>( + OUTPUT_SCHEMA); + DataFileStream<Record> successStream = new DataFileStream<Record>( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + count = 0; + for (Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertBasic(goodRecord1, locale), r); + } else { + Assert.assertEquals(convertBasic(goodRecord2, locale), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + @Test public void testBasicConversionWithLocales() throws IOException { testBasicConversionWithLocale("en_US"); testBasicConversionWithLocale("fr_FR"); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java index e0b4a6f..776e2f3 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -84,6 +85,35 @@ public class TestJSONToAvroProcessor { } @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(JSON_CONTENT)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 3 rows", 3, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + 
runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + JSON_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test public void testOnlyErrors() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); runner.assertNotValid();