Repository: nifi Updated Branches: refs/heads/master 5f16f48a2 -> b29304df7
NIFI-4857: Support String<->byte[] conversion for records This closes #2570. Signed-off-by: Mark Payne <marka...@hotmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b29304df Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b29304df Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b29304df Branch: refs/heads/master Commit: b29304df79e78c5687b0c9411d5fab6cb93e6541 Parents: 5f16f48 Author: Matthew Burgess <mattyb...@apache.org> Authored: Tue Mar 20 22:02:47 2018 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Apr 9 14:53:21 2018 -0400 ---------------------------------------------------------------------- .../nifi/record/path/functions/ToBytes.java | 76 +++++++++++++ .../nifi/record/path/functions/ToString.java | 83 ++++++++++++++ .../record/path/paths/RecordPathCompiler.java | 10 ++ .../apache/nifi/record/path/TestRecordPath.java | 67 ++++++++++++ .../nifi/serialization/record/MapRecord.java | 3 +- .../record/util/DataTypeUtils.java | 108 +++++++++++++++++-- .../serialization/record/TestDataTypeUtils.java | 34 +++++- .../src/main/asciidoc/record-path-guide.adoc | 57 ++++++++++ .../java/org/apache/nifi/avro/AvroTypeUtil.java | 47 +++++--- .../org/apache/nifi/avro/TestAvroTypeUtil.java | 27 ++++- 10 files changed, 483 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java new file mode 100644 index 0000000..47275cf --- /dev/null +++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.nio.charset.Charset; +import java.util.stream.Stream; + +public class ToBytes extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment charsetSegment; + + public ToBytes(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) { + super("toBytes", null, absolute); + this.recordPath = recordPath; + this.charsetSegment = charsetSegment; + } + + @Override + public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = 
recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + + if (!(fv.getValue() instanceof String)) { + throw new IllegalArgumentException("Argument supplied to toBytes must be a String"); + } + + final Charset charset = getCharset(this.charsetSegment, context); + + final byte[] bytesValue; + Byte[] src = (Byte[]) DataTypeUtils.toArray(fv.getValue(), fv.getField().getFieldName(), RecordFieldType.BYTE.getDataType(), charset); + bytesValue = new byte[src.length]; + for (int i = 0; i < src.length; i++) { + bytesValue[i] = src[i]; + } + + return new StandardFieldValue(bytesValue, fv.getField(), fv.getParent().orElse(null)); + }); + } + + private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) { + if (charsetSegment == null) { + return null; + } + + final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context); + if (charsetString == null || charsetString.isEmpty()) { + return null; + } + + return DataTypeUtils.getCharset(charsetString); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java new file mode 100644 index 0000000..7204ffc --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.nio.charset.Charset; +import java.util.stream.Stream; + +public class ToString extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment charsetSegment; + + public ToString(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) { + super("toString", null, absolute); + this.recordPath = recordPath; + this.charsetSegment = charsetSegment; + } + + @Override + public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final Charset charset = getCharset(this.charsetSegment, context); + Object value = fv.getValue(); + final String stringValue; + + if (value instanceof Object[]) { + Object[] o = (Object[]) value; + if (o.length > 0) { + + byte[] dest = new byte[o.length]; + for (int i = 0; i < o.length; i++) { + dest[i] = (byte) 
o[i]; + } + stringValue = new String(dest, charset); + } else { + stringValue = ""; // Empty array = empty string + } + } else if (!(fv.getValue() instanceof byte[])) { + stringValue = fv.getValue().toString(); + } else { + stringValue = DataTypeUtils.toString(fv.getValue(), (String) null, charset); + } + return new StandardFieldValue(stringValue, fv.getField(), fv.getParent().orElse(null)); + }); + } + + private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) { + if (charsetSegment == null) { + return null; + } + + final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context); + if (charsetString == null || charsetString.isEmpty()) { + return null; + } + + return DataTypeUtils.getCharset(charsetString); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index cfb5c06..9a2821a 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -75,7 +75,9 @@ import org.apache.nifi.record.path.functions.SubstringAfter; import org.apache.nifi.record.path.functions.SubstringAfterLast; import org.apache.nifi.record.path.functions.SubstringBefore; import org.apache.nifi.record.path.functions.SubstringBeforeLast; +import org.apache.nifi.record.path.functions.ToBytes; import org.apache.nifi.record.path.functions.ToDate; +import org.apache.nifi.record.path.functions.ToString; public class RecordPathCompiler { @@ -250,6 +252,14 @@ public 
class RecordPathCompiler { final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); return new ToDate(args[0], args[1], absolute); } + case "toString": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new ToString(args[0], args[1], absolute); + } + case "toBytes": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new ToBytes(args[0], args[1], absolute); + } case "format": { final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); return new Format(args[0], args[1], absolute); http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 67e390a..dbf5fba 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.text.DateFormat; import java.text.ParseException; @@ -1210,6 +1212,71 @@ public class TestRecordPath { assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue()); } + @Test + public void testToString() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", 
RecordFieldType.INT.getDataType())); + fields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())))); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", 48); + values.put("bytes", "Hello World!".getBytes(StandardCharsets.UTF_16)); + final Record record = new MapRecord(schema, values); + + assertEquals("Hello World!", RecordPath.compile("toString(/bytes, \"UTF-16\")").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + + @Test(expected = IllegalCharsetNameException.class) + public void testToStringBadCharset() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())))); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", 48); + values.put("bytes", "Hello World!".getBytes(StandardCharsets.UTF_16)); + final Record record = new MapRecord(schema, values); + + RecordPath.compile("toString(/bytes, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue(); + } + + @Test + public void testToBytes() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("s", RecordFieldType.STRING.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", 48); + values.put("s", "Hello World!"); + final Record record = new MapRecord(schema, values); + + assertArrayEquals("Hello World!".getBytes(StandardCharsets.UTF_16LE), + (byte[]) 
RecordPath.compile("toBytes(/s, \"UTF-16LE\")").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + + @Test(expected = IllegalCharsetNameException.class) + public void testToBytesBadCharset() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("s", RecordFieldType.STRING.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", 48); + values.put("s", "Hello World!"); + final Record record = new MapRecord(schema, values); + + RecordPath.compile("toBytes(/s, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue(); + } + private List<RecordField> getDefaultFields() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index 0335bd2..57b7ac3 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -17,6 +17,7 @@ package org.apache.nifi.serialization.record; +import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.util.Date; import java.util.HashMap; @@ -268,7 +269,7 @@ public class MapRecord implements Record { @Override public Object[] getAsArray(final String fieldName) { - return DataTypeUtils.toArray(getValue(fieldName), fieldName); + return 
DataTypeUtils.toArray(getValue(fieldName), fieldName, null, StandardCharsets.UTF_8); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 145c17c..477b02a 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -18,6 +18,8 @@ package org.apache.nifi.serialization.record.util; import java.math.BigInteger; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -84,7 +86,11 @@ public class DataTypeUtils { private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()); public static Object convertType(final Object value, final DataType dataType, final String fieldName) { - return convertType(value, dataType, DEFAULT_DATE_FORMAT, DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName); + return convertType(value, dataType, fieldName, StandardCharsets.UTF_8); + } + + public static Object convertType(final Object value, final DataType dataType, final String fieldName, final Charset charset) { + return convertType(value, dataType, DEFAULT_DATE_FORMAT, DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName, charset); } public static DateFormat getDateFormat(final RecordFieldType fieldType, final Supplier<DateFormat> dateFormat, @@ -102,7 +108,12 @@ public class DataTypeUtils { } public static Object convertType(final Object value, final DataType 
dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> timeFormat, - final Supplier<DateFormat> timestampFormat, final String fieldName) { + final Supplier<DateFormat> timestampFormat, final String fieldName) { + return convertType(value, dataType, dateFormat, timeFormat, timestampFormat, fieldName, StandardCharsets.UTF_8); + } + + public static Object convertType(final Object value, final DataType dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> timeFormat, + final Supplier<DateFormat> timestampFormat, final String fieldName, final Charset charset) { if (value == null) { return null; @@ -130,19 +141,19 @@ public class DataTypeUtils { case SHORT: return toShort(value, fieldName); case STRING: - return toString(value, () -> getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat)); + return toString(value, () -> getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat), charset); case TIME: return toTime(value, timeFormat, fieldName); case TIMESTAMP: return toTimestamp(value, timestampFormat, fieldName); case ARRAY: - return toArray(value, fieldName); + return toArray(value, fieldName, ((ArrayDataType)dataType).getElementType(), charset); case MAP: return toMap(value, fieldName); case RECORD: final RecordDataType recordType = (RecordDataType) dataType; final RecordSchema childSchema = recordType.getChildSchema(); - return toRecord(value, childSchema, fieldName); + return toRecord(value, childSchema, fieldName, charset); case CHOICE: { final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; final DataType chosenDataType = chooseDataType(value, choiceDataType); @@ -151,7 +162,7 @@ public class DataTypeUtils { + " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes()); } - return convertType(value, chosenDataType, fieldName); + return convertType(value, chosenDataType, fieldName, charset); } } @@ 
-162,7 +173,7 @@ public class DataTypeUtils { public static boolean isCompatibleDataType(final Object value, final DataType dataType) { switch (dataType.getFieldType()) { case ARRAY: - return isArrayTypeCompatible(value); + return isArrayTypeCompatible(value, ((ArrayDataType) dataType).getElementType()); case BIGINT: return isBigIntTypeCompatible(value); case BOOLEAN: @@ -217,6 +228,10 @@ public class DataTypeUtils { } public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) { + return toRecord(value, recordSchema, fieldName, StandardCharsets.UTF_8); + } + + public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName, final Charset charset) { if (value == null) { return null; } @@ -247,7 +262,7 @@ public class DataTypeUtils { } final Object rawValue = entry.getValue(); - final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName); + final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName, charset); coercedValues.put(key, coercedValue); } @@ -261,7 +276,11 @@ public class DataTypeUtils { return value != null && value instanceof Record; } - public static Object[] toArray(final Object value, final String fieldName) { + public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) { + return toArray(value, fieldName, elementDataType, StandardCharsets.UTF_8); + } + + public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType, final Charset charset) { if (value == null) { return null; } @@ -270,11 +289,32 @@ public class DataTypeUtils { return (Object[]) value; } + if (value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType)) { + byte[] src = ((String) value).getBytes(charset); + Byte[] dest = new Byte[src.length]; + for (int i = 0; i < src.length; i++) { + dest[i] = src[i]; + } + return dest; + } + + if 
(value instanceof byte[]) { + byte[] src = (byte[]) value; + Byte[] dest = new Byte[src.length]; + for (int i = 0; i < src.length; i++) { + dest[i] = src[i]; + } + return dest; + } + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName); } - public static boolean isArrayTypeCompatible(final Object value) { - return value != null && value instanceof Object[]; + public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) { + return value != null + // Either an object array or a String to be converted to byte[] + && (value instanceof Object[] + || (value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType))); } @SuppressWarnings("unchecked") @@ -416,6 +456,10 @@ public class DataTypeUtils { public static String toString(final Object value, final Supplier<DateFormat> format) { + return toString(value, format, StandardCharsets.UTF_8); + } + + public static String toString(final Object value, final Supplier<DateFormat> format, final Charset charset) { if (value == null) { return null; } @@ -432,6 +476,32 @@ public class DataTypeUtils { return formatDate((java.util.Date) value, format); } + if (value instanceof byte[]) { + return new String((byte[])value, charset); + } + + if (value instanceof Byte[]) { + Byte[] src = (Byte[]) value; + byte[] dest = new byte[src.length]; + for(int i=0;i<src.length;i++) { + dest[i] = src[i]; + } + return new String(dest, charset); + } + if (value instanceof Object[]) { + Object[] o = (Object[]) value; + if (o.length > 0) { + + byte[] dest = new byte[o.length]; + for (int i = 0; i < o.length; i++) { + dest[i] = (byte) o[i]; + } + return new String(dest, charset); + } else { + return ""; // Empty array = empty string + } + } + return value.toString(); } @@ -445,6 +515,10 @@ public class DataTypeUtils { } public static String toString(final Object value, final String format) { + 
return toString(value, format, StandardCharsets.UTF_8); + } + + public static String toString(final Object value, final String format, final Charset charset) { if (value == null) { return null; } @@ -474,6 +548,10 @@ public class DataTypeUtils { return Arrays.toString((Object[]) value); } + if (value instanceof byte[]) { + return new String((byte[]) value, charset); + } + return value.toString(); } @@ -1100,4 +1178,12 @@ public class DataTypeUtils { return true; } + + public static Charset getCharset(String charsetName) { + if(charsetName == null) { + return StandardCharsets.UTF_8; + } else { + return Charset.forName(charsetName); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index a239ea7..2c068c2 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -21,6 +21,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.junit.Test; +import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; @@ -124,8 +125,8 @@ public class TestDataTypeUtils { complexValueRecord2.put("a",new String[] {"hello","world!"}); complexValueRecord2.put("b",new String[] {"5","4","3"}); - complexValues.put("complex1", DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1")); - complexValues.put("complex2", DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, 
"complex2")); + complexValues.put("complex1", DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1", StandardCharsets.UTF_8)); + complexValues.put("complex2", DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2", StandardCharsets.UTF_8)); values.put("complex", complexValues); final Record inputRecord = new MapRecord(schema, values); @@ -165,4 +166,33 @@ public class TestDataTypeUtils { assertEquals("4", ((String[])o)[1]); } + + @Test + public void testStringToBytes() { + Object bytes = DataTypeUtils.convertType("Hello", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()),null, StandardCharsets.UTF_8); + assertTrue(bytes instanceof Byte[]); + assertNotNull(bytes); + Byte[] b = (Byte[]) bytes; + assertEquals("Conversion from String to byte[] failed", (long) 72, (long) b[0] ); // H + assertEquals("Conversion from String to byte[] failed", (long) 101, (long) b[1] ); // e + assertEquals("Conversion from String to byte[] failed", (long) 108, (long) b[2] ); // l + assertEquals("Conversion from String to byte[] failed", (long) 108, (long) b[3] ); // l + assertEquals("Conversion from String to byte[] failed", (long) 111, (long) b[4] ); // o + } + + @Test + public void testBytesToString() { + Object s = DataTypeUtils.convertType("Hello".getBytes(StandardCharsets.UTF_16), RecordFieldType.STRING.getDataType(),null, StandardCharsets.UTF_16); + assertNotNull(s); + assertTrue(s instanceof String); + assertEquals("Conversion from byte[] to String failed", "Hello", s); + } + + @Test + public void testBytesToBytes() { + Object b = DataTypeUtils.convertType("Hello".getBytes(StandardCharsets.UTF_16), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()),null, StandardCharsets.UTF_16); + assertNotNull(b); + assertTrue(b instanceof Byte[]); + assertEquals("Conversion from byte[] to String failed at char 0", (Object) "Hello".getBytes(StandardCharsets.UTF_16)[0], ((Byte[]) b)[0]); + } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-docs/src/main/asciidoc/record-path-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index f011c52..8de98ee 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -502,6 +502,63 @@ The following record path would parse the eventDate field into a Date: `toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` +=== toString + +Converts a value to a String, using the given character set if the input type is "bytes". For example, +given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "bytes", "type" : "bytes"} + ] +} +---- + +and a record such as: + +---- +{ + "name" : "My Event", + "bytes" : "Hello World!" +} +---- + +The following record path would decode the bytes field into a String using UTF-8 encoding: + +`toString( /bytes, "UTF-8")` + +=== toBytes + +Converts a String to a byte array, using the given character set. For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "s", "type" : "string"} + ] +} +---- + +and a record such as: + +---- +{ + "name" : "My Event", + "s" : "Hello World!" +} +---- + +The following record path would convert the String field into a byte array using UTF-16 encoding: + +`toBytes( /s, "UTF-16")` + +=== format Converts a Date to a String in the given format.
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index a01e03d..411ca68 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -20,6 +20,8 @@ package org.apache.nifi.avro; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.sql.Time; import java.sql.Timestamp; import java.time.Duration; @@ -460,6 +462,10 @@ public class AvroTypeUtil { } public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException { + return createAvroRecord(record, avroSchema, StandardCharsets.UTF_8); + } + + public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema, final Charset charset) throws IOException { final GenericRecord rec = new GenericData.Record(avroSchema); final RecordSchema recordSchema = record.getSchema(); @@ -473,7 +479,7 @@ public class AvroTypeUtil { continue; } - final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName); + final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset); rec.put(fieldName, converted); } @@ -490,11 +496,19 @@ public class AvroTypeUtil { } /** - * Convert a raw value to an Avro object to 
serialize in Avro type system. + * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary. * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}. */ public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) { - return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName()); + return convertToAvroObject(rawValue, fieldSchema, StandardCharsets.UTF_8); + } + + /** + * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary. + * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}. + */ + public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final Charset charset) { + return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName(), charset); } /** @@ -512,7 +526,7 @@ public class AvroTypeUtil { recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable)); } else { Object defaultValue = field.defaultVal(); - if (fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue)) { + if (fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType())) { defaultValue = defaultValue instanceof List ? 
((List<?>) defaultValue).toArray() : new Object[0]; } recordFields.add(new RecordField(fieldName, dataType, defaultValue, field.aliases(), nullable)); @@ -526,7 +540,7 @@ public class AvroTypeUtil { } @SuppressWarnings("unchecked") - private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) { + private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) { if (rawValue == null) { return null; } @@ -609,6 +623,9 @@ public class AvroTypeUtil { if (rawValue instanceof byte[]) { return ByteBuffer.wrap((byte[]) rawValue); } + if (rawValue instanceof String) { + return ByteBuffer.wrap(((String) rawValue).getBytes(charset)); + } if (rawValue instanceof Object[]) { return AvroTypeUtil.convertByteArray((Object[]) rawValue); } else { @@ -630,7 +647,7 @@ public class AvroTypeUtil { final Map<String, Object> objectMap = (Map<String, Object>) rawValue; final Map<String, Object> map = new HashMap<>(objectMap.size()); for (final String s : objectMap.keySet()) { - final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]"); + final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]", charset); map.put(s, converted); } return map; @@ -650,18 +667,18 @@ public class AvroTypeUtil { continue; } - final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName); + final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset); avroRecord.put(recordFieldName, converted); } return avroRecord; case UNION: - return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName), fieldName); + return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, 
fieldName, charset), fieldName); case ARRAY: final Object[] objectArray = (Object[]) rawValue; final List<Object> list = new ArrayList<>(objectArray.length); int i = 0; for (final Object o : objectArray) { - final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]"); + final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset); list.add(converted); i++; } @@ -677,13 +694,17 @@ public class AvroTypeUtil { case ENUM: return new GenericData.EnumSymbol(fieldSchema, rawValue); case STRING: - return DataTypeUtils.toString(rawValue, (String) null); + return DataTypeUtils.toString(rawValue, (String) null, charset); } return rawValue; } public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) { + return convertAvroRecordToMap(avroRecord, recordSchema, StandardCharsets.UTF_8); + } + + public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) { final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount()); for (final RecordField recordField : recordSchema.getFields()) { @@ -710,7 +731,7 @@ public class AvroTypeUtil { final Object rawValue = normalizeValue(value, fieldSchema, fieldName); final DataType desiredType = recordField.getDataType(); - final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName); + final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset); values.put(fieldName, coercedValue); } catch (Exception ex) { @@ -779,7 +800,7 @@ public class AvroTypeUtil { } break; case ARRAY: - if (value instanceof Array || value instanceof List) { + if (value instanceof Array || value instanceof List || value instanceof ByteBuffer) { return true; } break; @@ -795,7 +816,7 @@ public class AvroTypeUtil { /** * Convert an Avro object to a normal 
Java objects for further processing. - * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)} + * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String, Charset)} */ private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) { if (value == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java index bf7d255..e4e515b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java @@ -25,6 +25,8 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -325,7 +327,7 @@ public class TestAvroTypeUtil { try (DataFileStream<GenericRecord> r = new DataFileStream<>(getClass().getResourceAsStream("data.avro"), new GenericDatumReader<>())) { GenericRecord n = r.next(); - AvroTypeUtil.convertAvroRecordToMap(n, recordASchema); + AvroTypeUtil.convertAvroRecordToMap(n, recordASchema, StandardCharsets.UTF_8); } } @@ -372,7 +374,7 @@ public class 
TestAvroTypeUtil { expects.forEach((rawValue, expect) -> { final Object convertedValue; try { - convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, fieldSchema); + convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, fieldSchema, StandardCharsets.UTF_8); } catch (Exception e) { if (expect.equals(e.getClass().getCanonicalName())) { // Expected behavior. @@ -394,4 +396,25 @@ public class TestAvroTypeUtil { } + @Test + public void testStringToBytesConversion() { + Object o = AvroTypeUtil.convertToAvroObject("Hello", Schema.create(Type.BYTES), StandardCharsets.UTF_16); + assertTrue(o instanceof ByteBuffer); + assertEquals("Hello", new String(((ByteBuffer) o).array(), StandardCharsets.UTF_16)); + } + + @Test + public void testStringToNullableBytesConversion() { + Object o = AvroTypeUtil.convertToAvroObject("Hello", Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)), StandardCharsets.UTF_16); + assertTrue(o instanceof ByteBuffer); + assertEquals("Hello", new String(((ByteBuffer) o).array(), StandardCharsets.UTF_16)); + } + + @Test + public void testBytesToStringConversion() { + final Charset charset = Charset.forName("UTF_32LE"); + Object o = AvroTypeUtil.convertToAvroObject("Hello".getBytes(charset), Schema.create(Type.STRING), charset); + assertTrue(o instanceof String); + assertEquals("Hello", o); + } }