Repository: nifi
Updated Branches:
  refs/heads/master 5f16f48a2 -> b29304df7


NIFI-4857: Support String<->byte[] conversion for records
This closes #2570.

Signed-off-by: Mark Payne <marka...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b29304df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b29304df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b29304df

Branch: refs/heads/master
Commit: b29304df79e78c5687b0c9411d5fab6cb93e6541
Parents: 5f16f48
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Tue Mar 20 22:02:47 2018 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Apr 9 14:53:21 2018 -0400

----------------------------------------------------------------------
 .../nifi/record/path/functions/ToBytes.java     |  76 +++++++++++++
 .../nifi/record/path/functions/ToString.java    |  83 ++++++++++++++
 .../record/path/paths/RecordPathCompiler.java   |  10 ++
 .../apache/nifi/record/path/TestRecordPath.java |  67 ++++++++++++
 .../nifi/serialization/record/MapRecord.java    |   3 +-
 .../record/util/DataTypeUtils.java              | 108 +++++++++++++++++--
 .../serialization/record/TestDataTypeUtils.java |  34 +++++-
 .../src/main/asciidoc/record-path-guide.adoc    |  57 ++++++++++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  47 +++++---
 .../org/apache/nifi/avro/TestAvroTypeUtil.java  |  27 ++++-
 10 files changed, 483 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
