Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]

2026-03-13 Thread via GitHub


pvillard31 merged PR #11001:
URL: https://github.com/apache/nifi/pull/11001


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]

2026-03-13 Thread via GitHub


exceptionfactory commented on code in PR #11001:
URL: https://github.com/apache/nifi/pull/11001#discussion_r2931346425


##
nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java:
##
@@ -143,104 +149,155 @@ protected List 
getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
+@OnScheduled
+@Override
+public void setup(final ProcessContext context) {
+super.setup(context);
+final int maxStringLength = 
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+final StreamReadConstraints streamReadConstraints = 
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+
+configuredClassLoader = getClass().getClassLoader();
+try {
+final JoltTransformStrategy strategy = 
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+
+if (strategy == JoltTransformStrategy.CUSTOMR && 
context.getProperty(MODULES).isSet()) {
+configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
+
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+getClass().getClassLoader(),
+getJarFilenameFilter()
+);
+}
+} catch (final Exception e) {
+getLogger().error("ClassLoader configuration failed", e);
+}
+}
+
 @Override
 public void onTrigger(final ProcessContext context, ProcessSession 
session) throws ProcessException {
 final FlowFile original = session.get();
 if (original == null) {
 return;
 }
 
-final ComponentLog logger = getLogger();
 final StopWatch stopWatch = new StopWatch(true);
-final Object inputJson;
-final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE == 
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
-String jsonSourceAttributeName = null;
-
-if (sourceStrategyFlowFile) {
-try (final InputStream in = session.read(original)) {
-inputJson = jsonUtil.jsonToObject(in);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on FlowFile content for {}", 
original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-} else {
-jsonSourceAttributeName = 
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
-final String jsonSourceAttributeValue = 
original.getAttribute(jsonSourceAttributeName);
-if (StringUtils.isBlank(jsonSourceAttributeValue)) {
-logger.error("FlowFile attribute '{}' value is blank", 
jsonSourceAttributeName);
-session.transfer(original, REL_FAILURE);
-return;
-} else {
-try {
-inputJson = 
jsonUtil.jsonToObject(jsonSourceAttributeValue);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on attribute '{}' of 
FlowFile {}", jsonSourceAttributeName, original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-}
-}
-
-final String jsonString;
-final ClassLoader originalContextClassLoader = 
Thread.currentThread().getContextClassLoader();
+final JsonSourceStrategy sourceStrategy = 
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
 try {
+
Thread.currentThread().setContextClassLoader(configuredClassLoader);
+
 final JoltTransform transform = getTransform(context, original);
-if (customClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(customClassLoader);
-}
+final FlowFile transformedFlowFile = switch (sourceStrategy) {
+case FLOW_FILE -> transformFlowFile(context, session, 
original, transform);
+case ATTRIBUTE -> transformAttribute(context, session, 
original, transform);
+case JSON_LINES -> transformNewlineDelimited(session, 
original, transform);
+};
 
-final Object transformedJson = TransformUtils.transform(transform, 
inputJson);
-jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? 
jsonUtil.toPrettyJsonString(transformedJson) : 
jsonUtil.toJsonString(transformedJson);
+onSuccess(context, session, tran

Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]

2026-03-13 Thread via GitHub


pvillard31 commented on code in PR #11001:
URL: https://github.com/apache/nifi/pull/11001#discussion_r2930142873


##
nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java:
##
@@ -143,104 +149,155 @@ protected List 
getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
+@OnScheduled
+@Override
+public void setup(final ProcessContext context) {
+super.setup(context);
+final int maxStringLength = 
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+final StreamReadConstraints streamReadConstraints = 
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+
+configuredClassLoader = getClass().getClassLoader();
+try {
+final JoltTransformStrategy strategy = 
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+
+if (strategy == JoltTransformStrategy.CUSTOMR && 
context.getProperty(MODULES).isSet()) {
+configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
+
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+getClass().getClassLoader(),
+getJarFilenameFilter()
+);
+}
+} catch (final Exception e) {
+getLogger().error("ClassLoader configuration failed", e);
+}
+}
+
 @Override
 public void onTrigger(final ProcessContext context, ProcessSession 
session) throws ProcessException {
 final FlowFile original = session.get();
 if (original == null) {
 return;
 }
 
-final ComponentLog logger = getLogger();
 final StopWatch stopWatch = new StopWatch(true);
-final Object inputJson;
-final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE == 
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
-String jsonSourceAttributeName = null;
-
-if (sourceStrategyFlowFile) {
-try (final InputStream in = session.read(original)) {
-inputJson = jsonUtil.jsonToObject(in);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on FlowFile content for {}", 
original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-} else {
-jsonSourceAttributeName = 
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
-final String jsonSourceAttributeValue = 
original.getAttribute(jsonSourceAttributeName);
-if (StringUtils.isBlank(jsonSourceAttributeValue)) {
-logger.error("FlowFile attribute '{}' value is blank", 
jsonSourceAttributeName);
-session.transfer(original, REL_FAILURE);
-return;
-} else {
-try {
-inputJson = 
jsonUtil.jsonToObject(jsonSourceAttributeValue);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on attribute '{}' of 
FlowFile {}", jsonSourceAttributeName, original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-}
-}
-
-final String jsonString;
-final ClassLoader originalContextClassLoader = 
Thread.currentThread().getContextClassLoader();
+final JsonSourceStrategy sourceStrategy = 
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
 try {
+
Thread.currentThread().setContextClassLoader(configuredClassLoader);
+
 final JoltTransform transform = getTransform(context, original);
-if (customClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(customClassLoader);
-}
+final FlowFile transformedFlowFile = switch (sourceStrategy) {
+case FLOW_FILE -> transformFlowFile(context, session, 
original, transform);
+case ATTRIBUTE -> transformAttribute(context, session, 
original, transform);
+case JSON_LINES -> transformNewlineDelimited(session, 
original, transform);
+};
 
-final Object transformedJson = TransformUtils.transform(transform, 
inputJson);
-jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? 
jsonUtil.toPrettyJsonString(transformedJson) : 
jsonUtil.toJsonString(transformedJson);
+onSuccess(context, session, transforme