Shawn,

We had the same issue, and we use special and multi-character delimiters here. I
have not been able to find a CSV library that supports multi-character
delimiters; otherwise I would have updated the CSV Record Reader to support them.
Instead, I created a special Record Reader that supports multi-character
delimiters. We use it in ConvertRecord to convert to a different format as soon
as possible 😊.  I don’t know if you’re up for using custom code… But just in
case you are, here is the personal implementation that we use in house.

Thanks,
  Peter

--- Class 1 ---

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

public class CSVReader extends AbstractControllerService implements RecordReaderFactory {

    static final PropertyDescriptor COLUMN_DELIMITER = new PropertyDescriptor.Builder()
            .name("pt-column-delimiter")
            .displayName("Column Delimiter")
            .description("The character(s) to use to separate columns of data. Special characters "
                    + "should use the '\\u' notation; because the delimiter is applied as a regular "
                    + "expression, regex metacharacters should be escaped with '\\\\'. If not "
                    + "specified, the Ctrl+A (\\u0001) delimiter is used.")
            .required(false)
            .defaultValue("\\u0001")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor RECORD_DELIMITER = new PropertyDescriptor.Builder()
            .name("pt-record-delimiter")
            .displayName("Record Delimiter")
            .description("The character(s) to use to separate rows of data. For a line return, "
                    + "press 'Shift+Enter' in this field. Special characters should use the "
                    + "'\\u' notation.")
            .required(false)
            .defaultValue("\n")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor SKIP_HEADER_ROW = new PropertyDescriptor.Builder()
            .name("pt-skip-header")
            .displayName("Skip First Row")
            .description("Specifies whether or not the first row of data will be skipped.")
            .allowableValues("true", "false")
            .defaultValue("true")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    private volatile String colDelimiter;
    private volatile String recDelimiter;
    private volatile boolean skipHeader;

    @OnEnabled
    public void storeCsvFormat(final ConfigurationContext context) {
        // Unescape so that a value such as "\u0001" typed into the UI becomes
        // the actual control character before it reaches the reader.
        this.colDelimiter = StringEscapeUtils.unescapeJava(context.getProperty(COLUMN_DELIMITER).getValue());
        this.recDelimiter = StringEscapeUtils.unescapeJava(context.getProperty(RECORD_DELIMITER).getValue());
        this.skipHeader = context.getProperty(SKIP_HEADER_ROW).asBoolean();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
        propertyDescriptors.add(COLUMN_DELIMITER);
        propertyDescriptors.add(RECORD_DELIMITER);
        propertyDescriptors.add(SKIP_HEADER_ROW);
        return propertyDescriptors;
    }

    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in,
            final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        return new CSVRecordReader(in, logger, this.skipHeader, this.colDelimiter, this.recDelimiter);
    }
}
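
A packaging note in case you build this into your own NAR: NiFi discovers
controller services through a standard ServiceLoader descriptor, so the jar
also needs a services file listing the implementation class. The package name
here is just a placeholder for wherever you put the class:

# src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
com.example.csv.CSVReader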


--- Class 2 ---

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class CSVRecordReader implements RecordReader {

    private final PeekableScanner s;
    private final RecordSchema schema;
    private final String colDelimiter;
    private final String recordDelimiter;

    public CSVRecordReader(final InputStream in, final ComponentLog logger, final boolean hasHeader,
            final String colDelimiter, final String recordDelimiter) throws IOException {
        this.recordDelimiter = recordDelimiter;
        this.colDelimiter = colDelimiter;

        // Scanner treats the delimiter as a regex, which is what allows a
        // multi-character record delimiter.
        s = new PeekableScanner(new Scanner(in, "UTF-8").useDelimiter(recordDelimiter));

        // Build a basic schema (Column_0, Column_1, ...) from the column count
        // of the first row.
        final String firstRow = s.peek();
        final List<RecordField> fields = new ArrayList<>();

        if (firstRow != null) {
            final String[] columns = firstRow.split(colDelimiter, -1);
            for (int i = 0; i < columns.length; i++) {
                fields.add(new RecordField("Column_" + i, RecordFieldType.STRING.getDataType(), true));
            }
            schema = new SimpleRecordSchema(fields);
        } else {
            schema = null;
        }

        // Skip the header line, if there is one
        if (hasHeader && s.hasNext()) {
            s.next();
        }
    }

    @Override
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields)
            throws IOException, MalformedRecordException {
        if (!s.hasNext()) {
            return null;
        }

        final String row = s.next();
        final List<RecordField> recordFields = getSchema().getFields();

        final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
        // split() also treats the column delimiter as a regex; the limit of -1
        // preserves trailing empty columns.
        final String[] columns = row.split(colDelimiter, -1);

        // Ignore any columns beyond those seen in the first row rather than
        // throwing an IndexOutOfBoundsException on ragged rows.
        final int count = Math.min(columns.length, recordFields.size());
        for (int i = 0; i < count; i++) {
            values.put(recordFields.get(i).getFieldName(), columns[i]);
        }

        return new MapRecord(schema, values);
    }

    @Override
    public RecordSchema getSchema() throws MalformedRecordException {
        return schema;
    }

    @Override
    public void close() throws IOException {
        s.close();
    }

    // Wraps a Scanner so the first row can be inspected (to size the schema)
    // without consuming it.
    private static class PeekableScanner {
        private final Scanner scan;
        private String next;

        public PeekableScanner(final Scanner scan) {
            this.scan = scan;
            next = scan.hasNext() ? scan.next() : null;
        }

        public boolean hasNext() {
            return next != null;
        }

        public String next() {
            final String current = next;
            next = scan.hasNext() ? scan.next() : null;
            return current;
        }

        public String peek() {
            return next;
        }

        public void close() {
            scan.close();
        }
    }
}
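
If you want to sanity-check the reader outside of NiFi first, here is a rough
standalone sketch. The |~| delimiter and the sample rows are made up for
illustration, and note the column delimiter has to be regex-escaped because
the reader splits on it as a regular expression:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.serialization.record.Record;

public class CSVRecordReaderDemo {
    public static void main(final String[] args) throws Exception {
        // Two data rows behind a header row, delimited by a made-up "|~|"
        final String payload = "id|~|name\n1|~|alice\n2|~|bob";
        final InputStream in = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));

        // skipHeader = true; "\\|~\\|" because '|' is a regex metacharacter
        try (CSVRecordReader reader = new CSVRecordReader(in, null, true, "\\|~\\|", "\n")) {
            Record record;
            while ((record = reader.nextRecord(true, true)) != null) {
                System.out.println(record.getAsString("Column_0") + " -> " + record.getAsString("Column_1"));
            }
        }
    }
}

Running it should print "1 -> alice" and "2 -> bob".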


From: Andy LoPresto <alopre...@apache.org>
Sent: Wednesday, November 6, 2019 4:40 PM
To: users@nifi.apache.org
Subject: [EXT] Re: How to replace multi character delimiter with ASCII 001

I haven’t tried this, but you might be able to use ${"AQ==":base64Decode()},
since AQ== is the Base64 encoding of \u0001?
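(If the bare quoted subject gets treated as an attribute name rather than a
literal, the literal() form might work instead, e.g.
${literal('AQ=='):base64Decode()}; also untested.)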

Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69


On Nov 6, 2019, at 12:25 PM, Shawn Weeks <swe...@weeksconsulting.us> wrote:

I'm trying to process a delimited file with a multi-character delimiter, which
is not supported by the CSV Record Reader. To get around that, I'm trying to
replace the delimiter with ASCII 001, the same delimiter used by Hive and one I
know isn't in the data. Here is my current configuration, but NiFi isn't
interpreting \u0001. I've also tried \001 and ${literal('\u0001')}, neither of
which worked. What is the correct way to do this?

Thanks
Shawn Weeks

<attached screenshot of the configuration: Outlook-asqiibgt.png>
