Hi everybody,
As I'm not a Java programmer, the news about Python becoming a
first-class citizen in NiFi 2 was a great thing.
I am creating processors, but there is an issue I cannot figure out:
handling an exception in the Python code and then sending the
FlowFile to the 'failure' relationship.
Specifically, when I catch an exception it does send the FlowFile to
'failure', but it completely ignores the parameters I set for content
(string) and attributes (dict), and always just sends the original
content plus the standard attributes.
I have tried:
- Creating FlowFileTransform instance and then trying to modify its
properties to finally use it in the return statement
- Returning a FlowFileTransform just at the point of the return, having
several return points according to code conditions.
- Creating a variable (dict), that changes during the transformation
lines of code, and finally assigning the variable items to
FlowFileTransform new instance (flowfile, contents, attributes).
But none of these work: the FlowFile goes to 'failure', but the content
I try to set is ignored.
I am using:
- Java 21.0.2
- python 3.10.12
- nifi 2.0.0-M3
I am attaching a reduced version that recreates the behavior.
Can anyone give me a tip about this? Maybe I missed something in the
documentation. Any idea what could be happening?
Thanks in advance
LC
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.documentation import use_case, multi_processor_use_case, ProcessorConfiguration
import json
from pydantic import BaseModel, ValidationError
@use_case(
    description="Create request to get Access Token from Authorization System",
    notes="The input for this use case is expected to be a FlowFile whose content is "
          "a JSON document, indicating user's id & secret",
    keywords=["auth", "bs"],
    configuration="\n",
)
class BS_UserToken(FlowFileTransform):
    """FlowFileTransform processor that reads client credentials from a FlowFile.

    The FlowFile content is parsed as a JSON document from which the keys
    ``client_id`` and ``client_secret`` are read. Every outcome is reported
    through a ``FlowFileTransformResult`` routed to 'success' or 'failure',
    carrying ``_rest_status`` / ``_op_status`` style attributes.
    """

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.7.a'
        description = """Create User Authorization Token"""
        tags = ["python", "bs", "auth", "zitadel"]

    class UserId(BaseModel):
        # Expected shape of the incoming JSON document.
        client_id: str
        client_secret: str

    PROCESSOR_NAME = 'BS_API_UserToken'
    PROCESSOR_VERSION = '0.7.a'
    # Class-level log prefix; onScheduled derives a per-instance variant from
    # PROCESSOR_NAME / PROCESSOR_VERSION that also carries the identifier.
    LOG_PREFIX_BASE = f'\n{PROCESSOR_NAME}-v{PROCESSOR_VERSION}'

    def __init__(self, jvm=None, **kwargs):
        """Store the JVM handle and declare the processor's three required properties."""
        self.jvm = jvm

        # ########################### PROPERTIES START ############################
        self.authServerDomain = PropertyDescriptor(
            name="Auth Server Url",
            description="The Auth server base URL",
            required=True,
        )
        self.authServerTokenUri = PropertyDescriptor(
            name="Auth Server Token-Endpoint Uri",
            description="The Auth Server token endpoint URI",
            required=True,
        )
        self.projectId = PropertyDescriptor(
            name="Project Id",
            description="Identification of service to ask for access",
            required=True,
        )
        self.property_descriptors = [
            self.authServerDomain,
            self.authServerTokenUri,
            self.projectId,
        ]
        # ########################### PROPERTIES END ############################

    def onScheduled(self, context):
        """Build the per-instance log prefix that includes this instance's identifier."""
        try:
            # import pydevd_pycharm
            # pydevd_pycharm.settrace('localhost', port=5678, stdoutToServer=True, stderrToServer=True)

            # Rebuild the prefix from the name/version constants rather than from
            # the current self.LOG_PREFIX_BASE: deriving it from itself appended
            # another "-[identifier]" suffix every time onScheduled ran.
            # presumably 'identifier' is set on the instance by the framework — confirm.
            self.LOG_PREFIX_BASE = (
                f'\n{self.PROCESSOR_NAME}-v{self.PROCESSOR_VERSION}'
                f'-[{self.__dict__["identifier"]}]'
            )
            self.logger.info(f'\n{self.LOG_PREFIX_BASE} Debugger Not Enabled\n')
        except Exception as e:
            self.logger.error(f'\n{self.LOG_PREFIX_BASE} Failed to connect to python debug listener: {e}\n')

    def getPropertyDescriptors(self):
        """Return the list of property descriptors declared in ``__init__``."""
        return self.property_descriptors

    def transform(self, context, flowfile) -> FlowFileTransformResult:
        """Parse the FlowFile's JSON content and return the routing result.

        Returns:
            A 'failure' result with ``_rest_status="400"`` when the FlowFile is
            empty; a 'failure' result when decoding, JSON parsing, key lookup
            or anything else inside the ``try`` raises; otherwise a 'success'
            result.
        """
        log_prefix = f'{self.LOG_PREFIX_BASE} FF:{flowfile.getAttribute("filename")} >>'
        self.logger.info(f'\n\n\n{log_prefix} START Processing ...')

        # Guard clause: nothing to parse in an empty FlowFile.
        if not flowfile.getSize():
            self.logger.error(f'{log_prefix} END Processing w/ERROR - Empty data(400)')
            return FlowFileTransformResult(relationship='failure', contents='Bad Request', attributes=dict(_rest_status="400", _op_status="0"))

        try:
            # Decoding happens inside the try so that undecodable bytes are
            # routed to 'failure' instead of escaping as an unhandled exception.
            ff_content = flowfile.getContentsAsBytes().decode()
            ff_content_dict = json.loads(ff_content)
            # NOTE(review): this logs the raw content, which includes
            # client_secret — consider redacting before production use.
            self.logger.info(f'{log_prefix} ff_content: {ff_content}')
            # self.UserId( **ff_content_dict )
            authServerUrl = context.getProperty(self.authServerDomain).getValue()
            authServerTokenUri = context.getProperty(self.authServerTokenUri).getValue()
            projectId = context.getProperty(self.projectId).getValue()
            self.logger.info(
                f'{log_prefix} AuthServer={authServerUrl}, TokenEndpoint={authServerTokenUri}, ProjectId={projectId}')
            # Both values must come from the parsed dict; indexing the raw
            # string with a str key always raised TypeError. The value is not
            # used further here, but building it checks that both keys exist.
            client_credentials = f"{ff_content_dict['client_id']}:{ff_content_dict['client_secret']}".encode("utf-8")
            return FlowFileTransformResult(relationship='success', contents='{"error": "sin error"}', attributes=dict(_rest_status="500", _op_status="-1"))
        except ValidationError as vex:
            self.logger.error(f'{log_prefix} End Processing w/ERROR - Not Processable(406) - {str(vex.errors())}')
            return FlowFileTransformResult(relationship='failure', attributes=dict(_rest_status="406", _op_status="0"))
        except Exception as ex:
            # NOTE(review): the author reports that on NiFi 2.0.0-M3 the contents
            # and attributes of a 'failure' result are not applied to the
            # outgoing FlowFile — confirm against the NiFi version in use.
            failure_result = FlowFileTransformResult(relationship='failure', attributes=dict(rest_status='Mil'))
            attrs = failure_result.getAttributes()
            self.logger.info(f'{log_prefix} Failure with: Params={str(attrs)} Relationship={failure_result.getRelationship()} Contents={failure_result.getContents()}')
            self.logger.error(f'{log_prefix} End Processing w/ERROR - Not Processable(406) - {str(ex)} \n\n\n')
            return failure_result