new file mode 100644
index 0000000..47275cf
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.nio.charset.Charset;
+import java.util.stream.Stream;
+
+/**
+ * RecordPath function {@code toBytes(field, charset)}: encodes each non-null String value selected by
+ * {@code field} into a {@code byte[]} using the character set named by {@code charset}.
+ * Null values are dropped from the resulting stream; non-String values cause an
+ * {@link IllegalArgumentException}.
+ */
+public class ToBytes extends RecordPathSegment {
+
+    /** Charset used when the charset argument is absent or evaluates to a null/empty name. */
+    private static final Charset DEFAULT_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment charsetSegment;
+
+    /**
+     * @param recordPath     path selecting the String values to encode
+     * @param charsetSegment path yielding the name of the charset to encode with
+     * @param absolute       passed through to {@link RecordPathSegment}
+     */
+    public ToBytes(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
+        super("toBytes", null, absolute);
+        this.recordPath = recordPath;
+        this.charsetSegment = charsetSegment;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+                .map(fv -> {
+                    if (!(fv.getValue() instanceof String)) {
+                        throw new IllegalArgumentException("Argument supplied to toBytes must be a String");
+                    }
+
+                    final Charset charset = getCharset(this.charsetSegment, context);
+
+                    // DataTypeUtils encodes a String into a boxed Byte[] for a BYTE element type;
+                    // unbox it so the resulting field value is a primitive byte[].
+                    final Byte[] boxed = (Byte[]) DataTypeUtils.toArray(fv.getValue(), fv.getField().getFieldName(), RecordFieldType.BYTE.getDataType(), charset);
+                    final byte[] bytesValue = new byte[boxed.length];
+                    for (int i = 0; i < boxed.length; i++) {
+                        bytesValue[i] = boxed[i];
+                    }
+
+                    return new StandardFieldValue(bytesValue, fv.getField(), fv.getParent().orElse(null));
+                });
+    }
+
+    /**
+     * Resolves the charset named by {@code charsetSegment}. Falls back to UTF-8 when the segment is
+     * absent or yields a null/empty name, so that encoding never receives a null {@link Charset}
+     * (which would otherwise surface as a NullPointerException).
+     *
+     * @throws java.nio.charset.IllegalCharsetNameException if the name is not a legal charset name
+     * @throws java.nio.charset.UnsupportedCharsetException if the charset is not available in this JVM
+     */
+    private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
+        if (charsetSegment == null) {
+            return DEFAULT_CHARSET;
+        }
+
+        final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
+        if (charsetString == null || charsetString.isEmpty()) {
+            return DEFAULT_CHARSET;
+        }
+
+        return DataTypeUtils.getCharset(charsetString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
new file mode 100644
index 0000000..7204ffc
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.nio.charset.Charset;
+import java.util.stream.Stream;
+
+/**
+ * RecordPath function {@code toString(field, charset)}: converts each non-null value selected by
+ * {@code field} into a String. Byte arrays (primitive {@code byte[]} or arrays of boxed bytes) are
+ * decoded with the character set named by {@code charset}; any other value uses {@code toString()}.
+ * Null values are dropped from the resulting stream.
+ */
+public class ToString extends RecordPathSegment {
+
+    /** Charset used when the charset argument is absent or evaluates to a null/empty name. */
+    private static final Charset DEFAULT_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment charsetSegment;
+
+    /**
+     * @param recordPath     path selecting the values to convert
+     * @param charsetSegment path yielding the name of the charset used to decode byte arrays
+     * @param absolute       passed through to {@link RecordPathSegment}
+     */
+    public ToString(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
+        super("toString", null, absolute);
+        this.recordPath = recordPath;
+        this.charsetSegment = charsetSegment;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+                .map(fv -> {
+                    final Charset charset = getCharset(this.charsetSegment, context);
+                    final Object value = fv.getValue();
+                    final String stringValue;
+
+                    if (value instanceof Object[]) {
+                        // Boxed arrays (e.g. Byte[]) are decoded as a byte sequence; an empty array yields ""
+                        stringValue = new String(toPrimitiveBytes((Object[]) value), charset);
+                    } else if (value instanceof byte[]) {
+                        stringValue = DataTypeUtils.toString(value, (String) null, charset);
+                    } else {
+                        stringValue = value.toString();
+                    }
+                    return new StandardFieldValue(stringValue, fv.getField(), fv.getParent().orElse(null));
+                });
+    }
+
+    /**
+     * Unboxes an array whose elements are expected to be {@link Byte}s.
+     *
+     * @throws IllegalArgumentException if any element is not a non-null Byte
+     */
+    private static byte[] toPrimitiveBytes(final Object[] values) {
+        final byte[] bytes = new byte[values.length];
+        for (int i = 0; i < values.length; i++) {
+            final Object element = values[i];
+            if (!(element instanceof Byte)) {
+                throw new IllegalArgumentException("Array supplied to toString must contain only bytes, but element " + i + " was " + element);
+            }
+            bytes[i] = (Byte) element;
+        }
+        return bytes;
+    }
+
+    /**
+     * Resolves the charset named by {@code charsetSegment}. Falls back to UTF-8 when the segment is
+     * absent or yields a null/empty name, so that decoding never receives a null {@link Charset}
+     * (which would otherwise surface as a NullPointerException).
+     *
+     * @throws java.nio.charset.IllegalCharsetNameException if the name is not a legal charset name
+     * @throws java.nio.charset.UnsupportedCharsetException if the charset is not available in this JVM
+     */
+    private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
+        if (charsetSegment == null) {
+            return DEFAULT_CHARSET;
+        }
+
+        final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
+        if (charsetString == null || charsetString.isEmpty()) {
+            return DEFAULT_CHARSET;
+        }
+
+        return DataTypeUtils.getCharset(charsetString);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index cfb5c06..9a2821a 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -75,7 +75,9 @@ import org.apache.nifi.record.path.functions.SubstringAfter;
 import org.apache.nifi.record.path.functions.SubstringAfterLast;
 import org.apache.nifi.record.path.functions.SubstringBefore;
 import org.apache.nifi.record.path.functions.SubstringBeforeLast;
+import org.apache.nifi.record.path.functions.ToBytes;
 import org.apache.nifi.record.path.functions.ToDate;
+import org.apache.nifi.record.path.functions.ToString;
 
 public class RecordPathCompiler {
 
@@ -250,6 +252,14 @@ public class RecordPathCompiler {
                         final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
                         return new ToDate(args[0], args[1], absolute);
                     }
+                    case "toString": {
+                        final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new ToString(args[0], args[1], absolute);
+                    }
+                    case "toBytes": {
+                        final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new ToBytes(args[0], args[1], absolute);
+                    }
                     case "format": {
                         final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
                         return new Format(args[0], args[1], absolute);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 67e390a..dbf5fba 100644
--- 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -1210,6 +1212,71 @@ public class TestRecordPath {
         assertEquals("John Doe", RecordPath.compile("format(/name, 
'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue());
     }
 
+    @Test
+    public void testToString() {
+        final Record record = createUtf16BytesRecord();
+        final Object decoded = RecordPath.compile("toString(/bytes, \"UTF-16\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
+        assertEquals("Hello World!", decoded);
+    }
+
+    @Test(expected = IllegalCharsetNameException.class)
+    public void testToStringBadCharset() {
+        final Record record = createUtf16BytesRecord();
+        RecordPath.compile("toString(/bytes, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
+    }
+
+    @Test
+    public void testToBytes() {
+        final Record record = createHelloWorldStringRecord();
+        final byte[] expected = "Hello World!".getBytes(StandardCharsets.UTF_16LE);
+        final Object encoded = RecordPath.compile("toBytes(/s, \"UTF-16LE\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
+        assertArrayEquals(expected, (byte[]) encoded);
+    }
+
+    @Test(expected = IllegalCharsetNameException.class)
+    public void testToBytesBadCharset() {
+        final Record record = createHelloWorldStringRecord();
+        RecordPath.compile("toBytes(/s, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
+    }
+
+    /** Builds a record {id=48, bytes="Hello World!" encoded as UTF-16}; "bytes" is a CHOICE of ARRAY of BYTE. */
+    private Record createUtf16BytesRecord() {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))));
+
+        final Map<String, Object> fieldValues = new HashMap<>();
+        fieldValues.put("id", 48);
+        fieldValues.put("bytes", "Hello World!".getBytes(StandardCharsets.UTF_16));
+        return new MapRecord(new SimpleRecordSchema(recordFields), fieldValues);
+    }
+
+    /** Builds a record {id=48, s="Hello World!"} where "s" is a STRING field. */
+    private Record createHelloWorldStringRecord() {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField("s", RecordFieldType.STRING.getDataType()));
+
+        final Map<String, Object> fieldValues = new HashMap<>();
+        fieldValues.put("id", 48);
+        fieldValues.put("s", "Hello World!");
+        return new MapRecord(new SimpleRecordSchema(recordFields), fieldValues);
+    }
+
     private List<RecordField> getDefaultFields() {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 0335bd2..57b7ac3 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.serialization.record;
 
+import java.nio.charset.StandardCharsets;
 import java.text.DateFormat;
 import java.util.Date;
 import java.util.HashMap;
@@ -268,7 +269,7 @@ public class MapRecord implements Record {
 
     @Override
     public Object[] getAsArray(final String fieldName) {
-        return DataTypeUtils.toArray(getValue(fieldName), fieldName);
+        return DataTypeUtils.toArray(getValue(fieldName), fieldName, null, 
StandardCharsets.UTF_8);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 145c17c..477b02a 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.serialization.record.util;
 
 import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -84,7 +86,11 @@ public class DataTypeUtils {
     private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> 
getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
 
     public static Object convertType(final Object value, final DataType 
dataType, final String fieldName) {
-        return convertType(value, dataType, DEFAULT_DATE_FORMAT, 
DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName);
+        return convertType(value, dataType, fieldName, StandardCharsets.UTF_8);
+    }
+
+    public static Object convertType(final Object value, final DataType 
dataType, final String fieldName, final Charset charset) {
+        return convertType(value, dataType, DEFAULT_DATE_FORMAT, 
DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName, charset);
     }
 
     public static DateFormat getDateFormat(final RecordFieldType fieldType, 
final Supplier<DateFormat> dateFormat,
@@ -102,7 +108,12 @@ public class DataTypeUtils {
     }
 
     public static Object convertType(final Object value, final DataType 
dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> 
timeFormat,
-        final Supplier<DateFormat> timestampFormat, final String fieldName) {
+                                     final Supplier<DateFormat> 
timestampFormat, final String fieldName) {
+        return convertType(value, dataType, dateFormat, timeFormat, 
timestampFormat, fieldName, StandardCharsets.UTF_8);
+    }
+
+    public static Object convertType(final Object value, final DataType 
dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> 
timeFormat,
+        final Supplier<DateFormat> timestampFormat, final String fieldName, 
final Charset charset) {
 
         if (value == null) {
             return null;
@@ -130,19 +141,19 @@ public class DataTypeUtils {
             case SHORT:
                 return toShort(value, fieldName);
             case STRING:
-                return toString(value, () -> 
getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, 
timestampFormat));
+                return toString(value, () -> 
getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, 
timestampFormat), charset);
             case TIME:
                 return toTime(value, timeFormat, fieldName);
             case TIMESTAMP:
                 return toTimestamp(value, timestampFormat, fieldName);
             case ARRAY:
-                return toArray(value, fieldName);
+                return toArray(value, fieldName, 
((ArrayDataType)dataType).getElementType(), charset);
             case MAP:
                 return toMap(value, fieldName);
             case RECORD:
                 final RecordDataType recordType = (RecordDataType) dataType;
                 final RecordSchema childSchema = recordType.getChildSchema();
-                return toRecord(value, childSchema, fieldName);
+                return toRecord(value, childSchema, fieldName, charset);
             case CHOICE: {
                 final ChoiceDataType choiceDataType = (ChoiceDataType) 
dataType;
                 final DataType chosenDataType = chooseDataType(value, 
choiceDataType);
@@ -151,7 +162,7 @@ public class DataTypeUtils {
                         + " for field " + fieldName + " to any of the 
following available Sub-Types for a Choice: " + 
choiceDataType.getPossibleSubTypes());
                 }
 
-                return convertType(value, chosenDataType, fieldName);
+                return convertType(value, chosenDataType, fieldName, charset);
             }
         }
 
@@ -162,7 +173,7 @@ public class DataTypeUtils {
     public static boolean isCompatibleDataType(final Object value, final 
DataType dataType) {
         switch (dataType.getFieldType()) {
             case ARRAY:
-                return isArrayTypeCompatible(value);
+                return isArrayTypeCompatible(value, ((ArrayDataType) 
dataType).getElementType());
             case BIGINT:
                 return isBigIntTypeCompatible(value);
             case BOOLEAN:
@@ -217,6 +228,10 @@ public class DataTypeUtils {
     }
 
     public static Record toRecord(final Object value, final RecordSchema 
recordSchema, final String fieldName) {
+        return toRecord(value, recordSchema, fieldName, 
StandardCharsets.UTF_8);
+    }
+
+    public static Record toRecord(final Object value, final RecordSchema 
recordSchema, final String fieldName, final Charset charset) {
         if (value == null) {
             return null;
         }
@@ -247,7 +262,7 @@ public class DataTypeUtils {
                 }
 
                 final Object rawValue = entry.getValue();
-                final Object coercedValue = convertType(rawValue, 
desiredTypeOption.get(), fieldName);
+                final Object coercedValue = convertType(rawValue, 
desiredTypeOption.get(), fieldName, charset);
                 coercedValues.put(key, coercedValue);
             }
 
@@ -261,7 +276,11 @@ public class DataTypeUtils {
         return value != null && value instanceof Record;
     }
 
-    public static Object[] toArray(final Object value, final String fieldName) 
{
+    public static Object[] toArray(final Object value, final String fieldName, 
final DataType elementDataType) {
+        return toArray(value, fieldName, elementDataType, 
StandardCharsets.UTF_8);
+    }
+
+    public static Object[] toArray(final Object value, final String fieldName, 
final DataType elementDataType, final Charset charset) {
         if (value == null) {
             return null;
         }
@@ -270,11 +289,32 @@ public class DataTypeUtils {
             return (Object[]) value;
         }
 
+        if (value instanceof String && 
RecordFieldType.BYTE.getDataType().equals(elementDataType)) {
+            byte[] src = ((String) value).getBytes(charset);
+            Byte[] dest = new Byte[src.length];
+            for (int i = 0; i < src.length; i++) {
+                dest[i] = src[i];
+            }
+            return dest;
+        }
+
+        if (value instanceof byte[]) {
+            byte[] src = (byte[]) value;
+            Byte[] dest = new Byte[src.length];
+            for (int i = 0; i < src.length; i++) {
+                dest[i] = src[i];
+            }
+            return dest;
+        }
+
         throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Object Array for field " + 
fieldName);
     }
 
-    public static boolean isArrayTypeCompatible(final Object value) {
-        return value != null && value instanceof Object[];
+    public static boolean isArrayTypeCompatible(final Object value, final 
DataType elementDataType) {
+        return value != null
+                // Either an object array or a String to be converted to byte[]
+                && (value instanceof Object[]
+                || (value instanceof String && 
RecordFieldType.BYTE.getDataType().equals(elementDataType)));
     }
 
     @SuppressWarnings("unchecked")
@@ -416,6 +456,10 @@ public class DataTypeUtils {
 
 
     public static String toString(final Object value, final 
Supplier<DateFormat> format) {
+        return toString(value, format, StandardCharsets.UTF_8);
+    }
+
+    public static String toString(final Object value, final 
Supplier<DateFormat> format, final Charset charset) {
         if (value == null) {
             return null;
         }
@@ -432,6 +476,32 @@ public class DataTypeUtils {
             return formatDate((java.util.Date) value, format);
         }
 
+        if (value instanceof byte[]) {
+            return new String((byte[])value, charset);
+        }
+
+        if (value instanceof Byte[]) {
+            Byte[] src = (Byte[]) value;
+            byte[] dest = new byte[src.length];
+            for(int i=0;i<src.length;i++) {
+                dest[i] = src[i];
+            }
+            return new String(dest, charset);
+        }
+        if (value instanceof Object[]) {
+            Object[] o = (Object[]) value;
+            if (o.length > 0) {
+
+                byte[] dest = new byte[o.length];
+                for (int i = 0; i < o.length; i++) {
+                    dest[i] = (byte) o[i];
+                }
+                return new String(dest, charset);
+            } else {
+                return ""; // Empty array = empty string
+            }
+        }
+
         return value.toString();
     }
 
@@ -445,6 +515,10 @@ public class DataTypeUtils {
     }
 
     public static String toString(final Object value, final String format) {
+        return toString(value, format, StandardCharsets.UTF_8);
+    }
+
+    public static String toString(final Object value, final String format, 
final Charset charset) {
         if (value == null) {
             return null;
         }
@@ -474,6 +548,10 @@ public class DataTypeUtils {
             return Arrays.toString((Object[]) value);
         }
 
+        if (value instanceof byte[]) {
+            return new String((byte[]) value, charset);
+        }
+
         return value.toString();
     }
 
@@ -1100,4 +1178,12 @@ public class DataTypeUtils {
 
         return true;
     }
+
+    /**
+     * Looks up a {@link Charset} by name.
+     *
+     * @param charsetName the charset name, or {@code null} to select UTF-8
+     * @return UTF-8 when {@code charsetName} is {@code null}, otherwise the named charset
+     * @throws java.nio.charset.IllegalCharsetNameException if the name is not a legal charset name
+     * @throws java.nio.charset.UnsupportedCharsetException if the charset is not available in this JVM
+     */
+    public static Charset getCharset(String charsetName) {
+        return (charsetName != null) ? Charset.forName(charsetName) : StandardCharsets.UTF_8;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index a239ea7..2c068c2 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -21,6 +21,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -124,8 +125,8 @@ public class TestDataTypeUtils {
         complexValueRecord2.put("a",new String[] {"hello","world!"});
         complexValueRecord2.put("b",new String[] {"5","4","3"});
 
-        complexValues.put("complex1", 
DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1"));
-        complexValues.put("complex2", 
DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2"));
+        complexValues.put("complex1", 
DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1", 
StandardCharsets.UTF_8));
+        complexValues.put("complex2", 
DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2", 
StandardCharsets.UTF_8));
 
         values.put("complex", complexValues);
         final Record inputRecord = new MapRecord(schema, values);
@@ -165,4 +166,33 @@ public class TestDataTypeUtils {
         assertEquals("4", ((String[])o)[1]);
 
     }
+
+    /** A String converted to ARRAY&lt;BYTE&gt; is encoded with the supplied charset into a boxed Byte[]. */
+    @Test
+    public void testStringToBytes() {
+        final Object bytes = DataTypeUtils.convertType("Hello", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), null, StandardCharsets.UTF_8);
+        // Check non-null first: instanceof alone would already be false for null, hiding the real cause
+        assertNotNull(bytes);
+        assertTrue(bytes instanceof Byte[]);
+        final Byte[] b = (Byte[]) bytes;
+        final long[] expected = {72, 101, 108, 108, 111}; // H e l l o
+        assertEquals("Conversion from String to byte[] produced the wrong length", (long) expected.length, (long) b.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("Conversion from String to byte[] failed at index " + i, expected[i], (long) b[i]);
+        }
+    }
+
+    /** A byte[] converted to STRING is decoded with the supplied charset. */
+    @Test
+    public void testBytesToString() {
+        final Object s = DataTypeUtils.convertType("Hello".getBytes(StandardCharsets.UTF_16), RecordFieldType.STRING.getDataType(), null, StandardCharsets.UTF_16);
+        assertNotNull(s);
+        assertTrue(s instanceof String);
+        assertEquals("Conversion from byte[] to String failed", "Hello", s);
+    }
+
+    /** A byte[] converted to ARRAY&lt;BYTE&gt; keeps every byte unchanged, boxed into a Byte[]. */
+    @Test
+    public void testBytesToBytes() {
+        final byte[] expected = "Hello".getBytes(StandardCharsets.UTF_16);
+        final Object b = DataTypeUtils.convertType(expected, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), null, StandardCharsets.UTF_16);
+        assertNotNull(b);
+        assertTrue(b instanceof Byte[]);
+        final Byte[] actual = (Byte[]) b;
+        assertEquals("Conversion from byte[] to byte[] produced the wrong length", (long) expected.length, (long) actual.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("Conversion from byte[] to byte[] failed at index " + i, (long) expected[i], (long) actual[i]);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-docs/src/main/asciidoc/record-path-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc 
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index f011c52..8de98ee 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -502,6 +502,63 @@ The following record path would parse the eventDate field 
into a Date:
 
 `toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")`
 
+=== toString
+
+Converts a value to a String, using the given character set if the input type 
is "bytes".  For example,
+given a schema such as:
+
+----
+{
+  "type": "record",
+  "name": "events",
+  "fields": [
+    { "name": "name", "type": "string" },
+    { "name": "bytes", "type" : "bytes"}
+  ]
+}
+----
+
+and a record such as:
+
+----
+{
+  "name" : "My Event",
+  "bytes" : "Hello World!"
+}
+----
+
+The following record path would convert the bytes field into a String using the UTF-8 character set:
+
+`toString( /bytes, "UTF-8")`
+
+=== toBytes
+
+Converts a String to a byte array (byte[]), using the given character set.  For example, 
given a schema such as:
+
+----
+{
+  "type": "record",
+  "name": "events",
+  "fields": [
+    { "name": "name", "type": "string" },
+    { "name": "s", "type" : "string"}
+  ]
+}
+----
+
+and a record such as:
+
+----
+{
+  "name" : "My Event",
+  "s" : "Hello World!"
+}
+----
+
+The following record path would convert the String field into a byte array 
using UTF-16 encoding:
+
+`toBytes( /s, "UTF-16")`
+
 === format
 
 Converts a Date to a String in the given format.

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index a01e03d..411ca68 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -20,6 +20,8 @@ package org.apache.nifi.avro;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Duration;
@@ -460,6 +462,10 @@ public class AvroTypeUtil {
     }
 
     public static GenericRecord createAvroRecord(final Record record, final 
Schema avroSchema) throws IOException {
+        return createAvroRecord(record, avroSchema, StandardCharsets.UTF_8);
+    }
+
+    public static GenericRecord createAvroRecord(final Record record, final 
Schema avroSchema, final Charset charset) throws IOException {
         final GenericRecord rec = new GenericData.Record(avroSchema);
         final RecordSchema recordSchema = record.getSchema();
 
@@ -473,7 +479,7 @@ public class AvroTypeUtil {
                 continue;
             }
 
-            final Object converted = convertToAvroObject(rawValue, 
field.schema(), fieldName);
+            final Object converted = convertToAvroObject(rawValue, 
field.schema(), fieldName, charset);
             rec.put(fieldName, converted);
         }
 
@@ -490,11 +496,19 @@ public class AvroTypeUtil {
     }
 
     /**
-     * Convert a raw value to an Avro object to serialize in Avro type system.
+     * Convert a raw value to an Avro object to serialize in Avro type system, 
using UTF-8 whenever a character set is necessary.
      * The counter-part method which reads an Avro object back to a raw value 
is {@link #normalizeValue(Object, Schema, String)}.
      */
     public static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema) {
-        return convertToAvroObject(rawValue, fieldSchema, 
fieldSchema.getName());
+        return convertToAvroObject(rawValue, fieldSchema, 
StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Convert a raw value to an Avro object to serialize in Avro type system, 
using the provided character set when necessary.
+     * The counter-part method which reads an Avro object back to a raw value 
is {@link #normalizeValue(Object, Schema, String)}.
+     */
+    public static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final Charset charset) {
+        return convertToAvroObject(rawValue, fieldSchema, 
fieldSchema.getName(), charset);
     }
 
     /**
@@ -512,7 +526,7 @@ public class AvroTypeUtil {
             recordFields.add(new RecordField(fieldName, dataType, 
field.aliases(), nullable));
         } else {
             Object defaultValue = field.defaultVal();
-            if (fieldSchema.getType() == Schema.Type.ARRAY && 
!DataTypeUtils.isArrayTypeCompatible(defaultValue)) {
+            if (fieldSchema.getType() == Schema.Type.ARRAY && 
!DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) 
dataType).getElementType())) {
                 defaultValue = defaultValue instanceof List ? ((List<?>) 
defaultValue).toArray() : new Object[0];
             }
             recordFields.add(new RecordField(fieldName, dataType, 
defaultValue, field.aliases(), nullable));
@@ -526,7 +540,7 @@ public class AvroTypeUtil {
     }
 
     @SuppressWarnings("unchecked")
-    private static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final String fieldName) {
+    private static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final String fieldName, final Charset charset) {
         if (rawValue == null) {
             return null;
         }
@@ -609,6 +623,9 @@ public class AvroTypeUtil {
                 if (rawValue instanceof byte[]) {
                     return ByteBuffer.wrap((byte[]) rawValue);
                 }
+                if (rawValue instanceof String) {
+                    return ByteBuffer.wrap(((String) 
rawValue).getBytes(charset));
+                }
                 if (rawValue instanceof Object[]) {
                     return AvroTypeUtil.convertByteArray((Object[]) rawValue);
                 } else {
@@ -630,7 +647,7 @@ public class AvroTypeUtil {
                     final Map<String, Object> objectMap = (Map<String, 
Object>) rawValue;
                     final Map<String, Object> map = new 
HashMap<>(objectMap.size());
                     for (final String s : objectMap.keySet()) {
-                        final Object converted = 
convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + 
"[" + s + "]");
+                        final Object converted = 
convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + 
"[" + s + "]", charset);
                         map.put(s, converted);
                     }
                     return map;
@@ -650,18 +667,18 @@ public class AvroTypeUtil {
                         continue;
                     }
 
-                    final Object converted = 
convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + 
recordFieldName);
+                    final Object converted = 
convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + 
recordFieldName, charset);
                     avroRecord.put(recordFieldName, converted);
                 }
                 return avroRecord;
             case UNION:
-                return convertUnionFieldValue(rawValue, fieldSchema, schema -> 
convertToAvroObject(rawValue, schema, fieldName), fieldName);
+                return convertUnionFieldValue(rawValue, fieldSchema, schema -> 
convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
             case ARRAY:
                 final Object[] objectArray = (Object[]) rawValue;
                 final List<Object> list = new ArrayList<>(objectArray.length);
                 int i = 0;
                 for (final Object o : objectArray) {
-                    final Object converted = convertToAvroObject(o, 
fieldSchema.getElementType(), fieldName + "[" + i + "]");
+                    final Object converted = convertToAvroObject(o, 
fieldSchema.getElementType(), fieldName + "[" + i + "]", charset);
                     list.add(converted);
                     i++;
                 }
@@ -677,13 +694,17 @@ public class AvroTypeUtil {
             case ENUM:
                 return new GenericData.EnumSymbol(fieldSchema, rawValue);
             case STRING:
-                return DataTypeUtils.toString(rawValue, (String) null);
+                return DataTypeUtils.toString(rawValue, (String) null, 
charset);
         }
 
         return rawValue;
     }
 
     public static Map<String, Object> convertAvroRecordToMap(final 
GenericRecord avroRecord, final RecordSchema recordSchema) {
+        return convertAvroRecordToMap(avroRecord, recordSchema, 
StandardCharsets.UTF_8);
+    }
+
+    public static Map<String, Object> convertAvroRecordToMap(final 
GenericRecord avroRecord, final RecordSchema recordSchema, final Charset 
charset) {
         final Map<String, Object> values = new 
HashMap<>(recordSchema.getFieldCount());
 
         for (final RecordField recordField : recordSchema.getFields()) {
@@ -710,7 +731,7 @@ public class AvroTypeUtil {
             final Object rawValue = normalizeValue(value, fieldSchema, 
fieldName);
 
             final DataType desiredType = recordField.getDataType();
-            final Object coercedValue = DataTypeUtils.convertType(rawValue, 
desiredType, fieldName);
+            final Object coercedValue = DataTypeUtils.convertType(rawValue, 
desiredType, fieldName, charset);
 
             values.put(fieldName, coercedValue);
             } catch (Exception ex) {
@@ -779,7 +800,7 @@ public class AvroTypeUtil {
                 }
                 break;
             case ARRAY:
-                if (value instanceof Array || value instanceof List) {
+                if (value instanceof Array || value instanceof List || value 
instanceof ByteBuffer) {
                     return true;
                 }
                 break;
@@ -795,7 +816,7 @@ public class AvroTypeUtil {
 
     /**
      * Convert an Avro object to a normal Java objects for further processing.
-     * The counter-part method which convert a raw value to an Avro object is 
{@link #convertToAvroObject(Object, Schema, String)}
+     * The counter-part method which converts a raw value to an Avro object is 
{@link #convertToAvroObject(Object, Schema, String, Charset)}
      */
     private static Object normalizeValue(final Object value, final Schema 
avroSchema, final String fieldName) {
         if (value == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index bf7d255..e4e515b 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -325,7 +327,7 @@ public class TestAvroTypeUtil {
         try (DataFileStream<GenericRecord> r = new 
DataFileStream<>(getClass().getResourceAsStream("data.avro"),
                 new GenericDatumReader<>())) {
             GenericRecord n = r.next();
-            AvroTypeUtil.convertAvroRecordToMap(n, recordASchema);
+            AvroTypeUtil.convertAvroRecordToMap(n, recordASchema, 
StandardCharsets.UTF_8);
         }
     }
 
@@ -372,7 +374,7 @@ public class TestAvroTypeUtil {
         expects.forEach((rawValue, expect) -> {
             final Object convertedValue;
             try {
-                convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, 
fieldSchema);
+                convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, 
fieldSchema, StandardCharsets.UTF_8);
             } catch (Exception e) {
                 if (expect.equals(e.getClass().getCanonicalName())) {
                     // Expected behavior.
@@ -394,4 +396,25 @@ public class TestAvroTypeUtil {
 
     }
 
+    @Test
+    public void testStringToBytesConversion() {
+        Object o = AvroTypeUtil.convertToAvroObject("Hello", 
Schema.create(Type.BYTES), StandardCharsets.UTF_16);
+        assertTrue(o instanceof ByteBuffer);
+        assertEquals("Hello", new String(((ByteBuffer) o).array(), 
StandardCharsets.UTF_16));
+    }
+
+    @Test
+    public void testStringToNullableBytesConversion() {
+        Object o = AvroTypeUtil.convertToAvroObject("Hello", 
Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)), 
StandardCharsets.UTF_16);
+        assertTrue(o instanceof ByteBuffer);
+        assertEquals("Hello", new String(((ByteBuffer) o).array(), 
StandardCharsets.UTF_16));
+    }
+
+    @Test
+    public void testBytesToStringConversion() {
+        final Charset charset = Charset.forName("UTF_32LE");
+        Object o = AvroTypeUtil.convertToAvroObject("Hello".getBytes(charset), 
Schema.create(Type.STRING), charset);
+        assertTrue(o instanceof String);
+        assertEquals("Hello", o);
+    }
 }

Reply via email to