Austin,

Can you create a view (materialized or not) from that query? If so,
QueryDatabaseTable could work.

If not, try QueryRecord after ExecuteSQL. You would first need to add
s.txn_time to the select list (I didn't see it in the query). I think
you can then add a "max_txn_time" field to the schema and run
something like SELECT *, MAX(txn_time) AS max_txn_time FROM FLOWFILE.
You may also need a JsonRecordSetWriter so that EvaluateJsonPath can
read the value of the max_txn_time field. UpdateAttribute can then
store the max value, and if you want to remove the max_txn_time field
(and/or convert back to Avro) you can use ConvertRecord downstream.

From UpdateAttribute you can send the flow file back to ExecuteSQL,
assuming the query uses Expression Language with the max_txn_time
attribute to select only the new records. You may also need some logic
to send the same flow file back to ExecuteSQL when no rows were
returned in the result set (a RouteOnAttribute on executesql.row.count
should work), so that the query is retried until records are returned.
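
To make the loop described above concrete, here is a rough sketch of
the same "highest txn_time seen so far" logic outside of NiFi. It is
only an illustration under assumptions: sqlite3 stands in for Postgres,
the siu table is cut down to two columns, txn_time is assumed to be a
sortable YYYYMMDDHH24MISS string, and fetch_new_rows is a hypothetical
helper, not a NiFi API.

```python
import sqlite3


def fetch_new_rows(conn, last_max_txn_time):
    """Return (rows, new_max) for rows whose txn_time is above the stored maximum."""
    rows = conn.execute(
        "SELECT schedule_id, txn_time FROM siu "
        "WHERE txn_time > ? ORDER BY txn_time",
        (last_max_txn_time,),
    ).fetchall()
    if not rows:
        # Same idea as routing on executesql.row.count == 0:
        # keep the old maximum and simply try again later.
        return rows, last_max_txn_time
    # Same idea as MAX(txn_time) AS max_txn_time followed by UpdateAttribute.
    return rows, max(row[1] for row in rows)


# Hypothetical, simplified stand-in for hl7.siu (assumption, not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE siu (schedule_id INTEGER, txn_time TEXT)")
conn.executemany(
    "INSERT INTO siu VALUES (?, ?)",
    [(1, "20180206090000"), (2, "20180206093000")],
)

# First run: nothing stored yet, so every row counts as new.
first_rows, max_txn_time = fetch_new_rows(conn, "")

# Second run with no new data: empty result, stored maximum unchanged.
empty_rows, max_txn_time = fetch_new_rows(conn, max_txn_time)

# A new row arrives; only that row is returned on the next run.
conn.execute("INSERT INTO siu VALUES (3, '20180206100000')")
later_rows, max_txn_time = fetch_new_rows(conn, max_txn_time)
```

In the NiFi flow, the stored value would live in the max_txn_time
attribute and the comparison would be written into the ExecuteSQL query
with Expression Language rather than passed as a bind parameter.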

BTW, there is a PR under review [1] to support arbitrary queries in
QueryDatabaseTable. I'm not sure whether it would solve your use case,
but you may see that option in NiFi 1.6.0.

Regards,
Matt

[1] https://github.com/apache/nifi/pull/2162


On Tue, Feb 6, 2018 at 9:54 AM, Austin Duncan <[email protected]> wrote:
> select
> a.sendingfacility_namespaceid as "PracticeId",
> Initcap(a.sendingfacility_universalid) as "PracticeName",
> s.aip_personel_resource_id as "PhysicianId",
> Initcap(s.aip_personel_resource_lname) as "PhysicianLastName",
> Initcap(s.aip_personel_resource_fname) as "PhysicianFirstName",
> coalesce(a.alternate_patient_id, s.alternate_patient_id) as "PatientId",
> initcap(coalesce(a.patient_fname, s.patient_fname)) as "FirstName",
> initcap(coalesce(a.patient_lname, s.patient_lname)) as "LastName",
> a.patient_dob::date as "DateOfBirth",
> a.patient_sex as "Gender",
> a.patient_street as "Address1",
> a.patient_other_designation as "Address2",
> initcap(a.patient_city) as "City",
> a.patient_state as "State",
> a.patient_zip as "Zip",
> coalesce(a.patient_ssn, s.patient_ssn) as "SSN",
> a.pri_ins_company_name as "Payer1",
> a.pri_ins_policy_nbr as "PolicyNumber1",
> a.sec_ins_company_name as "Payer2",
> a.sec_ins_policy_nbr as "PolicyNumber2",
> s.schedule_id as "AppointmentId",
> to_timestamp(s.appt_startdate, 'YYYYMMDDHH24MISS') as "AppointmentTime",
> s.event_reason as "AppointmentDescription",
> initcap(s.appt_type) as "AppointmentType",
> s.filler_status as "AppointmentStatus",
> s.event_reason_id as "EncounterID"
>
> from
> hl7.adt a, hl7.siu s
> where
> a.alternate_patient_id = s.alternate_patient_id
> and
> length(trim(s.alternate_patient_id)) > 1
>
> I am also matching on 3 particular doctors but don't want to send that. The
> column that I am trying to match on is s.txn_time. I only want txn_times
> after the highest txn_time from the previous query.
>
> On Tue, Feb 6, 2018 at 9:50 AM, James Wing <[email protected]> wrote:
>>
>> Understood.  Can you share an outline of the query you are trying to use?
>> Which values would you change based on the maintained state?  What would be
>> the source of the updated state?
>>
>> On Tue, Feb 6, 2018 at 6:33 AM, Austin Duncan <[email protected]>
>> wrote:
>>>
>>> I am not sure if that's what I need. My query is pretty robust and I am
>>> not sure if it can be implemented in that.
>>>
>>> On Tue, Feb 6, 2018 at 9:30 AM, James Wing <[email protected]> wrote:
>>>>
>>>> Austin,
>>>>
>>>> Have you tried QueryDatabaseTable?  For some databases and table schema,
>>>> it provides a shortcut to querying for the recently changed records, as
>>>> long as you have a "maximum value column" to use.
>>>>
>>>>
>>>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.QueryDatabaseTable/index.html
>>>>
>>>> Thanks,
>>>>
>>>> James
>>>>
>>>> On Tue, Feb 6, 2018 at 6:12 AM, Austin Duncan <[email protected]>
>>>> wrote:
>>>>>
>>>>> All,
>>>>> I am trying to build a flow that queries a Postgres database every hour. I
>>>>> am trying to use the stateful settings in the UpdateAttribute processor so
>>>>> that I only pull new files that have been uploaded within that hour. I am
>>>>> having trouble figuring out how to implement it. Can anyone help me out?
>>>>> Thanks,
>>>>> Austin
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>
>
> --
> Austin Duncan
> Developer
> PYA Analytics
> 2220 Sutherland Avenue
> Knoxville, TN 37919
> 423-260-4172
