Hi folks,

I'm having a hard time figuring out how to do an update inside an atomic
transaction with NiFi.

In MySQL I use a table that acts as a kind of "record counter"
distributed among files. The table looks like this:

create table file_records (
  file_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  file_name VARCHAR(255) NOT NULL,
  first_id BIGINT NOT NULL DEFAULT 0,
  last_id BIGINT NOT NULL DEFAULT 0
);

The idea is that when a file arrives, given that I know how many lines it
has, I'll allocate a range of ids (first_id, last_id) equal to the number of
lines in advance, allowing me to process multiple files in parallel without
the risk of record id collisions (the records/lines will be inserted
into another database that does not have auto-increment fields).
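For illustration, the allocation step I have in mind looks roughly like this (a minimal sketch using Python's sqlite3 standard library in place of MySQL; the `allocate_range` helper is my own naming, and the table follows the DDL above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE file_records (
        file_id   INTEGER PRIMARY KEY AUTOINCREMENT,
        file_name TEXT NOT NULL,
        first_id  INTEGER NOT NULL DEFAULT 0,
        last_id   INTEGER NOT NULL DEFAULT 0
    )
""")

def allocate_range(conn, file_name, record_count):
    """Reserve a contiguous block of record ids for one incoming file."""
    with conn:  # one atomic transaction per allocation
        cur = conn.execute("SELECT COALESCE(MAX(last_id), 0) FROM file_records")
        start = cur.fetchone()[0] + 1
        conn.execute(
            "INSERT INTO file_records (file_name, first_id, last_id) VALUES (?, ?, ?)",
            (file_name, start, start + record_count - 1),
        )
        return start, start + record_count - 1

# Two files get disjoint ranges:
print(allocate_range(conn, "a.csv", 100))  # (1, 100)
print(allocate_range(conn, "b.csv", 50))   # (101, 150)
```

The important part is that reading MAX(last_id) and writing the new range happen inside the same transaction; otherwise two parallel files could read the same maximum and collide.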

For this to work I planned to use an update query like this in an ACID
transaction with a full table lock:


update file_records
set first_id = (select max(last_id) + 1),
    last_id  = (select max(last_id) + ${record_count})
where file_id = ${sql.generated.key};
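For what it's worth, the read-and-reserve step can also be written as an explicit transaction that takes the write lock up front. Here is a minimal sketch, again with Python's sqlite3 standing in for MySQL: `BEGIN IMMEDIATE` plays roughly the role that `SELECT ... FOR UPDATE` (or a table lock) would play in MySQL, and the function name `allocate_range_locked` is my own:

```python
import sqlite3

def allocate_range_locked(db_path, file_id, record_count):
    """Atomically reserve (first_id, last_id) for an existing file_records row."""
    conn = sqlite3.connect(db_path, isolation_level=None, timeout=30)
    try:
        # Take the write lock at BEGIN time so concurrent allocators serialize
        conn.execute("BEGIN IMMEDIATE")
        nxt = conn.execute(
            "SELECT COALESCE(MAX(last_id), 0) + 1 FROM file_records"
        ).fetchone()[0]
        conn.execute(
            "UPDATE file_records SET first_id = ?, last_id = ? WHERE file_id = ?",
            (nxt, nxt + record_count - 1, file_id),
        )
        conn.execute("COMMIT")
        return nxt, nxt + record_count - 1
    except BaseException:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    finally:
        conn.close()
```

A second allocator that hits the lock simply waits (up to the timeout) rather than reading a stale maximum.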

To test this concept I've tried multiple variations of a PutSQL
processor configuration with 4 concurrent tasks, none of them with success
(I expect the allocated ranges not to overlap).

My last attempt was to set up a sequence of PutSQL processors doing LOCK
TABLES tbl_name; / UPDATE .... / UNLOCK TABLES, but I still haven't got it
working. When a processor tries to access the locked table it generates an
error instead of waiting for the lock to be released.
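Since the error is raised immediately instead of blocking, the workaround I'm considering is a bounded retry with backoff around the whole allocation. A generic sketch (the `with_retry` helper and its parameters are mine, not a NiFi or MySQL API):

```python
import random
import time

def with_retry(fn, attempts=5, base_delay=0.1):
    """Retry fn() when it raises a lock/busy error, with jittered exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # In a real setup, match only the driver's specific lock-error class/code here
            time.sleep(base_delay * (2 ** attempt) * (0.5 + random.random()))
```

This at least turns "error on locked table" into "wait and try again", at the cost of a retry budget.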

Has anyone ever done something similar in NiFi? What would be the proper
way to perform each update inside an atomic transaction using a full table
lock?

Thanks in advance!
