[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

2023-02-23 Thread via GitHub


tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1115841881


##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -387,11 +461,78 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();
+        boolean isOriginalTransferred = false;
+        List<FlowFile> flowFiles = new ArrayList<>();
+        do {
+            FlowFile outgoingFlowFile;
+            if (originalFlowFile != null) {
+                outgoingFlowFile = session.create(originalFlowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                flowFiles.add(outgoingFlowFile);
+            } catch (IOException e) {
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            } catch (Exception e) {
+                if (originalFlowFile != null) {
+                    session.transfer(originalFlowFile, REL_FAILURE);
+                    isOriginalTransferred = true;
+                }
+                getLogger().error("Couldn't get Salesforce records", e);
+                session.remove(flowFiles);
+                flowFiles.clear();
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
+
+        if (!flowFiles.isEmpty()) {
+            session.transfer(flowFiles, REL_SUCCESS);
+        }
+        if (originalFlowFile != null && !isOriginalTransferred) {
+            session.transfer(originalFlowFile, REL_ORIGINAL);
+        }

Review Comment:
   An exception can occur between the creation of the `outgoingFlowFile` and its addition to the `flowFiles` collection (for example, when OAuth fails). When this happens, the FlowFile is never removed from the session.
   Here's a suggested fix (it contains some other minor changes):
   ```suggestion
           String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
           AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
           AtomicReference<String> totalSize = new AtomicReference<>();
   
           boolean isOriginalTransferred = false;
   
           List<FlowFile> outgoingFlowFiles = new ArrayList<>();
           do {
               FlowFile outgoingFlowFile;
               try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
                   if (originalFlowFile != null) {
                       outgoingFlowFile = session.create(originalFlowFile);
                   } else {
                       outgoingFlowFile = session.create();
                   }
                   outgoingFlowFiles.add(outgoingFlowFile);
   
                   outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
   
                   int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
   
                   Map<String, String> attributes = new HashMap<>();
                   attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                   attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
   
                   session.adjustCounter("Salesforce records processed", recordCount, false);
   
                   session.putAllAttributes(outgoingFlowFile, attributes);
               } catch (IOException e) {
                   throw new ProcessException("Couldn't get Salesforce records", e);
               } catch (Exception e) {
                   if (originalFlowFile != null) {
                       session.transfer(originalFlowFile, 

[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

2023-02-16 Thread via GitHub


tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1108665694


##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -224,6 +282,8 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return Collections.unmodifiableList(Arrays.asList(
                 API_URL,

Review Comment:
   When switching between PROPERTY_BASED and CUSTOM_QUERY, the list of properties on the processor changes in a confusing manner, because dependent and non-dependent properties are intermingled.
   
   Can we group them a bit better and move the non-dependent properties either before or after the dependent ones? See the example below.
   
   Also, `CREATE_ZERO_RECORD_FILES` doesn't work with CUSTOM_QUERY.
   
   ```java
   API_URL,
   API_VERSION,
   QUERY_TYPE,
   CUSTOM_SOQL_QUERY,
   SOBJECT_NAME,
   FIELD_NAMES,
   RECORD_WRITER,
   AGE_FIELD,
   INITIAL_AGE_FILTER,
   AGE_DELAY,
   CUSTOM_WHERE_CONDITION,
   READ_TIMEOUT,
   CREATE_ZERO_RECORD_FILES,
   TOKEN_PROVIDER
   ```



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -106,13 +111,55 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
+    static final AllowableValue CUSTOM_QUERY = new AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()

Review Comment:
   Why can't we use `API_URL` and `API_VERSION` from `CommonSalesforceProperties`?
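   For illustration, a minimal sketch of what reusing the shared descriptors could look like (this assumes `CommonSalesforceProperties` exposes `API_URL` and `API_VERSION` as public `PropertyDescriptor` constants; the package path is an assumption):
   ```java
   // assumes: import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
   
   @Override
   protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
       // Reference the shared API_URL / API_VERSION descriptors instead of
       // redeclaring them in this processor, so the Salesforce processors stay in sync.
       return Collections.unmodifiableList(Arrays.asList(
               CommonSalesforceProperties.API_URL,
               CommonSalesforceProperties.API_VERSION,
               QUERY_TYPE
               // ... remaining processor-specific properties ...
       ));
   }
   ```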



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -261,6 +321,23 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();

Review Comment:
   These `AtomicReference` objects are not used here. I don't think their creation should be the responsibility of `onTrigger`, but rather that of the downstream methods.
   
   The `nextRecordsUrl` constructor would be called explicitly in both of those methods, but I wouldn't consider that duplication. Having these closer to the `do...while` loop they are used in would make their purpose much easier to understand.
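   A rough sketch of the suggested shape (the method name mirrors the PR; the body is only indicated by comments):
   ```java
   private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
       // Created right next to the do...while loop that consumes them,
       // instead of being handed down from onTrigger.
       AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
       AtomicReference<String> totalSize = new AtomicReference<>();
       do {
           // fetch one result page, write it to a FlowFile,
           // and let parseHttpResponse fill both wrappers
       } while (nextRecordsUrl.get() != null);
   }
   ```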



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -387,11 +461,63 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile flowFile, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        do {
+            FlowFile outgoingFlowFile;
+            if (flowFile != null) {
+                outgoingFlowFile = session.create(flowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+

[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

2023-01-13 Thread GitBox


tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1069710817


##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -106,13 +113,54 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");

Review Comment:
   To me this name was confusing at first. I think for most of us "query parameter" implies the idea of using parameters in a prepared statement, or even in an HTTP request.
   
   Also, it's not really parameters but properties, isn't it? We are using the properties of the processor to build the query.
   
   With that in mind I'd rename this to PROPERTY_BASED_QUERY or something similar.
   
   BTW, both query methods look very vulnerable to injection. Are they not?
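   For reference, the renamed descriptor could look like this (matching the `PROPERTY_BASED_QUERY` value shown in the 2023-02-16 message above):
   ```java
   static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue(
           "property-based-query",
           "Property Based Query",
           "Provide query by properties.");
   ```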



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());

Review Comment:
   This onTrigger is getting a little too big.
   
   I would at least move the two types of request-and-result handling logic into their own separate methods.
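   For example, something along these lines (the method names are only placeholders):
   ```java
   @Override
   public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
       boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
   
       if (isCustomQuery) {
           processCustomQuery(context, session);          // custom SOQL handling
       } else {
           processPropertyBasedQuery(context, session);   // query built from properties
       }
   }
   ```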



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        if (isCustomQuery) {
+            String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+            do {
+                FlowFile flowFile = session.create();
+                Map<String, String> attributes = new HashMap<>();
+                try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                    flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+                    int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;

Review Comment:
   Is there a guarantee that the number of records in a batch will always be 2000 when there are more than that in total?
   
   In any case, I'd make a constant out of it.
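   A sketch of what that could look like (the constant name is just a suggestion; note that Salesforce's documented default query batch size is 2000, but clients can change it via the `Sforce-Query-Options` header, which is why the guarantee is worth questioning):
   ```java
   // Name the magic number so the batch-size assumption is explicit.
   private static final int MAX_RECORD_COUNT_PER_BATCH = 2000;
   
   // ... inside the paging loop:
   int recordCount = nextRecordsUrl.get() != null
           ? MAX_RECORD_COUNT_PER_BATCH
           : Integer.parseInt(totalSize) % MAX_RECORD_COUNT_PER_BATCH;
   ```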



##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -194,10 +248,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
     private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String RECORDS = "records";
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final String TOTAL_RECORD_COUNT = "total.record.count";
 
     private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
+    private volatile String totalSize;

Review Comment:
   I don't think this is a good idea. Not only is it not thread safe (although I'm not sure it makes much sense to run this processor on multiple threads), but having a running method store intermediate state in fields is bad design in general.
   
   The crux of the issue is that the new `parseHttpResponse` method needs to return 3 entirely different things: the output stream callback, this total size, and also the next records URL.
   If we really don't want to create a POJO for those 3, we could use an Atomic or other wrapper to pass the total size as an in-out parameter (similar to how the next records URL is handled already).
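   A minimal sketch of the in-out parameter approach (the signature mirrors how the PR already threads `nextRecordsUrl` through; the parsing body is only indicated by comments):
   ```java
   private OutputStreamCallback parseHttpResponse(InputStream response,
           AtomicReference<String> nextRecordsUrl,
           AtomicReference<String> totalSize) {
       return out -> {
           // Stream the "records" array from the response to `out`, and
           // capture "nextRecordsUrl" and "totalSize" into the wrappers
           // as they are encountered, e.g.:
           //   nextRecordsUrl.set(parsedNextRecordsUrl);
           //   totalSize.set(parsedTotalSize);
       };
   }
   ```
   The caller then reads `totalSize.get()` after `session.write(...)` returns, with no mutable field on the processor.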


