Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]
pvillard31 merged PR #11001: URL: https://github.com/apache/nifi/pull/11001 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]
exceptionfactory commented on code in PR #11001:
URL: https://github.com/apache/nifi/pull/11001#discussion_r2931346425
##
nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java:
##
@@ -143,104 +149,155 @@ protected List
getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
+@OnScheduled
+@Override
+public void setup(final ProcessContext context) {
+super.setup(context);
+final int maxStringLength =
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+final StreamReadConstraints streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+
+configuredClassLoader = getClass().getClassLoader();
+try {
+final JoltTransformStrategy strategy =
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+
+if (strategy == JoltTransformStrategy.CUSTOMR &&
context.getProperty(MODULES).isSet()) {
+configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
+
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+getClass().getClassLoader(),
+getJarFilenameFilter()
+);
+}
+} catch (final Exception e) {
+getLogger().error("ClassLoader configuration failed", e);
+}
+}
+
@Override
public void onTrigger(final ProcessContext context, ProcessSession
session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
return;
}
-final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
-final Object inputJson;
-final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE ==
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
-String jsonSourceAttributeName = null;
-
-if (sourceStrategyFlowFile) {
-try (final InputStream in = session.read(original)) {
-inputJson = jsonUtil.jsonToObject(in);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on FlowFile content for {}",
original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-} else {
-jsonSourceAttributeName =
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
-final String jsonSourceAttributeValue =
original.getAttribute(jsonSourceAttributeName);
-if (StringUtils.isBlank(jsonSourceAttributeValue)) {
-logger.error("FlowFile attribute '{}' value is blank",
jsonSourceAttributeName);
-session.transfer(original, REL_FAILURE);
-return;
-} else {
-try {
-inputJson =
jsonUtil.jsonToObject(jsonSourceAttributeValue);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on attribute '{}' of
FlowFile {}", jsonSourceAttributeName, original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-}
-}
-
-final String jsonString;
-final ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
+final JsonSourceStrategy sourceStrategy =
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+final ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
try {
+
Thread.currentThread().setContextClassLoader(configuredClassLoader);
+
final JoltTransform transform = getTransform(context, original);
-if (customClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(customClassLoader);
-}
+final FlowFile transformedFlowFile = switch (sourceStrategy) {
+case FLOW_FILE -> transformFlowFile(context, session,
original, transform);
+case ATTRIBUTE -> transformAttribute(context, session,
original, transform);
+case JSON_LINES -> transformNewlineDelimited(session,
original, transform);
+};
-final Object transformedJson = TransformUtils.transform(transform,
inputJson);
-jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ?
jsonUtil.toPrettyJsonString(transformedJson) :
jsonUtil.toJsonString(transformedJson);
+onSuccess(context, session, tran
Re: [PR] NIFI-15712 Add JSON Lines support to JoltTransformJSON [nifi]
pvillard31 commented on code in PR #11001:
URL: https://github.com/apache/nifi/pull/11001#discussion_r2930142873
##
nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java:
##
@@ -143,104 +149,155 @@ protected List
getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
+@OnScheduled
+@Override
+public void setup(final ProcessContext context) {
+super.setup(context);
+final int maxStringLength =
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+final StreamReadConstraints streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+
+configuredClassLoader = getClass().getClassLoader();
+try {
+final JoltTransformStrategy strategy =
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+
+if (strategy == JoltTransformStrategy.CUSTOMR &&
context.getProperty(MODULES).isSet()) {
+configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
+
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+getClass().getClassLoader(),
+getJarFilenameFilter()
+);
+}
+} catch (final Exception e) {
+getLogger().error("ClassLoader configuration failed", e);
+}
+}
+
@Override
public void onTrigger(final ProcessContext context, ProcessSession
session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
return;
}
-final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
-final Object inputJson;
-final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE ==
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
-String jsonSourceAttributeName = null;
-
-if (sourceStrategyFlowFile) {
-try (final InputStream in = session.read(original)) {
-inputJson = jsonUtil.jsonToObject(in);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on FlowFile content for {}",
original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-} else {
-jsonSourceAttributeName =
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
-final String jsonSourceAttributeValue =
original.getAttribute(jsonSourceAttributeName);
-if (StringUtils.isBlank(jsonSourceAttributeValue)) {
-logger.error("FlowFile attribute '{}' value is blank",
jsonSourceAttributeName);
-session.transfer(original, REL_FAILURE);
-return;
-} else {
-try {
-inputJson =
jsonUtil.jsonToObject(jsonSourceAttributeValue);
-} catch (final Exception e) {
-logger.error("JSON parsing failed on attribute '{}' of
FlowFile {}", jsonSourceAttributeName, original, e);
-session.transfer(original, REL_FAILURE);
-return;
-}
-}
-}
-
-final String jsonString;
-final ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
+final JsonSourceStrategy sourceStrategy =
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+final ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
try {
+
Thread.currentThread().setContextClassLoader(configuredClassLoader);
+
final JoltTransform transform = getTransform(context, original);
-if (customClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(customClassLoader);
-}
+final FlowFile transformedFlowFile = switch (sourceStrategy) {
+case FLOW_FILE -> transformFlowFile(context, session,
original, transform);
+case ATTRIBUTE -> transformAttribute(context, session,
original, transform);
+case JSON_LINES -> transformNewlineDelimited(session,
original, transform);
+};
-final Object transformedJson = TransformUtils.transform(transform,
inputJson);
-jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ?
jsonUtil.toPrettyJsonString(transformedJson) :
jsonUtil.toJsonString(transformedJson);
+onSuccess(context, session, transforme
