Hi everydody.

I have a text file that I convert with Linux iconv to UTF-8 that is delimited 
by a semicolon, which I submit to an ExecuteScript with the code attached, has 
one of the following error:

Attribute ValuesShow modified attributes only
avro.inferred.schema.errors
type: <type 'java.lang.IllegalArgumentException'> value: 
java.lang.IllegalArgumentException: Can not create PyString with non-byte value 
traceback: <traceback object at 0x17a81>
No value set

Can someone help me?

Thank you very much in advance.



Márcio Arruda

Rio de Janeiro - Brazil
import sys, traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, 
OutputStreamCallback, StreamCallback
from org.python.core.util import StringUtil
from java.io import BufferedReader, InputStreamReader

import re, csv, json
import StringIO
#from enum import IntEnum
#from enum import Enum

_INPUT_DECIMAL_SEPARATOR_ = "\."
_TARGET_DECIMAL_SEPARATOR_ = "."

_LONG_PATTERN_ = "^\d+$"
_DOUBLE_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[dD]?)$"
_FLOAT_PATTERN_ = "^(\d+)" + _INPUT_DECIMAL_SEPARATOR_ + "(\d*[fF]?)$"

_FIELD_TYPE_NONE   = 0
_FIELD_TYPE_LONG   = 10
_FIELD_TYPE_DOUBLE = 20
_FIELD_TYPE_FLOAT  = 30
_FIELD_TYPE_STRING = 90

_FIELD_NULL_NULL     = 'null'
_FIELD_NULL_NOT_NULL = 'not null'

def fieldTypeName(t):
    if t == _FIELD_TYPE_LONG:
        return "long"
    elif t == _FIELD_TYPE_DOUBLE:
        return "double"
    elif t == _FIELD_TYPE_FLOAT:
        return "float"
    else:
        return "string"
    
class SchemaBuilder:
    def getSchema(self, inputStream, record_name):
        reader = csv.reader(inputStream, delimiter=';', quotechar='"')
        names = [h.lower() for h in reader.next()]
        nulls = [_FIELD_NULL_NOT_NULL] * len(names)
        types = [_FIELD_TYPE_NONE] * len(names)

        for row in reader:
            for i in range(0, len(row)):
                cell = row[i].strip()
                #check null
                if cell == "":
                    nulls[i] = _FIELD_NULL_NULL
                else:
                    #check type
                    type = 0
                    if re.match(_LONG_PATTERN_, cell): #long
                        type = _FIELD_TYPE_LONG
                    elif re.match(_DOUBLE_PATTERN_, cell): #double
                        type = _FIELD_TYPE_DOUBLE
                    elif re.match(_FLOAT_PATTERN_, cell): #float
                        type = _FIELD_TYPE_FLOAT
                    else: #string
                        type = _FIELD_TYPE_STRING

                if type > types[i]:
                    types[i] = type

        fields = []
        for i in range(0, len(names)):
            # if no type inferred mark as string
            if types[i] == _FIELD_TYPE_NONE:
                types[i] = _FIELD_TYPE_STRING
                
            field = {}
            field["name"] = names[i]
            field["type"] = [nulls[i], fieldTypeName(types[i])] if nulls[i] == 
_FIELD_NULL_NULL else fieldTypeName(types[i])
            fields.append(field)

        data = {}
        data["fields"] = fields
        data["type"] = "record"
        data["name"] = record_name
        
        
        return json.dumps(data)

class SchemaFix:
    def getType(self, types):
        if isinstance(types, (str, unicode)):
            return types
        else:
            return types[1]
            
    def fixStream(self, inputStream, outputStream, schema):
        schema = json.loads(schema)
        
        
        writer = csv.writer(outputStream, delimiter=';', quotechar='"')
                            
        reader = csv.reader(inputStream, delimiter=';', quotechar='"')
        names = [h.lower() for h in reader.next()]
        types = [""] * len(names)

        writer.writerow(names)

        for field in schema["fields"]:
            name = field["name"]
            i = names.index(name)
            types[i] = self.getType(field["type"])
            
        for row in reader:

            out_row = row
            
            for i in range(0, len(row)):
                t = types[i]
                cell = row[i]
                if t == "double":
                    cell = re.sub(_DOUBLE_PATTERN_, "\\1" + 
_TARGET_DECIMAL_SEPARATOR_ + "\\2", cell)
                    print(cell)
                elif t == "float":
                    cell = re.sub(_FLOAT_PATTERN_, "\\1" + 
_TARGET_DECIMAL_SEPARATOR_ + "\\2", cell)

                if cell.strip() == "":
                    cell = ""

                out_row[i] = cell

            writer.writerow(out_row)
            
    def getFixAsString(self, inputStream, schema):
        outputStream = StringIO.StringIO()
        self.fixStream(inputStream, outputStream, schema)
        result = outputStream.getvalue()
        outputStream.close()

        return result

class BufferedReaderIterator:
    def __init__(self, bufferedReader):
        self.bufferedReader = bufferedReader

    def __iter__(self):
        return self

    def next(self):
        line = self.bufferedReader.readLine()
        if line:            
            return line
        else:
            raise StopIteration()
        
class InferSchemaCallback(InputStreamCallback):
    schema = None
    record_name = None
    success = False
    errors = []
        
    def process(self, inputStream):
        reader = InputStreamReader(inputStream, 'UTF-8')
        bufferedReader = BufferedReader(reader)

        try:
            self.schema = 
SchemaBuilder().getSchema(BufferedReaderIterator(bufferedReader), 
self.record_name)
            self.success = True
        except:
            self.errors.append(sys.exc_info())
        
        bufferedReader.close()
        reader.close()        

class SchemaFixCallback(StreamCallback):
    schema = None
    
    def __init__(self):
        pass
    
    def process(self, inputStream, outputStream):
        reader = InputStreamReader(inputStream)
        bufferedReader = BufferedReader(reader)
                
        SchemaFix().fixStream(BufferedReaderIterator(bufferedReader), 
outputStream, self.schema)
        #outputStream.write(bytearray(output_text.encode('utf-8')))

        bufferedReader.close()
        reader.close()
    
flowFile = session.get() 
if (flowFile != None):
    isc = InferSchemaCallback()
    isc.record_name = "enad"
    session.read(flowFile, isc)
    if isc.success:
        session.putAttribute(flowFile, 'avro.inferred.schema', isc.schema)

        sfc = SchemaFixCallback()
        sfc.schema = isc.schema
        flowFile = session.write(flowFile, sfc)
        
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    else:
        session.putAttribute(flowFile, 'avro.inferred.schema.errors', 
";###;".join(['type: %s \n value: %s \n traceback: %s' % (f[0], f[1], f[2]) for 
f in isc.errors]))
        session.transfer(flowFile, REL_FAILURE)
        session.commit()





  • Question Márcio Alexandre Henrique de Arruda

Reply via email to