[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1115841881

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -387,11 +461,78 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }

-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();
+        boolean isOriginalTransferred = false;
+        List<FlowFile> flowFiles = new ArrayList<>();
+        do {
+            FlowFile outgoingFlowFile;
+            if (originalFlowFile != null) {
+                outgoingFlowFile = session.create(originalFlowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                flowFiles.add(outgoingFlowFile);
+            } catch (IOException e) {
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            } catch (Exception e) {
+                if (originalFlowFile != null) {
+                    session.transfer(originalFlowFile, REL_FAILURE);
+                    isOriginalTransferred = true;
+                }
+                getLogger().error("Couldn't get Salesforce records", e);
+                session.remove(flowFiles);
+                flowFiles.clear();
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
+
+        if (!flowFiles.isEmpty()) {
+            session.transfer(flowFiles, REL_SUCCESS);
+        }
+        if (originalFlowFile != null && !isOriginalTransferred) {
+            session.transfer(originalFlowFile, REL_ORIGINAL);
+        }

Review Comment: An exception can occur between the creation of the `outgoingFlowFile` and its addition to the `flowFiles` collection. (For example when OAuth fails.) When this happens we don't remove it from the session. Here's a suggested fix (contains some other minor changes):
```suggestion
        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
        AtomicReference<String> totalSize = new AtomicReference<>();
        boolean isOriginalTransferred = false;
        List<FlowFile> outgoingFlowFiles = new ArrayList<>();
        do {
            FlowFile outgoingFlowFile;
            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
                if (originalFlowFile != null) {
                    outgoingFlowFile = session.create(originalFlowFile);
                } else {
                    outgoingFlowFile = session.create();
                }
                outgoingFlowFiles.add(outgoingFlowFile);
                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
                Map<String, String> attributes = new HashMap<>();
                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
                session.adjustCounter("Salesforce records processed", recordCount, false);
                session.putAllAttributes(outgoingFlowFile, attributes);
            } catch (IOException e) {
                throw new ProcessException("Couldn't get Salesforce records", e);
            } catch (Exception e) {
                if (originalFlowFile != null) {
                    session.transfer(originalFlowFile,
```
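The suggested fix above is cut off in the archive, so here is a minimal, self-contained sketch of the pattern it describes, using hypothetical stand-in types rather than the processor's real API: the newly created item is registered in the tracking list before any work that can throw, so the failure path can discard every item created so far, including the one that failed mid-write.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; "Page" and fetchAndWrite() are hypothetical stand-ins
// for NiFi's FlowFile and the session.write()/HTTP call, not the PR's real code.
class TrackBeforeWorkSketch {

    static class Page {
        final int index;
        Page(int index) { this.index = index; }
    }

    public static void main(String[] args) {
        List<Page> outgoing = new ArrayList<>();
        boolean morePages = true;
        int index = 0;
        try {
            while (morePages) {
                Page page = new Page(index++);
                // Track the page *before* doing anything that may throw,
                // so the catch block below can clean it up as well.
                outgoing.add(page);
                morePages = fetchAndWrite(page);
            }
            System.out.println("transferring " + outgoing.size() + " pages to success");
        } catch (Exception e) {
            // Every created page is in 'outgoing', so nothing leaks on failure.
            System.out.println("discarding " + outgoing.size() + " pages: " + e.getMessage());
            outgoing.clear();
        }
    }

    // Stand-in for the paginated fetch; returns false when there is no next page.
    static boolean fetchAndWrite(Page page) {
        return page.index < 2;
    }
}
```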
[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1108665694

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -224,6 +282,8 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return Collections.unmodifiableList(Arrays.asList(
                 API_URL,

Review Comment: When switching between PROPERTY_BASED and CUSTOM_QUERY the list of properties on the processor changes in a confusing manner because dependent and non-dependent properties are intermingled. Can we group them a bit better and move the non-dependent properties either before or after the dependent ones? See the example below. Also, CREATE_ZERO_RECORD_FILES doesn't work with CUSTOM_QUERY.
```java
API_URL,
API_VERSION,
QUERY_TYPE,
CUSTOM_SOQL_QUERY,
SOBJECT_NAME,
FIELD_NAMES,
RECORD_WRITER,
AGE_FIELD,
INITIAL_AGE_FILTER,
AGE_DELAY,
CUSTOM_WHERE_CONDITION,
READ_TIMEOUT,
CREATE_ZERO_RECORD_FILES,
TOKEN_PROVIDER
```

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -106,13 +111,55 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {

+    static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
+    static final AllowableValue CUSTOM_QUERY = new AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()

Review Comment: Why can't we use `API_URL` and `API_VERSION` from `CommonSalesforceProperties`?

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -261,6 +321,23 @@ protected Collection customValidate(ValidationContext validati
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();

Review Comment: These `AtomicReference` objects are not used here. I don't think their creation should be the responsibility of `onTrigger`; it should rather belong to the downstream methods. The `nextRecordsUrl` constructor would then be called explicitly in both of those methods, but I wouldn't consider that a duplication. Having these declarations closer to the `do...while` blocks that use them would make their purpose much easier to understand.
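As a rough, hypothetical sketch of that last point (method and variable names are illustrative, not the PR's exact code), the pagination state can be declared inside the method that owns the do...while loop rather than in onTrigger:

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; processCustomQuery/fetchPage are simplified stand-ins for the PR's methods.
class PaginationStateSketch {

    public static void main(String[] args) {
        processCustomQuery("SELECT Id FROM Account");
    }

    static void processCustomQuery(String customQuery) {
        // Declared right next to the do...while loop that uses them,
        // instead of being created by the caller (onTrigger).
        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
        AtomicReference<String> totalSize = new AtomicReference<>();
        do {
            fetchPage(customQuery, nextRecordsUrl, totalSize);
        } while (nextRecordsUrl.get() != null);
        System.out.println("reported total size: " + totalSize.get());
    }

    // Stand-in for the HTTP call + response parsing: pretends the first response
    // advertises one more page, then reports the end of the result set.
    static void fetchPage(String query, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
        totalSize.set("2500");
        nextRecordsUrl.set(nextRecordsUrl.get() == null ? "/services/data/query/next" : null);
    }
}
```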
## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -387,11 +461,63 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }

-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile flowFile, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        do {
+            FlowFile outgoingFlowFile;
+            if (flowFile != null) {
+                outgoingFlowFile = session.create(flowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+
[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1069710817

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -106,13 +113,54 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {

+    static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");

Review Comment: To me this name was confusing at first. I think for most of us "query parameter" implies the idea of using parameters in a prepared statement or even in an HTTP request. Also, they aren't really parameters but properties, are they? We are using the properties of the processor to build the query. With that in mind I'd rename this to PROPERTY_BASED_QUERY or something similar. BTW, both query methods look very vulnerable to injection. Are they not?

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -261,6 +323,30 @@ protected Collection customValidate(ValidationContext validati
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());

Review Comment: This onTrigger is getting a little too big. I would at least move the two types of request-result handling logic to their own separate methods.

## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -261,6 +323,30 @@ protected Collection customValidate(ValidationContext validati
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        if (isCustomQuery) {
+            String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+            do {
+                FlowFile flowFile = session.create();
+                Map<String, String> attributes = new HashMap<>();
+                try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                    flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+                    int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;

Review Comment: Is there a guarantee that the number of records in a batch will always be 2000 when there are more than that many in total? In any case I'd make a constant out of it.
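A minimal sketch of pulling that magic number into a named constant, as the comment suggests (the constant name is an assumption, not necessarily what the PR ends up using):

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; the constant name is hypothetical, not taken from the PR.
class BatchSizeConstantSketch {

    // 2000 is the batch size the discussed code assumes Salesforce returns per page;
    // naming it avoids repeating the literal.
    private static final int MAX_RECORDS_PER_BATCH = 2000;

    static int recordCount(AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
        // Same expression as in the diff, with the magic number replaced by the constant.
        return nextRecordsUrl.get() != null
                ? MAX_RECORDS_PER_BATCH
                : Integer.parseInt(totalSize.get()) % MAX_RECORDS_PER_BATCH;
    }

    public static void main(String[] args) {
        AtomicReference<String> nextRecordsUrl = new AtomicReference<>(null);
        AtomicReference<String> totalSize = new AtomicReference<>("4500");
        // Last page of a 4500-record result set: 4500 % 2000 = 500 records.
        System.out.println(recordCount(nextRecordsUrl, totalSize));
    }
}
```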
## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
## @@ -194,10 +248,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
     private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String RECORDS = "records";
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final String TOTAL_RECORD_COUNT = "total.record.count";

     private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
+    private volatile String totalSize;

Review Comment: I don't think this is a good idea. Not only is it not thread safe (although I'm not sure it makes much sense to run this processor on multiple threads), but having a running method store intermediate state in fields is bad design in general. The crux of the issue is that the new `parseHttpResponse` method needs to return three entirely different things: the output stream callback, this total size, and also the next records URL. If we really don't want to create a POJO for those three, we could use an Atomic or other wrapper to pass the total size as an in-out parameter (similar to how the next records URL is handled already).
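A rough, self-contained sketch of the two alternatives described above (type and method names are illustrative, not from the PR; the output stream callback is omitted for brevity):

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; PageResult/parsePage* are hypothetical names, not the PR's API.
class ParseResultSketch {

    // Option 1: a small POJO carrying everything the parse step produces.
    static class PageResult {
        final String nextRecordsUrl;  // null on the last page
        final String totalSize;

        PageResult(String nextRecordsUrl, String totalSize) {
            this.nextRecordsUrl = nextRecordsUrl;
            this.totalSize = totalSize;
        }
    }

    static PageResult parsePageAsPojo() {
        return new PageResult(null, "2500");
    }

    // Option 2: in-out wrapper parameters, mirroring how nextRecordsUrl is already handled.
    static void parsePageInOut(AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
        nextRecordsUrl.set(null);
        totalSize.set("2500");
    }

    public static void main(String[] args) {
        PageResult result = parsePageAsPojo();
        System.out.println("POJO: next=" + result.nextRecordsUrl + ", total=" + result.totalSize);

        AtomicReference<String> next = new AtomicReference<>();
        AtomicReference<String> total = new AtomicReference<>();
        parsePageInOut(next, total);
        System.out.println("in-out: next=" + next.get() + ", total=" + total.get());
    }
}
```

Either form keeps the parse step free of mutable processor fields, which is the thread-safety concern raised in the comment.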