Hi everydody.
I have a text file that I convert with Linux iconv to UTF-8 that is delimited
by a semicolon, which I submit to an ExecuteScript with the code attached, has
one of the following error:
Attribute ValuesShow modified attributes only
avro.inferred.schema.errors
type: <type 'java.lang.IllegalArgumentException'> value:
java.lang.IllegalArgumentException: Can not create PyString with non-byte value
traceback: <traceback object at 0x17a81>
No value set
Can someone help me?
Thank you very much in advance.
Márcio Arruda
Rio de Janeiro - Brazil
import sys, traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback,
OutputStreamCallback, StreamCallback
from org.python.core.util import StringUtil
from java.io import BufferedReader, InputStreamReader
import re, csv, json
import StringIO
#from enum import IntEnum
#from enum import Enum
_INPUT_DECIMAL_SEPARATOR_ = "\."
_TARGET_DECIMAL_SEPARATOR_ = "."
_LONG_PATTERN_ = "^\d+$"
_DOUBLE_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[dD]?)$"
_FLOAT_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[fF]?)$"
_FIELD_TYPE_NONE = 0
_FIELD_TYPE_LONG = 10
_FIELD_TYPE_DOUBLE = 20
_FIELD_TYPE_FLOAT = 30
_FIELD_TYPE_STRING = 90
_FIELD_NULL_NULL = 'null'
_FIELD_NULL_NOT_NULL = 'not null'
def fieldTypeName(t):
if t == _FIELD_TYPE_LONG:
return "long"
elif t == _FIELD_TYPE_DOUBLE:
return "double"
elif t == _FIELD_TYPE_FLOAT:
return "float"
else:
return "string"
class SchemaBuilder:
def getSchema(self, inputStream, record_name):
reader = csv.reader(inputStream, delimiter=';', quotechar='"')
names = [h.lower() for h in reader.next()]
nulls = [_FIELD_NULL_NOT_NULL] * len(names)
types = [_FIELD_TYPE_NONE] * len(names)
for row in reader:
for i in range(0, len(row)):
cell = row[i].strip()
#check null
if cell == "":
nulls[i] = _FIELD_NULL_NULL
else:
#check type
type = 0
if re.match(_LONG_PATTERN_, cell): #long
type = _FIELD_TYPE_LONG
elif re.match(_DOUBLE_PATTERN_, cell): #double
type = _FIELD_TYPE_DOUBLE
elif re.match(_FLOAT_PATTERN_, cell): #float
type = _FIELD_TYPE_FLOAT
else: #string
type = _FIELD_TYPE_STRING
if type > types[i]:
types[i] = type
fields = []
for i in range(0, len(names)):
# if no type inferred mark as string
if types[i] == _FIELD_TYPE_NONE:
types[i] = _FIELD_TYPE_STRING
field = {}
field["name"] = names[i]
field["type"] = [nulls[i], fieldTypeName(types[i])] if nulls[i] ==
_FIELD_NULL_NULL else fieldTypeName(types[i])
fields.append(field)
data = {}
data["fields"] = fields
data["type"] = "record"
data["name"] = record_name
return json.dumps(data)
class SchemaFix:
def getType(self, types):
if isinstance(types, (str, unicode)):
return types
else:
return types[1]
def fixStream(self, inputStream, outputStream, schema):
schema = json.loads(schema)
writer = csv.writer(outputStream, delimiter=';', quotechar='"')
reader = csv.reader(inputStream, delimiter=';', quotechar='"')
names = [h.lower() for h in reader.next()]
types = [""] * len(names)
writer.writerow(names)
for field in schema["fields"]:
name = field["name"]
i = names.index(name)
types[i] = self.getType(field["type"])
for row in reader:
out_row = row
for i in range(0, len(row)):
t = types[i]
cell = row[i]
if t == "double":
cell = re.sub(_DOUBLE_PATTERN_, "\\1" +
_TARGET_DECIMAL_SEPARATOR_ + "\\2", cell)
print(cell)
elif t == "float":
cell = re.sub(_FLOAT_PATTERN_, "\\1" +
_TARGET_DECIMAL_SEPARATOR_ + "\\2", cell)
if cell.strip() == "":
cell = ""
out_row[i] = cell
writer.writerow(out_row)
def getFixAsString(self, inputStream, schema):
outputStream = StringIO.StringIO()
self.fixStream(inputStream, outputStream, schema)
result = outputStream.getvalue()
outputStream.close()
return result
class BufferedReaderIterator:
def __init__(self, bufferedReader):
self.bufferedReader = bufferedReader
def __iter__(self):
return self
def next(self):
line = self.bufferedReader.readLine()
if line:
return line
else:
raise StopIteration()
class InferSchemaCallback(InputStreamCallback):
schema = None
record_name = None
success = False
errors = []
def process(self, inputStream):
reader = InputStreamReader(inputStream, 'UTF-8')
bufferedReader = BufferedReader(reader)
try:
self.schema =
SchemaBuilder().getSchema(BufferedReaderIterator(bufferedReader),
self.record_name)
self.success = True
except:
self.errors.append(sys.exc_info())
bufferedReader.close()
reader.close()
class SchemaFixCallback(StreamCallback):
schema = None
def __init__(self):
pass
def process(self, inputStream, outputStream):
reader = InputStreamReader(inputStream)
bufferedReader = BufferedReader(reader)
SchemaFix().fixStream(BufferedReaderIterator(bufferedReader),
outputStream, self.schema)
#outputStream.write(bytearray(output_text.encode('utf-8')))
bufferedReader.close()
reader.close()
flowFile = session.get()
if (flowFile != None):
isc = InferSchemaCallback()
isc.record_name = "enad"
session.read(flowFile, isc)
if isc.success:
session.putAttribute(flowFile, 'avro.inferred.schema', isc.schema)
sfc = SchemaFixCallback()
sfc.schema = isc.schema
flowFile = session.write(flowFile, sfc)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
else:
session.putAttribute(flowFile, 'avro.inferred.schema.errors',
";###;".join(['type: %s \n value: %s \n traceback: %s' % (f[0], f[1], f[2]) for
f in isc.errors]))
session.transfer(flowFile, REL_FAILURE)
session.commit()