Hi everybody,

As I'm not a Java programmer, the news about Python becoming a first-class citizen in NiFi 2 was great.

I am creating processors, but there is an issue I cannot figure out: handling an exception in Python code and therefore routing the FlowFile to the 'failure' relationship.

Specifically, when I catch an exception the FlowFile is sent to 'failure', but the parameters I set for content (string) and attributes (dict) are completely ignored: it always sends the original content plus the standard attributes.

I have tried:

- Creating a FlowFileTransformResult instance, modifying its properties, and finally using it in the return statement.

- Constructing the FlowFileTransformResult directly in the return statement, with several return points depending on the code path.

- Creating a dict variable that is updated during the transformation, and finally passing its items to a new FlowFileTransformResult instance (flowfile, contents, attributes).

None of these work: the FlowFile goes to 'failure', but the content I try to set is ignored.


I am using:

- Java 21.0.2

- Python 3.10.12

- NiFi 2.0.0-M3


I am attaching a reduced version that recreates the behavior.


Can anyone give me a tip about this? Maybe I missed something in the documentation — does anyone have an idea what could be happening?


Thanks in advance

LC

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.documentation import use_case, multi_processor_use_case, ProcessorConfiguration

import json
from pydantic import BaseModel, ValidationError


@use_case(
    description="Create request to get Access Token from Authorization System",
    notes="The input for this use case is expected to be a FlowFile whose content is \
           a JSON document, indicating user's id & secret",
    keywords=["auth", "bs"],
    configuration="""
        """
)
class BS_UserToken(FlowFileTransform):
    """Build a user authorization-token request from a JSON FlowFile.

    The incoming FlowFile content is expected to be a UTF-8 JSON object that
    validates against ``UserId`` (``client_id`` and ``client_secret`` strings).

    NOTE(review): with NiFi 2.0.0-M3, results routed to the 'failure'
    relationship have been observed to carry the ORIGINAL FlowFile content and
    attributes, ignoring the ``contents``/``attributes`` set on the
    FlowFileTransformResult. The failure branches below still set attributes,
    but do not rely on them reaching the outgoing FlowFile — confirm against the
    NiFi Python Developer's Guide for your NiFi version, and route through a
    relationship other than 'failure' if modified content is needed on error.
    """

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.7.a'
        description = """Create User Authorization Token"""
        tags = ["python", "bs", "auth", "zitadel"]

    class UserId(BaseModel):
        """Schema the FlowFile JSON content must satisfy."""
        client_id: str
        client_secret: str

    PROCESSOR_NAME = 'BS_API_UserToken'
    PROCESSOR_VERSION = '0.7.a'
    # Class-level prefix; onScheduled derives a per-instance prefix from it.
    LOG_PREFIX_BASE = f'\n{PROCESSOR_NAME}-v{PROCESSOR_VERSION}'

    def __init__(self, jvm=None, **kwargs):
        """Declare the processor's three required configuration properties.

        :param jvm: stored as ``self.jvm``; not used elsewhere in this class.
        :param kwargs: accepted and ignored.
        """
        self.jvm = jvm
        # --------------------------- PROPERTIES START ---------------------------
        self.authServerDomain = PropertyDescriptor(
            name="Auth Server Url",
            description="The Auth server base URL",
            required=True,
            # default_value="http://localhost:8010",
            # allowable_values=["http://localhost:8010"],
            # dependencies=[]
        )
        self.authServerTokenUri = PropertyDescriptor(
            name="Auth Server Token-Endpoint Uri",
            description="The Auth Server token endpoint URI",
            required=True,
            # default_value="/token",
            # allowable_values=["/token"],
            # dependencies=[]
        )
        self.projectId = PropertyDescriptor(
            name="Project Id",
            description="Identification of service to ask for access",
            required=True,
            # default_value="",
            # allowable_values=[],
            # dependencies=[]
        )
        self.property_descriptors = [
            self.authServerDomain,
            self.authServerTokenUri,
            self.projectId,
        ]
        # ---------------------------- PROPERTIES END ----------------------------

    def onScheduled(self, context):
        """Build the per-instance log prefix when the processor is scheduled.

        The prefix is always rebuilt from the class-level ``LOG_PREFIX_BASE``.
        Otherwise each stop/start cycle would append the identifier again to an
        already-extended instance prefix.
        """
        try:
            # import pydevd_pycharm
            # pydevd_pycharm.settrace('localhost', port=5678, stdoutToServer=True, stderrToServer=True)
            base_prefix = type(self).LOG_PREFIX_BASE
            self.LOG_PREFIX_BASE = f'{base_prefix}-[{self.__dict__["identifier"]}]'
            self.logger.info(f'\n{self.LOG_PREFIX_BASE} Debugger Not Enabled\n')
        except Exception as e:
            self.logger.error(f'\n{self.LOG_PREFIX_BASE} Failed to connect to python debug listener: {e}\n')

    def getPropertyDescriptors(self):
        """Return the property descriptors declared in ``__init__``."""
        return self.property_descriptors

    def transform(self, context, flowfile) -> FlowFileTransformResult:
        """Validate the JSON credentials in *flowfile* and build the token request.

        Routing:
          * empty content                         -> 'failure' (_rest_status=400)
          * content does not validate as UserId   -> 'failure' (_rest_status=406)
          * any other error (non-UTF-8 content,
            invalid JSON, non-object JSON, ...)   -> 'failure' (rest_status='Mil')
          * otherwise                             -> 'success'

        See the class docstring: attributes/contents on 'failure' results were
        reported as ignored by NiFi 2.0.0-M3.
        """
        LOG_PREFIX = f'{self.LOG_PREFIX_BASE} FF:{flowfile.getAttribute("filename")} >>'
        self.logger.info(f'\n\n\n{LOG_PREFIX} START Processing ...')

        if not flowfile.getSize():
            self.logger.error(f'{LOG_PREFIX} END Processing w/ERROR - Empty data(400)')
            return FlowFileTransformResult(relationship='failure', contents='Bad Request', attributes=dict(_rest_status="400", _op_status="0"))

        try:
            # Decode inside the try so non-UTF-8 content is logged and routed by
            # the handlers below instead of escaping transform().
            ff_content = flowfile.getContentsAsBytes().decode()
            ff_content_dict = json.loads(ff_content)
            self.logger.info(f'{LOG_PREFIX} ff_content: {ff_content}')
            # Raises ValidationError (-> 406) when the JSON does not match UserId;
            # a non-object JSON value fails the ** unpacking (-> generic handler).
            user = self.UserId(**ff_content_dict)

            authServerUrl = context.getProperty(self.authServerDomain).getValue()
            authServerTokenUri = context.getProperty(self.authServerTokenUri).getValue()
            projectId = context.getProperty(self.projectId).getValue()

            self.logger.info(
                f'{LOG_PREFIX} AuthServer={authServerUrl}, TokenEndpoint={authServerTokenUri}, ProjectId={projectId}')

            # Read both fields from the validated model, never from the raw string.
            # NOTE(review): not yet sent anywhere in this reduced example.
            client_credentials = f"{user.client_id}:{user.client_secret}".encode("utf-8")
            # "sin error" is Spanish for "no error" (placeholder payload).
            return FlowFileTransformResult(relationship='success', contents='{"error": "sin error"}', attributes=dict(_rest_status="500", _op_status="-1"))

        except ValidationError as vex:
            self.logger.error(f'{LOG_PREFIX} End Processing w/ERROR - Not Processable(406) - {str(vex.errors())}')
            return FlowFileTransformResult(relationship='failure',  attributes=dict(_rest_status="406", _op_status="0"))
        except Exception as ex:
            # Processor boundary: any other error is logged and routed to 'failure'.
            result = FlowFileTransformResult(relationship='failure',  attributes=dict(rest_status='Mil'))
            attrs = result.getAttributes()
            self.logger.info(f'{LOG_PREFIX} Failure with:   Params={str(attrs)}   Relationship={result.getRelationship()}   Contents={result.getContents()}')
            self.logger.error(f'{LOG_PREFIX} End Processing w/ERROR - Not Processable(406) - {str(ex)} \n\n\n')
            return result

Reply via email to