Hi everydody. I have a text file that I convert with Linux iconv to UTF-8 that is delimited by a semicolon, which I submit to an ExecuteScript with the code attached, has one of the following error:
Attribute ValuesShow modified attributes only avro.inferred.schema.errors type: <type 'java.lang.IllegalArgumentException'> value: java.lang.IllegalArgumentException: Can not create PyString with non-byte value traceback: <traceback object at 0x17a81> No value set Can someone help me? Thank you very much in advance. Márcio Arruda Rio de Janeiro - Brazil
import sys, traceback from java.nio.charset import StandardCharsets from org.apache.commons.io import IOUtils from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback, StreamCallback from org.python.core.util import StringUtil from java.io import BufferedReader, InputStreamReader import re, csv, json import StringIO #from enum import IntEnum #from enum import Enum _INPUT_DECIMAL_SEPARATOR_ = "\." _TARGET_DECIMAL_SEPARATOR_ = "." _LONG_PATTERN_ = "^\d+$" _DOUBLE_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[dD]?)$" _FLOAT_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[fF]?)$" _FIELD_TYPE_NONE = 0 _FIELD_TYPE_LONG = 10 _FIELD_TYPE_DOUBLE = 20 _FIELD_TYPE_FLOAT = 30 _FIELD_TYPE_STRING = 90 _FIELD_NULL_NULL = 'null' _FIELD_NULL_NOT_NULL = 'not null' def fieldTypeName(t): if t == _FIELD_TYPE_LONG: return "long" elif t == _FIELD_TYPE_DOUBLE: return "double" elif t == _FIELD_TYPE_FLOAT: return "float" else: return "string" class SchemaBuilder: def getSchema(self, inputStream, record_name): reader = csv.reader(inputStream, delimiter=';', quotechar='"') names = [h.lower() for h in reader.next()] nulls = [_FIELD_NULL_NOT_NULL] * len(names) types = [_FIELD_TYPE_NONE] * len(names) for row in reader: for i in range(0, len(row)): cell = row[i].strip() #check null if cell == "": nulls[i] = _FIELD_NULL_NULL else: #check type type = 0 if re.match(_LONG_PATTERN_, cell): #long type = _FIELD_TYPE_LONG elif re.match(_DOUBLE_PATTERN_, cell): #double type = _FIELD_TYPE_DOUBLE elif re.match(_FLOAT_PATTERN_, cell): #float type = _FIELD_TYPE_FLOAT else: #string type = _FIELD_TYPE_STRING if type > types[i]: types[i] = type fields = [] for i in range(0, len(names)): # if no type inferred mark as string if types[i] == _FIELD_TYPE_NONE: types[i] = _FIELD_TYPE_STRING field = {} field["name"] = names[i] field["type"] = [nulls[i], fieldTypeName(types[i])] if nulls[i] == _FIELD_NULL_NULL else fieldTypeName(types[i]) fields.append(field) data = {} data["fields"] = fields data["type"] = "record" data["name"] = record_name return json.dumps(data) class SchemaFix: def getType(self, types): if isinstance(types, (str, unicode)): return types else: return types[1] def fixStream(self, inputStream, outputStream, schema): schema = json.loads(schema) writer = csv.writer(outputStream, delimiter=';', quotechar='"') reader = csv.reader(inputStream, delimiter=';', quotechar='"') names = [h.lower() for h in reader.next()] types = [""] * len(names) writer.writerow(names) for field in schema["fields"]: name = field["name"] i = names.index(name) types[i] = self.getType(field["type"]) for row in reader: out_row = row for i in range(0, len(row)): t = types[i] cell = row[i] if t == "double": cell = re.sub(_DOUBLE_PATTERN_, "\\1" + _TARGET_DECIMAL_SEPARATOR_ + "\\2", cell) print(cell) elif t == "float": cell = re.sub(_FLOAT_PATTERN_, "\\1" + _TARGET_DECIMAL_SEPARATOR_ + "\\2", cell) if cell.strip() == "": cell = "" out_row[i] = cell writer.writerow(out_row) def getFixAsString(self, inputStream, schema): outputStream = StringIO.StringIO() self.fixStream(inputStream, outputStream, schema) result = outputStream.getvalue() outputStream.close() return result class BufferedReaderIterator: def __init__(self, bufferedReader): self.bufferedReader = bufferedReader def __iter__(self): return self def next(self): line = self.bufferedReader.readLine() if line: return line else: raise StopIteration() class InferSchemaCallback(InputStreamCallback): schema = None record_name = None success = False errors = [] def process(self, inputStream): reader = InputStreamReader(inputStream, 'UTF-8') bufferedReader = BufferedReader(reader) try: self.schema = SchemaBuilder().getSchema(BufferedReaderIterator(bufferedReader), self.record_name) self.success = True except: self.errors.append(sys.exc_info()) bufferedReader.close() reader.close() class SchemaFixCallback(StreamCallback): schema = None def __init__(self): pass def process(self, inputStream, outputStream): reader = InputStreamReader(inputStream) bufferedReader = BufferedReader(reader) SchemaFix().fixStream(BufferedReaderIterator(bufferedReader), outputStream, self.schema) #outputStream.write(bytearray(output_text.encode('utf-8'))) bufferedReader.close() reader.close() flowFile = session.get() if (flowFile != None): isc = InferSchemaCallback() isc.record_name = "enad" session.read(flowFile, isc) if isc.success: session.putAttribute(flowFile, 'avro.inferred.schema', isc.schema) sfc = SchemaFixCallback() sfc.schema = isc.schema flowFile = session.write(flowFile, sfc) session.transfer(flowFile, REL_SUCCESS) session.commit() else: session.putAttribute(flowFile, 'avro.inferred.schema.errors', ";###;".join(['type: %s \n value: %s \n traceback: %s' % (f[0], f[1], f[2]) for f in isc.errors])) session.transfer(flowFile, REL_FAILURE) session.commit()