Re: Wait processor doesn't route expired flowfiles to 'expired' relationship

2020-12-20 Thread Koji Kawamura
Hi Luca,

Sorry to hear that you are having an issue with Wait processor.

By looking at the code and testing it locally, I couldn't find a cause
of the issue nor reproduce it.
However, theoretically such a situation can happen when the number of
queued FlowFiles in the connection in front of the Wait processor
exceeds the processor's throughput, especially when the Wait processor
is busy processing recently added FlowFiles and never gets around to
the older ones.
By default, the order of processing queued FlowFiles is undefined.
To eliminate one of the possible uncertainties, I'd recommend using
OldestFlowFileFirstPrioritizer at the incoming connection, so that
older FlowFiles can be processed first.

Hope this helps,
Koji

On Thu, Dec 17, 2020 at 1:36 AM Luca Giovannini
 wrote:
>
>
>
> Hi All,
>
>
>
> I am using the Wait/Notify couple of processors, with a setting of the Wait 
> processor as follows:
>
> Wait mode = keep in the upstream connection
> Expiration duration = 10 min
> Wait penalty duration = 30 sec
>
>
>
> I use the Wait/Notify in a split/merge situation where all the split parts
> have to be processed before the Wait is released.
>
> Occasionally, one of the branches of the split fails or takes too much time,
> and that is taken care of with an “expired” branch of the Wait processor.
>
>
>
> The flow runs well for days but then slowly the incoming connection to the 
> Wait processor starts to grow, and checking the “queued duration” and 
> “lineage duration” of the flowfiles in the queue list I see that they are 
> much older than the “Expiration duration” setting (days vs. 10 minutes). I 
> have also noticed that, when this happens, if I change the “expiration 
> duration” setting to a smaller value the Wait processor starts to clean up 
> the queue by routing the flowfiles to expired. I have seen this problem 
> happening with all sorts of different values for “Expiration duration” and 
> “Wait penalty”, the ones that I provided above are just an example.
>
> I am using NiFi 1.11.4.
>
>
>
> What can be happening?
>
> This unpredictable behaviour is blocking the whole flow and making the Wait 
> processor unusable for me ☹
>
>
>
> Thank you very much for your help and support!
>
>
>
> Luca
>
>
>


Re: Supporting Elasticsearch scrolling with an input flow file

2019-11-12 Thread Koji Kawamura
Hi Tim,

Sorry for the late reply.
It seems the ScrollElasticsearchHttp processor is designed to run a
one-shot query to import query results from Elasticsearch.
The description says "The state must be cleared before another query
can be run."
It tracks progress using managed state, not via incoming FlowFiles.
This processor is a source-processor similar to processors such as ListFile.

If a large amount of documents needs to be ingested with pagination,
using a query based on incoming FlowFile attributes, then I'd enhance
the QueryElasticsearchHttp processor so that it can route the original
incoming FlowFile to a new relationship such as 'next page' while
incrementing a page number attribute. The next time that FlowFile is
passed to the same QueryElasticsearchHttp, the results of the next page
would be used to populate FlowFiles into the 'success' relationship.
QueryElasticsearchHttp currently simply removes incoming FlowFiles.
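As a rough sketch (the 'next page' relationship and the page attribute
name are hypothetical, since this enhancement doesn't exist yet), the
loop would look like:

upstream (sets es.query.page = 1)
  -> QueryElasticsearchHttp
       success   : result FlowFiles for the current page
       next page : the original FlowFile, with es.query.page incremented,
                   looped back into the same QueryElasticsearchHttp
                   until the query returns no more hits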

Thanks,
Koji

On Fri, Nov 1, 2019 at 5:05 AM Tim Dean  wrote:
>
> Hello,
>
> I would like to use the existing ScrollElasticsearchHttp to perform a search 
> that returns a potentially large number of hits. The parameters of the search 
> need to reference one or more flow file attributes.
>
> Looking at the source code for this processor it appears that the QUERY 
> property supports EL with flow file attributes. Furthermore, the 
> documentation for the FAILURE relationship notes that only incoming flow 
> files will be routed to failure. So it seems clear that this processor was 
> designed to allow input flow files. Unfortunately though, the processor also 
> has been annotated with INPUT_FORBIDDEN so I can’t use as is.
>
> I assume that there is a good reason for forbidding input here. Before I go 
> and try to implement a custom processor that does what I want, I’d like to 
> know if some hidden problem awaits me.
>
> Can someone clarify why this processor forbids input, and what problems I 
> might expect if I try to circumvent this limitation?
>
> Thanks
>
> - Tim
>
> Sent from my iPhone


Re: Re: MergeRecord can not guarantee the ordering of the input sequence?

2019-10-20 Thread Koji Kawamura
Hi Lei,

Does 'balance strategy' mean load balance strategy? Which strategy
are you using? I thought Prioritizers are applied on the destination
node after load balancing has transferred FlowFiles. Are those A, B
and C flow files generated on different nodes and sent to a single
node to merge them?

Thanks,
Koji

On Fri, Oct 18, 2019 at 7:12 PM wangl...@geekplus.com.cn
 wrote:
>
>
> Seems it is because of the load balance strategy that is used.
> The balancing will not guarantee the order.
>
> Thanks,
> Lei
>
> 
> wangl...@geekplus.com.cn
>
>
> From: wangl...@geekplus.com.cn
> Date: 2019-10-16 10:21
> To: dev; users
> CC: dev
> Subject: Re: Re: MergeRecord can not guarantee the ordering of the input 
> sequence?
> Hi Koji,
> Actually I have set all connections to FIFO and concurrent tasks to 1 for
> all processors.
> Before and after the MergeRecord, I add a LogAttribute to debug.
>
> Before MergeRecord, the order in the log file is A, B, C in three flowfiles.
> After MergeRecord, the order becomes {A,C,B} in one flowfile.
> This is nondeterministic.
>
> I think I should look up the MergeRecord code and do further debug.
>
> Thanks,
> Lei
>
>
>
>
> wangl...@geekplus.com.cn
> From: Koji Kawamura
> Date: 2019-10-16 09:46
> To: users
> CC: dev
> Subject: Re: MergeRecord can not guarantee the ordering of the input sequence?
> Hi Lei,
> How about setting FIFO prioritizer at all the preceding connections
> before the MergeRecord?
> Without setting any prioritizer, FlowFile ordering is nondeterministic.
> Thanks,
> Koji
> On Tue, Oct 15, 2019 at 8:56 PM wangl...@geekplus.com.cn
>  wrote:
> >
> >
> > If FlowFile A, B, C enter the MergeRecord sequentially, the output should
> > be one FlowFile {A, B, C}.
> > However, when testing with a large data volume, sometimes the output order
> > will not be the same as the order they entered, and this result is nondeterministic.
> >
> > This really confuses me a lot.
> > Anybody has any insight on this?
> >
> > Thanks,
> > Lei
> >
> > 
> > wangl...@geekplus.com.cn


Re: MergeRecord can not guarantee the ordering of the input sequence?

2019-10-15 Thread Koji Kawamura
Hi Lei,

How about setting FIFO prioritizer at all the preceding connections
before the MergeRecord?
Without setting any prioritizer, FlowFile ordering is nondeterministic.

Thanks,
Koji

On Tue, Oct 15, 2019 at 8:56 PM wangl...@geekplus.com.cn
 wrote:
>
>
> If FlowFile A, B, C enter the MergeRecord sequentially, the output should be
> one FlowFile {A, B, C}.
> However, when testing with a large data volume, sometimes the output order
> will not be the same as the order they entered, and this result is nondeterministic.
>
> This really confuses me a lot.
> Anybody has any insight on this?
>
> Thanks,
> Lei
>
> 
> wangl...@geekplus.com.cn


Re: Data inconsistency happens when using CDC to replicate my database

2019-10-15 Thread Koji Kawamura
Hi Lei,

To address the FlowFile ordering issue related to CaptureChangeMySQL, I'd
recommend using the EnforceOrder processor and a FIFO prioritizer before any
processor that requires precise ordering. EnforceOrder can use the
"cdc.sequence.id" attribute.

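Roughly, the EnforceOrder configuration would look like this (property names
from memory, please verify against the processor documentation; the group
identifier choice is just an example):

EnforceOrder:
    Group Identifier = the table name attribute (if binlog events for
                       several tables are interleaved)
    Order Attribute  = cdc.sequence.id
    Wait Timeout     = 10 mins
    Inactive Timeout = 30 mins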
Thanks,
Koji

On Tue, Oct 15, 2019 at 1:14 PM wangl...@geekplus.com.cn
 wrote:
>
>
> Seems it is related to which prioritizer is used.
> The inconsistency occurs when the OldestFlowFileFirst prioritizer is used, but
> does not occur when the FirstInFirstOut prioritizer is used.
> But I have no idea why.
> Any insight on this?
>
> Thanks,
> Lei
>
>
> 
> wangl...@geekplus.com.cn
>
>
> From: wangl...@geekplus.com.cn
> Sent: 2019-10-15 08:08
> To: users
> CC: dev
> Subject: Data inconsistency happens when using CDC to replicate my database
> Using CaptureChangeMySQL to extract the binlog, do some translation and then put
> it into another database with the PutDatabaseRecord processor.
> But there is always data inconsistency between the destination database and the
> source database. To debug this, I have made the following settings.
>
> CaptureChangeMySQL only outputs one table. There's a field called order_no
> that is unique in the table.
> All the processors are scheduled with only one concurrent task.
> No data balancing between nodes. All run on the primary node.
> After CaptureChangeMySQL, I add a LogAttribute processor called log1. Before
> PutDatabaseRecord, I also add a LogAttribute, called log2.
>
> For the inconsistent data, I can grep the order_no in log1 and log2.
> For one specific order_no, there are 5 binlog messages in total. But in log1,
> there is only one message. In log2, there are 5, but the order is changed.
>
> position   type
> 201721167  insert (appeared in log1 and log2)
> 201926490  update(appeared only in log2)
> 202728760  update(appeared only in log2)
> 203162806  update(appeared only in log2)
> 203135127  update (appeared only in log2, the position number is smaller than
> the previous msg)
>
> This really confused me a lot.
> Any insight on this?  Thanks very much.
>
> Lei
>
> 
> wangl...@geekplus.com.cn


Re: Apache NIFI report:Caused by: java.lang.OutOfMemoryError: Compressed class space

2019-10-10 Thread Koji Kawamura
Hello,

Did you check nifi-bootstrap.log? Since the output is logged to stdout,
such information is logged to nifi-bootstrap.log instead of nifi-app.log.

Thanks,
Koji

On Thu, Oct 10, 2019 at 8:14 PM abellnotring  wrote:

> Hi all,
> I'm running a two-node NiFi cluster. One day, it reported an OOM
> problem with this message:
> Caused by: java.lang.OutOfMemoryError: Compressed class space.
> I added
>   java.arg.18=-XX:+TraceClassUnloading
>   java.arg.19=-XX:+TraceClassLoading
> to bootstrap.conf to trace the class loading and unloading information, but
> I couldn't find the output logs about class loading.
>
> So does anybody know how to collect the class loading and unloading
> logs?
>
>
>
>
> abellnotring
> abellnotr...@sina.com
>
> 
>
>


Re: Can CaptureChangeMySQL be scheduled to all nodes instead of primary node?

2019-10-10 Thread Koji Kawamura
Hi Lei,

I don't know of any NiFi built-in feature to achieve that.
To distribute the CaptureChangeMySQL load among nodes, I'd deploy separate
standalone NiFi instances (or even MiNiFi Java) in addition to the main NiFi
cluster that runs the main data flow.

For example, if there are 5 databases and 3 NiFi nodes, deploy a 3-node
NiFi cluster with an Input Port.
Also run standalone NiFi/MiNiFi processes on each node: node-a
(datasources 1 and 2), node-b (datasources 3 and 4), node-c
(datasource 5), then use a RemoteProcessGroup to send captured data to
the main NiFi cluster.

This approach may be harder to maintain, but feasible.

Thanks,
Koji

On Wed, Oct 9, 2019 at 3:06 PM wangl...@geekplus.com.cn
 wrote:
>
> I am using CaptureChangeMySQL to replicate the database.
> There are many data sources and so there are many CaptureChangeMySQL
> processors.
> CaptureChangeMySQL throws a 'same slave id' error if scheduled on all nodes,
> so it can only be scheduled on the primary node. This causes a very heavy load
> on the primary node.
>
> Is there any method by which I can distribute the CaptureChangeMySQL processors
> to all nodes instead of only to the primary node?
>
> Thanks,
> Lei
>
> 
> wangl...@geekplus.com.cn


Re: NIFI - Fetchfile - Execute SQL - Put Database Record

2019-10-10 Thread Koji Kawamura
Hi Asmath,

How about using PutSQL?
FetchFile -> PutSQL -> PutDatabaseRecord
You can specify a SQL statement in PutSQL's 'SQL Statement' property,
using FlowFile attributes.
For example: delete from tbl where file_name = '${filename}'
This way, the FlowFile content can be passed to PutDatabaseRecord as is.
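A minimal property sketch of that flow (the table name 'tbl' and the record
reader are placeholders, to be adjusted to your schema and file format):

FetchFile (success)
  -> PutSQL
       SQL Statement: delete from tbl where file_name = '${filename}'
     (success)
  -> PutDatabaseRecord
       Statement Type: INSERT
       Record Reader : a reader matching the file format (e.g. CSVReader)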

Thanks,
Koji

On Mon, Oct 7, 2019 at 9:44 AM KhajaAsmath Mohammed
 wrote:
>
> Hi,
>
> I have a requirement to read a file from a file server and delete all records
> from the database if the filename is present in the database. The next step is
> to load data from this file.
>
> This is more like delete/insert. I cannot do upserts because the new file can
> have more or fewer records after correction from the source.
>
> Does anyone have an idea for this?
>
> I tried FetchFile -> ExecuteSQL -> PutDatabaseRecord.
> I will lose the file in this case after ExecuteSQL.
>
> I have an idea of placing a funnel in between and doing a fork, but the delete
> might take time before the file starts processing.
>
> Thanks,
> Asmath
>


Re: How to keep from losing original content when a ReplaceText is performed for an InvokeHttp

2019-08-27 Thread Koji Kawamura
Hi William,

Wait/Notify may be a possible approach.

- UpdateAttribute
--> ReplaceText --> InvokeHttp --> Notify
--> Wait --> PutFile

- Use UpdateAttribute to add an attribute named 'wait.id' via
expression '${UUID()}'
- Connect the 'success' relationship from UpdateAttribute to both the
ReplaceText and Wait processors; this clones the FlowFile, and each copy
has the same 'wait.id' attribute
- When InvokeHttp succeeds, Notify will release Wait processor
- If InvokeHttp fails, Wait will timeout
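A minimal property sketch of the above ('Release Signal Identifier' is the
actual property name on Wait and Notify; the distributed map cache client
both of them need is omitted here):

UpdateAttribute:
    wait.id = ${UUID()}
Wait:
    Release Signal Identifier = ${wait.id}
Notify:
    Release Signal Identifier = ${wait.id}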

Hope this helps,
Koji

On Tue, Aug 27, 2019 at 6:56 AM William Gosse
 wrote:
>
> I have a situation where I need to do a POST using an InvokeHTTP for
> performing a lock operation on another web server. This requires me to do a
> ReplaceText in order to add the JSON needed for the POST. I wanted
> to do this call before I performed a PutFile call on my system, but that blows
> away the file content I needed to put. I really want to do these two calls in
> this order so that if the InvokeHTTP to get the lock fails I won't do my put. I
> did try using a MergeContent before the put in order to merge the
> original file content with the InvokeHTTP call's content, and that seemed to work
> with some other gymnastics. I am looking for a better approach if one exists.


Re: Using ConvertRecord on compressed input

2019-07-30 Thread Koji Kawamura
Thank you for filing the JIRA!
I would prioritize it as a major improvement, instead of minor.

Looking forward to reviewing it.

Thanks,
Koji

On Mon, Jul 29, 2019, 16:37 Malthe  wrote:

> I added this as a new JIRA issue:
> https://issues.apache.org/jira/browse/NIFI-6496.
>
> I'll try to write a patch for it. I agree with your suggestion to add
> the support to specific record readers (e.g. `CSVReader`).
>
> On Mon, 29 Jul 2019 at 01:18, Koji Kawamura 
> wrote:
> >
> > Hello,
> >
> > Thanks for your question. I've posted my comment to the StackOverflow
> question.
> >
> > I'd avoid adding it to the core package as some Record formats
> > handle compressed inputs by themselves, like Avro.
> >
> http://apache-avro.679487.n3.nabble.com/read-a-compressed-avro-file-td3872899.html
> > Adding compressed input support at RecordReaderFactory is my suggestion.
> >
> > Thanks,
> > Koji
> >
> > On Sun, Jul 28, 2019 at 6:04 PM Malthe  wrote:
> > >
> > > CSV, JSON, and XML files all compress well and thus you often see them
> > > stored in a compressed format. Actually, it's not feasible or simply
> > > practical to first decompress the files and then process them into a
> > > binary format.
> > >
> > > How would I go about extending or modifying the
> > > `AbstractRecordProcessor` to support a compressed input?
> > >
> > > Note that I have asked this question on StackOverflow [1] (currently
> > > without an answer).
> > >
> > > I'm thinking that this could even be something that might be included
> > > in the core package, either as an option or simply using a sniffing
> > > mechanism.
> > >
> > > [1]
> https://stackoverflow.com/questions/57005564/using-convertrecord-on-compressed-input
>


Re: Bug/Issue with ReplaceTextWithMapping

2019-07-30 Thread Koji Kawamura
The tryLock method does not block if the lock is already acquired by another
thread.
https://github.com/apache/nifi/blob/f8e93186f53917b1fddbc2ae3de26b65a99b9246/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java#L239
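A tiny standalone illustration of that behaviour (plain java.util.concurrent,
not NiFi code):

import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread holder = new Thread(() -> {
            lock.lock();   // simulate the thread that is reloading the mapping
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ignored) {
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        Thread.sleep(100); // make sure the holder owns the lock by now
        // tryLock() does not wait; it just reports that the lock is unavailable,
        // so the caller carries on with the old mapping.
        boolean acquired = lock.tryLock();
        System.out.println("acquired? " + acquired); // prints "acquired? false"
        if (acquired) {
            lock.unlock();
        }
        holder.join();
    }
}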

On Mon, Jul 29, 2019, 23:24 Ameer Mawia  wrote:

> Adding reference link
> <https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java>(to
> the code).
>
> On Mon, Jul 29, 2019 at 10:21 AM Ameer Mawia 
> wrote:
>
>> Thanks for reply.
>>
>> Hmm, that should explain the behavior we noted.
>>
>> But I see(here
>> <https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java>)
>> an instance level lock which is protecting the update Mapping method. 
>> *Shouldn't
>> that eventually block other threads from accessing the old mapping?*
>>
>> Or may that this locking was added later -  version 1.9 or something? We
>> are using 1.8.
>>
>> Thanks,
>> Ameer Mawia
>>
>> On Thu, Jul 25, 2019 at 3:51 AM Koji Kawamura 
>> wrote:
>>
>>> Hi Ameer,
>>>
>>> Is the ReplaceTextWithMapping's 'Concurrent Tasks' set to greater than 1?
>>> Since ReplaceTextWithMapping only reloads on a single thread, other
>>> threads may use the old mapping until the loading thread completes
>>> refreshing the mapping definition.
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Wed, Jul 24, 2019 at 4:28 AM Ameer Mawia 
>>> wrote:
>>> >
>>> > Inline.
>>> >
>>> > On Mon, Jul 22, 2019 at 2:17 AM Koji Kawamura 
>>> wrote:
>>> >>
>>> >> Hi Ameer,
>>> >>
>>> >> How is ReplaceTextWithMapping 'Mapping File Refresh Interval'
>>> configured?
>>> >
>>> > [Ameer] It is configured to 1sec - the lowest value allowed.
>>> >>
>>> >> By default, it's set to '60s'. So,
>>> >> 1. If ReplaceTextWithMapping ran with the old mapping file
>>> >
>>> > [Ameer] First Processing took place on Day-1. A new Mapping was
>>> dropped on Day-1, after Day-1 Processing was over.
>>> >>
>>> >> 2. and the mapping file was updated for the next processing
>>> >
>>> > [Ameer] Second Processing took place on Day-2.
>>> > [Ameer] Here assumption was CACHE will be refreshed from the new
>>> mapping file dropped a day earlier. But ti diddnt happend. Cache got
>>> refreshed in the middle of the flow - not at the very beginnning. Thus few
>>> flowfile got old value and later flowfile got new value.
>>> >>
>>> >> 3. then the flow started processing another CSV file right away line
>>> by line
>>> >>
>>> >> In above scenario, some lines in the CSV might get processed with the
>>> >> old mapping file. After 60s passed from 1, some other lines may get
>>> >> processed with the new mappings. Is that what you're seeing?
>>> >>
>>> > [Ameer] This is what is happening. But it shouldn't have - becuase new
>>> mapping file was already existing before the next processing begin. It
>>> should have refresh right at the start - as also suggested by the code of
>>> the ReplaceTextWithMapping processor.
>>> >>
>>> >> BTW, please avoid posting the same question to users and dev at the
>>> >> same time. I've removed dev address.
>>> >> [Ameer] Got it.
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >> On Sat, Jul 20, 2019 at 3:08 AM Ameer Mawia 
>>> wrote:
>>> >> >
>>> >> > Correcting Typo.
>>> >> >
>>> >> > On Fri, Jul 19, 2019 at 2:03 PM Ameer Mawia 
>>> wrote:
>>> >> >>
>>> >> >> Guys,
>>> >> >>
>>> >> >> It seems that NIFI  ReplaceTextWithMapping   Processors has a BUG
>>> with Refreshing its Mapped file. We are using its functionality in PROD and
>>> getting odd behaviour.
>>> >> >>
>>> >> >> Our USAGE Scenario:
>>> >> >>
>>> >> >> We use NIFI primarily as a TRANSFORMATION Tool.
>

Re: Using ConvertRecord on compressed input

2019-07-28 Thread Koji Kawamura
Hello,

Thanks for your question. I've posted my comment to the StackOverflow question.

I'd avoid adding it to the core package as some Record formats
handle compressed inputs by themselves, like Avro.
http://apache-avro.679487.n3.nabble.com/read-a-compressed-avro-file-td3872899.html
Adding compressed input support at RecordReaderFactory is my suggestion.
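A rough sketch of the idea in plain Java (not the actual NiFi API; a record
reader factory would wrap the incoming stream with something like this,
sniffing the gzip magic bytes, before parsing):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class CompressionSniffer {
    // Returns a stream that transparently decompresses gzip input,
    // or the original (buffered) stream if the content is not gzip.
    public static InputStream maybeDecompress(InputStream in) throws IOException {
        BufferedInputStream buffered = new BufferedInputStream(in);
        buffered.mark(2);
        int b1 = buffered.read();
        int b2 = buffered.read();
        buffered.reset();
        boolean gzip = (b1 == 0x1f && b2 == 0x8b); // gzip magic bytes
        return gzip ? new GZIPInputStream(buffered) : buffered;
    }
}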

Thanks,
Koji

On Sun, Jul 28, 2019 at 6:04 PM Malthe  wrote:
>
> CSV, JSON, and XML files all compress well and thus you often see them
> stored in a compressed format. Actually, it's not feasible, or simply not
> practical, to first decompress the files and then process them into a
> binary format.
>
> How would I go about extending or modifying the
> `AbstractRecordProcessor` to support a compressed input?
>
> Note that I have asked this question on StackOverflow [1] (currently
> without an answer).
>
> I'm thinking that this could even be something that might be included
> in the core package, either as an option or simply using a sniffing
> mechanism.
>
> [1] 
> https://stackoverflow.com/questions/57005564/using-convertrecord-on-compressed-input


Re: Debugging info for a stuck SelectHiveQL processor

2019-07-25 Thread Koji Kawamura
Hi Pat,

I recommend getting a thread-dump when you encounter the situation next time.
Thread-dump shows what each thread is doing, including the stuck
SelectHiveQL thread.

You can get a thread dump by executing:
${NIFI_HOME}/bin/nifi.sh dump <dump-file-name>

Thread stack traces are then logged to the specified file.
Many of the entries look like the one below:
"Timer-Driven Process Thread-8" Id=71 WAITING  on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@
1b3abf12
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
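For example (the dump file name is arbitrary, and the grep just narrows the
dump down to threads mentioning the processor):

cd $NIFI_HOME
bin/nifi.sh dump thread-dump.txt
grep -B 5 -A 30 "SelectHiveQL" thread-dump.txt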

Once you get the thread dump, please share it with us for further investigation.

Thanks,
Koji

On Fri, Jul 26, 2019 at 1:57 AM Pat White  wrote:
>
> Hi Folks,
>
> Would like to ask for suggestions on debugging SelectHiveQL processors, we've 
> seen a very odd error mode twice now, where a SelectHiveQL processor which 
> had been running fine suddenly becomes "stuck". This is on 1.6.0, so a bit 
> dated compared to 1.9.2, but I'm still very puzzled at the lack of error
> indications.
>
> Symptom; processor is running fine, continues to report 'running' on canvas 
> but the input port begins to queue up and show backlogs. Stopping the 
> processor in the canvas reports success and shows 'stopped', but trying to 
> start it again gets the popup "No eligible components are selected. Please 
> select the components to be stopped.". Making sure the processor is clearly 
> selected reports same error. Only way to get it unstuck is to restart the 
> primary, this appears to kill the affected threads and allow the processor to 
> begin running again, at that point it's ok again.
>
> Issue appears directly related to the processor itself, as opposed to say the 
> ConnectionPool. On that, tried restarting the ConnectionPool being used, stop 
> attempt hangs on the affected processor, to the point the stop fails. Another 
> oddity, tried stopping upstream objects to the affected processor, they 
> report "cannot be disabled because it is referenced by 1 components that are 
> currently running", even though the canvas clearly shows that processor as 
> stopped.
>
> What's really strange is the lack of error indications anywhere, see nothing 
> in the logs at all regarding the affected processor, until primary restart. 
> Then see the start event when the processor is coming back online 
> "StandardProcessScheduler Starting SelectHiveQL id=".
>
> Appreciate any suggestions on additional logging or other resources that 
> would help debug. Thanks!
>
> patw
>
>
>
>
>
>
>
>


Re: Bug/Issue with ReplaceTextWithMapping

2019-07-25 Thread Koji Kawamura
Hi Ameer,

Is the ReplaceTextWithMapping's 'Concurrent Tasks' set to greater than 1?
Since ReplaceTextWithMapping only reloads on a single thread, other
threads may use the old mapping until the loading thread completes
refreshing the mapping definition.

Thanks,
Koji

On Wed, Jul 24, 2019 at 4:28 AM Ameer Mawia  wrote:
>
> Inline.
>
> On Mon, Jul 22, 2019 at 2:17 AM Koji Kawamura  wrote:
>>
>> Hi Ameer,
>>
>> How is ReplaceTextWithMapping 'Mapping File Refresh Interval' configured?
>
> [Ameer] It is configured to 1sec - the lowest value allowed.
>>
>> By default, it's set to '60s'. So,
>> 1. If ReplaceTextWithMapping ran with the old mapping file
>
> [Ameer] First Processing took place on Day-1. A new Mapping was dropped on 
> Day-1, after Day-1 Processing was over.
>>
>> 2. and the mapping file was updated for the next processing
>
> [Ameer] Second Processing took place on Day-2.
> [Ameer] Here the assumption was the CACHE would be refreshed from the new mapping file
> dropped a day earlier. But it didn't happen. The cache got refreshed in the
> middle of the flow - not at the very beginning. Thus a few flowfiles got the old
> value and later flowfiles got the new value.
>>
>> 3. then the flow started processing another CSV file right away line by line
>>
>> In above scenario, some lines in the CSV might get processed with the
>> old mapping file. After 60s passed from 1, some other lines may get
>> processed with the new mappings. Is that what you're seeing?
>>
> [Ameer] This is what is happening. But it shouldn't have - because the new
> mapping file already existed before the next processing began. It should
> have refreshed right at the start - as also suggested by the code of the
> ReplaceTextWithMapping processor.
>>
>> BTW, please avoid posting the same question to users and dev at the
>> same time. I've removed dev address.
>> [Ameer] Got it.
>> Thanks,
>> Koji
>>
>> On Sat, Jul 20, 2019 at 3:08 AM Ameer Mawia  wrote:
>> >
>> > Correcting Typo.
>> >
>> > On Fri, Jul 19, 2019 at 2:03 PM Ameer Mawia  wrote:
>> >>
>> >> Guys,
>> >>
>> >> It seems that NIFI  ReplaceTextWithMapping   Processors has a BUG with 
>> >> Refreshing its Mapped file. We are using its functionality in PROD and 
>> >> getting odd behaviour.
>> >>
>> >> Our USAGE Scenario:
>> >>
>> >> We use NIFI primarily as a TRANSFORMATION Tool.
>> >> Our flow involves:
>> >>
>> >> Getting a raw csv file.
>> >> Split the file on per line basis:
>> >>
>> >> So from one source flowfile - we may have 1 flowfile 
>> >> generated/splitted out.
>> >>
>> >> For each of the splitted flow file(flowfiles for individual lines) we 
>> >> perform transformation on the attributes.
>> >> We merge these flowfiles back and write the Output file.
>> >>
>> >>
>> >> As part of the transformation in Step#3, we do some mapping for one of 
>> >> the field in the csv. For this we use ReplaceTextWithMapping  Processor. 
>> >> Also to note we update our mapping file just before starting our flow(ie. 
>> >> Step #1)
>> >>
>> >> Our Issue:
>> >>
>> >> We have noted for SAME key we get two DIFFERENT values in two different 
>> >> flowfiles.
>> >> We noted that one of the value mapped, existed in an older Mapping file.
>> >> So in essence: ReplaceTextWithMapping Processor didn't refresh its cash 
>> >> uptill certain time. And thus return the old value for few mapping file 
>> >> and then - once in the meanwhile it has refreshed it cache - returned new 
>> >> updated value.
>> >> And this cause the issue?
>> >>
>> >> Question:
>> >>
>> >> Is this a known issue with  ReplaceTextWithMapping Processor?
>> >> If not how can I create an issue for this?
>> >> How can I confirm this behaviour?
>> >>
>> >> Thanks,
>> >> Ameer Mawia
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> http://ca.linkedin.com/in/ameermawia
>> >> Toronto, ON
>> >>
>> >
>> >
>> > --
>> > http://ca.linkedin.com/in/ameermawia
>> > Toronto, ON
>> >
>
>
>
> --
> http://ca.linkedin.com/in/ameermawia
> Toronto, ON
>


Re: Bug/Issue with ReplaceTextWithMapping

2019-07-22 Thread Koji Kawamura
Hi Ameer,

How is ReplaceTextWithMapping 'Mapping File Refresh Interval' configured?
By default, it's set to '60s'. So,
1. If ReplaceTextWithMapping ran with the old mapping file
2. and the mapping file was updated for the next processing
3. then the flow started processing another CSV file right away line by line

In the above scenario, some lines in the CSV might get processed with the
old mapping file. After 60s have passed since step 1, some other lines may get
processed with the new mappings. Is that what you're seeing?

BTW, please avoid posting the same question to users and dev at the
same time. I've removed dev address.

Thanks,
Koji

On Sat, Jul 20, 2019 at 3:08 AM Ameer Mawia  wrote:
>
> Correcting Typo.
>
> On Fri, Jul 19, 2019 at 2:03 PM Ameer Mawia  wrote:
>>
>> Guys,
>>
>> It seems that the NIFI ReplaceTextWithMapping processor has a BUG with
>> refreshing its mapping file. We are using its functionality in PROD and
>> getting odd behaviour.
>>
>> Our USAGE Scenario:
>>
>> We use NIFI primarily as a TRANSFORMATION Tool.
>> Our flow involves:
>>
>> Getting a raw csv file.
>> Split the file on per line basis:
>>
>> So from one source flowfile - we may have 1 flowfile generated/splitted 
>> out.
>>
>> For each of the splitted flow file(flowfiles for individual lines) we 
>> perform transformation on the attributes.
>> We merge these flowfiles back and write the Output file.
>>
>>
>> As part of the transformation in Step#3, we do some mapping for one of the 
>> field in the csv. For this we use ReplaceTextWithMapping  Processor. Also to 
>> note we update our mapping file just before starting our flow(ie. Step #1)
>>
>> Our Issue:
>>
>> We have noted that for the SAME key we get two DIFFERENT values in two different
>> flowfiles.
>> We noted that one of the mapped values existed in an older Mapping file.
>> So in essence: the ReplaceTextWithMapping Processor didn't refresh its cache
>> until a certain time, and thus returned the old value for a few flowfiles and
>> then - once it had refreshed its cache in the meanwhile - returned the new
>> updated value.
>> And this caused the issue?
>>
>> Question:
>>
>> Is this a known issue with  ReplaceTextWithMapping Processor?
>> If not how can I create an issue for this?
>> How can I confirm this behaviour?
>>
>> Thanks,
>> Ameer Mawia
>>
>>
>>
>>
>> --
>> http://ca.linkedin.com/in/ameermawia
>> Toronto, ON
>>
>
>
> --
> http://ca.linkedin.com/in/ameermawia
> Toronto, ON
>


Re: Nifi and SSL offloading

2019-07-07 Thread Koji Kawamura
Hi Nicolas,

As you already know, all authentication methods implemented in NiFi
require a secure connection.
Each implementation class uses HttpServletRequest.isSecure method to
determine if authentication is necessary.

For example, JWTAuthenticationFilter:
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/jwt/JwtAuthenticationFilter.java#L42

NiFi uses Jetty inside, and Jetty has ForwardedRequestCustomizer which
uses X-Forwarded-* headers to customize requests so that NiFi sees
HTTP requests forwarded by a reverse-proxy server as HTTPS. But NiFi
doesn't use that currently.
So, I believe enabling auth in HTTP is not supported now.
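For reference, this is roughly how ForwardedRequestCustomizer is enabled on a
plain Jetty server (an illustration of the Jetty feature only, not something
NiFi does today):

import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;

public class ForwardedHeaderExample {
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        HttpConfiguration httpConfig = new HttpConfiguration();
        // Makes HttpServletRequest.isSecure() return true when a trusted proxy
        // sets X-Forwarded-Proto: https on the forwarded request.
        httpConfig.addCustomizer(new ForwardedRequestCustomizer());
        ServerConnector connector =
                new ServerConnector(server, new HttpConnectionFactory(httpConfig));
        connector.setPort(8080);
        server.addConnector(connector);
        server.start();
        server.join();
    }
}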

There have been similar requests and existing JIRA NIFI-6152 (that is
specific for OIDC though).
https://issues.apache.org/jira/browse/NIFI-6152

Which authentication method are you planning to use?

Thanks,
Koji

On Fri, Jul 5, 2019 at 5:43 PM Nicolas Delsaux  wrote:
>
> Hi
>
> I'm trying to deploy Nifi in Kubernetes with authentication.
>
> In Kubernetes, it is possible (and recommended in my organization) to
> have SSL managed by the cluster at the edge route level, which means requests
> seen by Nifi are HTTP ones.
>
> According to the nifi documentation, this seems to imply no authentication is
> possible in this case.
>
> However, in our context, the X-Forwarded-Proto header is set (see
> https://en.wikipedia.org/wiki/List_of_HTTP_header_fields#Common_non-standard_request_fields),
> which could be used to enable authentication in HTTP.
>
> So is it possible to do that ? And if so, how ?
>


Re: NiFi Wait / Notify not releasing on signal

2019-06-19 Thread Koji Kawamura
(Forgot clicking the SEND button...)

Hi ara, thank you for sharing the issue.

I've submitted a PR to add new 'Penalize Waiting FlowFiles' property
to Wait processor.
https://github.com/apache/nifi/pull/3538/files

To make existing flows intact, it's disabled by default.
If enabled, FlowFiles routed to 'wait' or staying in the incoming
connection will be penalized if the signal has not been notified yet.
That prevents such FlowFiles from being processed again for the
configured 'Penalty Duration', so other queued ones get processed.

That'd be great if Mark can review the PR and get it merged soon.

Thanks,
Koji

On Wed, Jun 19, 2019 at 6:27 AM Mark Payne  wrote:
>
> I see. Well, the good news is that it is easily replicated. If I queue up 2 
> FlowFiles, one with signal=aa, the other with signal=bb,
> then i use Notify to signal bb, the Wait processor does not process it. If I 
> then Notify on signal aa, then both FlowFiles get
> processed by the Wait processor.
>
> Given that the Processor does not document this behavior, and it violates 
> user expectations, I would agree that this is a bug.
>
> I created a JIRA [1] for this.
>
> Thanks
> -Mark
>
> [1] https://issues.apache.org/jira/browse/NIFI-6385
>
> On Jun 18, 2019, at 4:34 PM, ara m.  wrote:
>
> I'm not going over 20,000 flowfiles, I have barely 2 in there.
> I just think that's how it's coded. Somehow with FIFO it can see all the items in
> the queue and, no matter the signal, can pull them out in any order,
> but the other Prioritizers bring one to the front of the queue and only
> consider that one when matching signals.
> So if you have:
> signal=aa, priority=1
> signal=bb, priority=2
> then it'll only consider signal=aa because it has higher priority when
> matching signals. So it's kind of dead to any signals except the one with the
> highest priority.
>
> I think that's the intended way for the prioritizer to work? But I'm not sure,
> it could be a bug? I hope it is, because my use case is to have different
> signals and different priorities and to be able to send a signal to Notify and
> peel off items that are higher in priority but not get blocked because they
> aren't the highest priority with a different signal.
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
>
>


Re: NiFi - create folders and files based on hostname and date

2019-05-22 Thread Koji Kawamura
Hello,

In order to create the folders automatically, I would use PutFile like below:
- Assuming the incoming FlowFile has an attribute named 'host', PutFile's
'Directory' can refer to it using the NiFi Expression Language (EL)
- The date part can also be generated using the 'now()' EL function
- Directory:
/var/log/network/${host}/${now():format('yyyy')}/${now():format('MM')}/${now():format('dd')}/syslog.log
- Create Missing Directories: true
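For example, a FlowFile with host=router01 (a hypothetical value) arriving on
2019-05-23 would be written to:
/var/log/network/router01/2019/05/23/syslog.log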

Thanks,
Koji

On Thu, May 23, 2019 at 8:36 AM N. J.  wrote:
>
> Hi,
>
>
> Trying to "practically" replace multiple ingestion tools with NiFi. I have 
> Syslog-NG as one these tools storing syslog messages to files based on the 
> hostname (syslog.hostname) extracted from the syslog message and the date 
> when the message was received. The following shows the Syslog-NG destination 
> configuration:
>
> "
>
> destination d_network 
> {file("/var/log/network/$HOST/$YEAR/$MONTH/$DAY/syslog.log" owner(root) 
> group(root) perm(0666) dir_perm(0777) create_dirs(yes) ); };
>
> "
>
> Syslog-NG would create folders automatically based on the hostname, year, 
> month and day, and then store the data to a file called "syslog.log". The 
> file would be appended if it already exists.
>
>
> Couple of challenges I faced with NiFi when trying to do the same:
>
> 1. The regular file append issue (which a patch exists for)
>
> 2. The bigger problem is how to automatically create the folders without the 
> need to manually do this for each source, noting that I have 100s of sources.
>
>
> Suggestions on how to do this on NiFi would be appreciated.
>
>
> Thanks,


Re: execute process

2019-05-22 Thread Koji Kawamura
Hello,

Which version of NiFi are you using?
The relationship "nonzero status" is available since 1.5.0.
NIFI-4559: ExecuteStreamCommand should have a failure relationship
https://issues.apache.org/jira/browse/NIFI-4559

I hope you can try that.
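A rough flow sketch (clamscan is just an example scanner; the exact command,
arguments and directories depend on your environment):

GetFile
  -> ExecuteStreamCommand
       Command Path     : /usr/bin/clamscan
       Command Arguments: scanner-specific; '-' to read the content from stdin
     output stream  -> PutFile (clean directory)
     nonzero status -> PutFile (quarantine directory)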

Thanks,
Koji

On Wed, May 22, 2019 at 11:15 PM Michael Di Domenico
 wrote:
>
> I'm a little confused over running external commands on flowfiles.
> What I want to do is pick up a file from the filesystem (GetFile) and
> virus scan it. Based on the results of the virus scan (pass/fail) I
> want to move the file to a different directory.
>
> Is this possible? I looked at the ExecuteProcess/script processors,
> but none of them seem to do that. ExecuteStreamCommand seemed close, but
> doesn't seem to have success/fail based on the exit code of the
> process. In fact my process failed to run and the processor still
> "succeeded", which is not what I want.
>
> I think what I want is a RouteOnAttribute after the ExecuteStreamCommand,
> but before I go down that road in the documentation I wanted to just
> check and see if there were any other ideas.


Re: Advice on orchestrating Nifi with dockerized external services

2019-04-10 Thread Koji Kawamura
Hi Eric,

Although my knowledge of MiNiFi, Python and Go is limited, I wonder if
the "nanofi" library can be used from the proprietary applications so that
they can fetch FlowFiles directly using the Site-to-Site protocol. That
could be an interesting approach and would eliminate the need
to store data to a local volume (mentioned in possible approach A).
https://github.com/apache/nifi-minifi-cpp/tree/master/nanofi

The latest MiNiFi (C++) version 0.6.0 was released recently.
https://cwiki.apache.org/confluence/display/MINIFI/Release+Notes

Thanks,
Koji

On Thu, Apr 11, 2019 at 2:28 AM Eric Chaves  wrote:
>
> Hi Folks,
>
> My company is using nifi to perform several data-flow processes and now we
> received a requirement to do some fairly complex ETL over large files. To
> process those files we have some proprietary applications (mostly written in
> python or go) that run as docker containers.
>
> I don't think that porting those apps as nifi processors would produce a good
> result due to each app's complexity.
>
> Also we would like to keep using the nifi queues so we can monitor overall
> progress as we already do (we run several other nifi flows), so we are
> discarding for now solutions that, for example, submit files to an external
> queue like SQS or Rabbit for consumption.
>
> So far we come up with two solutions that would:
>
> have a kubernetes cluster of jobs periodically querying the nifi queue
> for new flowfiles, pulling one when a file arrives.
> download the file-content (which is already stored outside of nifi) and 
> process it.
> submit the result back to nifi (using a HTTP Listener processor) to trigger 
> subsequent nifi process.
>
>
> For step 1 and 2 so far we are considering two possible approaches:
>
> A) use a minifi container together with the app container in a sidecar
> design. minifi would connect to our nifi cluster and handle file download to
> a local volume for the app container to process.
>
> B) use nifi rest API to query and consume flowfiles on queue
>
> One requirement is that if needed we would manually scale up the app cluster
> to have multiple containers consume more queued files in parallel.
>
> Do you guys recommend one over another (or a third approach)? Any pitfalls 
> you can foresee?
>
> Would be really glad to hear your thoughts on this matter.
>
> Best regards,
>
> Eric


Re: GetHbase state

2019-04-10 Thread Koji Kawamura
Hi Dwane,

I agree with you, seeing duplicated loaded data with scenario 3
(staged) is strange. It should behave the same as scenario 1 if Pig
and NiFi were not running concurrently.

From the scenario 3 description:
> In this scenario we reloaded the same data several times and the state 
> behaved unusually after the first run.  Timestamp entries were placed in the 
> processor state but they appeared to be ignored on subsequent runs with the 
> data being reloaded several times.

I assume the test looks like:
1. Let's say there are 1000 rows total in the dataset to be loaded
2. Divide them into 10 chunks. Each has 100 rows
3. Pig inserts the first chunk to HBase, then stops
4. NiFi loads the first chunk from HBase, NiFi gets 100 rows. State is stored.
5. Stop NiFi GetHBase, run Pig to insert the 2nd chunk to HBase
6. After Pig(step 5) finishes, restart GetHBase. Restart GetHBase and
NiFi gets 100+ rows, containing duplicates that have seen at step 4

Do I understand it correctly? Please correct me if I'm wrong.

There's one possibility to see duplicated rows at step 6: if Pig put
different cells, but within the same row, at step 3 and step 5.
For example, if row(1) contains cell(A) and cell(B):
Pig puts cell(A) to HBase, then GetHBase gets cell(A) at the first run.
Pig puts cell(B) to HBase, then GetHBase gets cell(A) and cell(B) at
the second run.

If that's not the case, I think we need to investigate it more.
To do so, would you be able to share more details, such as the stored
GetHBase state at step 4 and what GetHBase gets at step 6?
Executing a scan operation at step 4, using the stored timestamp as the min
timestamp, from a tool other than NiFi such as the HBase shell would help to
know what timestamp each cell has. GetHBase doesn't output each cell's
timestamp IIRC.

Thanks,
Koji

On Wed, Apr 10, 2019 at 6:25 PM Dwane Hall  wrote:
>
> Hey Koji,
>
> Thanks for the response and great question regarding the Pig job load 
> timestamp.  No we rely entirely on the Hbase timestamp value for our update 
> values.  We did pre-split our regions for greater throughput and to avoid 
> overloading a single region server.  Additionally, our key values were 
> perfectly distributed using the datafu MD5 hash algorithm to optimise 
> throughput.
>
> We suspected the same thing with the concurrent writes and distributed nature 
> of Hbase which is why we attempted scenario 3 (staged) below.  We were very 
> surprised that we reloaded data under these conditions.
>
> Thanks again for your input.
>
> Dwane
> 
> From: Koji Kawamura 
> Sent: Friday, 5 April 2019 6:08 PM
> To: users@nifi.apache.org
> Subject: Re: GetHbase state
>
> Hi Dwane,
>
> Does the Pig job puts HBase data with custom timestamps? For example,
> the loading data contains last_modified timestamp, and it's used as
> HBase cell timestamp.
> If that's the case, GetHbase may miss some HBase rows, unless the Pig
> job loads raws ordered by the timestamp when Pig and GetHBase run
> concurrently (case 2 in your scenarios).
>
> By looking at the case 2 and 3, I imagine there may be some rows
> containing custom timestamp and others don't.
>
> If Pig doesn't specify timestamp, HBase Region Server sets it.
> In that case, more recently written cell will have the more recent timestamp.
> However, due to the distributed nature of HBase, I believe chances are
> a scan operation using minimum timestamp misses some rows if rows
> being written simultaneously.
>
>
> # 
> # Idea for future improvement
> # 
> Currently GetHBase treats the last timestamp as a single point of time.
> In order to decrease the possibility to miss rows those are being
> written simultaneously, I think GetHBase should implement checking
> timestamps as a range.
> For example:
> - Add new processor property "Scan Timestamp Range", such as "3 secs"
> - Remember the latest timestamp in the previous scan, and all row keys
> (and ts) for those having its timestamp > "latest timestamp - 3 secs"
> - When performing a scan, set min_timestamp as "latestSeenTimestamp -
> 3 secs", this will contain duplication
> - Emit result using rows whose row keys are not contained in the
> previously scanned keys, or row timestamp is updated
>
> Thanks,
> Koji
>
> On Thu, Apr 4, 2019 at 8:47 PM Dwane Hall  wrote:
> >
> > Hey fellow NiFi fans,
> >
> > I was recently loading data into into Solr via HBase (around 700G 
> > ~60,000,000 db rows) using NiFi and noticed some inconsistent behaviour 
> > with the GetHbase processor and I'm wondering if anyone else has noticed 
> > similar behaviour when using it.
> >
> > Here's our environment and the high 

Re: GetHbase state

2019-04-05 Thread Koji Kawamura
Hi Dwane,

Does the Pig job put HBase data with custom timestamps? For example,
the loading data contains a last_modified timestamp, and it's used as
the HBase cell timestamp.
If that's the case, GetHbase may miss some HBase rows, unless the Pig
job loads rows ordered by that timestamp when Pig and GetHBase run
concurrently (case 2 in your scenarios).

By looking at the case 2 and 3, I imagine there may be some rows
containing custom timestamp and others don't.

If Pig doesn't specify timestamp, HBase Region Server sets it.
In that case, more recently written cell will have the more recent timestamp.
However, due to the distributed nature of HBase, I believe chances are
that a scan operation using a minimum timestamp misses some rows if rows
are being written simultaneously.


# 
# Idea for future improvement
# 
Currently GetHBase treats the last timestamp as a single point of time.
In order to decrease the possibility of missing rows that are being
written simultaneously, I think GetHBase should implement checking
timestamps as a range.
For example:
- Add new processor property "Scan Timestamp Range", such as "3 secs"
- Remember the latest timestamp in the previous scan, and all row keys
(and ts) for those having its timestamp > "latest timestamp - 3 secs"
- When performing a scan, set min_timestamp as "latestSeenTimestamp -
3 secs", this will contain duplication
- Emit result using rows whose row keys are not contained in the
previously scanned keys, or row timestamp is updated
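A rough sketch of that bookkeeping (a hypothetical helper class, not actual
GetHBase code; pruning of old entries is omitted for brevity):

import java.util.HashMap;
import java.util.Map;

public class TimestampRangeTracker {
    private final long rangeMillis;        // e.g. 3000 for "3 secs"
    private long latestSeenTimestamp = 0L;
    // row key -> timestamp observed in previous scans
    private final Map<String, Long> seenRows = new HashMap<>();

    public TimestampRangeTracker(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    // Min timestamp for the next scan; overlaps the previous scan by
    // 'rangeMillis' so rows written concurrently are not missed.
    public long nextScanMinTimestamp() {
        return Math.max(0L, latestSeenTimestamp - rangeMillis);
    }

    // Emit a row only if it was not seen before, or its timestamp changed.
    public boolean shouldEmit(String rowKey, long rowTimestamp) {
        Long previous = seenRows.get(rowKey);
        return previous == null || previous != rowTimestamp;
    }

    // Record a row observed in the current scan.
    public void observe(String rowKey, long rowTimestamp) {
        latestSeenTimestamp = Math.max(latestSeenTimestamp, rowTimestamp);
        seenRows.put(rowKey, rowTimestamp);
    }
}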

Thanks,
Koji

On Thu, Apr 4, 2019 at 8:47 PM Dwane Hall  wrote:
>
> Hey fellow NiFi fans,
>
> I was recently loading data into into Solr via HBase (around 700G ~60,000,000 
> db rows) using NiFi and noticed some inconsistent behaviour with the GetHbase 
> processor and I'm wondering if anyone else has noticed similar behaviour when 
> using it.
>
> Here's our environment and the high level workflow we were attempting:
>
> Apache NiFi 1.8
> Two node cluster (external zookeeper maintaining processor state)
> HBase 1.1.2
>
> Step 1 We execute an external Pig job to load data into a HBase table.
> Step 2 (NiFi) We use a GetHbase processor listening to the HBase table for 
> new data - Execution context set to Primary Node only.
> Step 3 (NiFi) Some light attribute addition and eventually the data is stored 
> in Solr using PutSolrContentStream.
>
> What we found during our testing is that the GetHBase processor did not 
> appear to accurately maintain its state as data was being loaded out of 
> Hbase.  We tried a number of load strategies with varying success.
>
> No concurrency - Wait for all data to be loaded into HBase by the Pig job and 
> then dump all 700G of data into NiFi. This was successful as there was no 
> state dependency but we lose the ability to run the Solr load and Pig job in 
> parallel and dump a lot of data on NiFi at once.
> Concurrent - Pig job and GetHbase processor running concurrently. Here we 
> found we missed ~ 30% of the data we were loading into Solr.
> Staged. Load a portion of data into HBase using Pig, start the GetHbase 
> processor to load that portion of data and repeat until all data is loaded 
> (Pig job and GetHbase processor were never run concurrently).  In this 
> scenario we reloaded the same data several times and the state behaved 
> unusually after the first run.  Timestamp entries were placed in the 
> processor state but they appeared to be ignored on subsequent runs with the 
> data being reloaded several times.
>
>
> We were able to work around the issue using the ScanHbase processor and 
> specifying our key range to load the data so it was not a showstopper for us. 
>  I'm just wondering if any other users in the community have had similar 
> experiences using this processor or if I need to revise my local environment 
> configuration?
>
> Thanks,
>
> Dwane


Re: NiFi Registry Not Auditing Denied Errors

2019-04-04 Thread Koji Kawamura
Hi Shawn,

The 'No applicable policies could be found.' message can be logged
when a request is made against a resource which doesn't exist.
https://github.com/apache/nifi-registry/blob/master/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authorization/resource/Authorizable.java#L236,L247

If a request for a valid resource, but the user doesn't have right
permissions, then the log should look like this:
2019-04-04 14:34:58,492 INFO [NiFi Registry Web Server-71]
o.a.n.r.w.m.AccessDeniedExceptionMapper identity[CN=alice, OU=NIFI],
groups[] does not have permission to access the requested resource.
Unable to view Bucket with ID b5c0b8d3-44df-4afd-9e4b-114c0e299268.
Returning Forbidden response.

Enabling the Jetty debug log may be helpful to get more information, but
lots of noisy logs should be expected.
E.g. add a Jetty logger entry to conf/logback.xml.
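Something like this (standard logback logger syntax; adjust the package or
level as needed):

<logger name="org.eclipse.jetty" level="DEBUG"/>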


Thanks,
Koji

On Sat, Mar 30, 2019 at 11:58 PM Shawn Weeks  wrote:
>
> I remember seeing something where we reduced the amount of auditing for 
> access denied errors the NiFi Ranger plugin was doing. On a new installation 
> with Registry 0.3.0 I’m not seeing any access denied errors at all despite 
> the app log showing them. It’s making it really hard to figure out what 
> exactly is failing. I know it’s related to the host access but the error log 
> doesn’t say what was being accessed.
>
>
>
> Basically I get log messages like these.
>
>
>
> 2019-03-30 09:56:54,817 INFO [NiFi Registry Web Server-20] 
> o.a.n.r.w.m.AccessDeniedExceptionMapper identity[hdp31-df3.dev.example.com], 
> groups[] does not have permission to access the requested resource. No 
> applicable policies could be found. Returning Forbidden response.
>
>
>
> I could just give blanket access to everything but I prefer to be more 
> precise.
>
>
>
> Thanks
>
> Shawn Weeks


Re: Problem with load balancing option

2019-03-25 Thread Koji Kawamura
Glad to hear you got load balancing working correctly!

Thanks for pointing out the lack of the new properties in the migration guide.
I've added a note for the new load balancing port.
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance
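For reference, the load balancing section in nifi.properties looks like this
(6342 is the default port; the other values are illustrative, so please verify
them against the Admin Guide for your version):

nifi.cluster.load.balance.host=
nifi.cluster.load.balance.port=6342
nifi.cluster.load.balance.connections.per.node=4
nifi.cluster.load.balance.max.thread.count=8
nifi.cluster.load.balance.comms.timeout=30 sec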

On Mon, Mar 25, 2019 at 8:06 PM Jean-Sebastien Vachon <
jsvac...@brizodata.com> wrote:

> Hi,
>
> I saw that bug report and I will upgrade to the latest version ASAP. But
> my main problem was the lack of the section to configure the load balancer
> correctly. Once I've added the section and opened the required ports in my
> infrastructure, everything started to work as expected and it is a life
> changer 
>
> The load is now properly balanced between all nodes and the performance
> boost I got is outstanding
>
> One note however, I've checked the migration guide from 1.8 to 1.9 and
> didn't see any mention of this new section within nifi.properties. It might
> be a good idea to add a section about this so that people upgrading their
> clusters have all the information at hand. This might save them some time.
>
> Thanks all for your outstanding work
> --
> *From:* Koji Kawamura 
> *Sent:* Sunday, March 24, 2019 10:39 PM
> *To:* users@nifi.apache.org
> *Cc:* Jean-Sebastien Vachon
> *Subject:* Re: Problem with load balancing option
>
> Hi,
>
> That looks similar to this one:
> Occasionally FlowFiles appear to get "stuck" in a Load-Balanced Connection
> https://issues.apache.org/jira/browse/NIFI-5919
>
> If you're using NiFi 1.8.0, I recommend trying the latest 1.9.1 which
> has the fix for the above issue.
>
> Hope this helps.
>
> Koji
>
> On Sat, Mar 23, 2019 at 12:15 AM Jean-Sebastien Vachon
>  wrote:
> >
> > Hi,
> >
> > FYI, I managed to get my node back by removing the node from the
> cluster, deleting the local flow and restart Nifi.
> >
> > Hope this helps identify the issue
> > 
> > From: Jean-Sebastien Vachon 
> > Sent: Friday, March 22, 2019 10:56 AM
> > To: users@nifi.apache.org
> > Subject: Re: Problem with load balancing option
> >
> > Hi again,
> >
> > I thought everything was fine but one of my node can not start..
> >
> > 2019-03-22 14:51:27,811 INFO [main]
> o.a.n.wali.SequentialAccessWriteAheadLog Successfully recovered 10396
> records in 367 milliseconds. Now checkpointing to ensure that Write-Ahead
> Log is in a consistent state
> > 2019-03-22 14:51:28,046 INFO [main]
> o.a.n.wali.SequentialAccessWriteAheadLog Checkpointed Write-Ahead Log with
> 10396 Records and 0 Swap Files in 235 milliseconds (Stop-the-world time = 6
> milliseconds), max Transaction ID 24370
> > 2019-03-22 14:51:28,065 ERROR [main]
> o.a.nifi.controller.StandardFlowService Failed to load flow from cluster
> due to: org.apache.nifi.cluster.ConnectionExcepti
> > on: Failed to connect node to cluster due to:
> java.lang.ArrayIndexOutOfBoundsException: -1
> > org.apache.nifi.cluster.ConnectionException: Failed to connect node to
> cluster due to: java.lang.ArrayIndexOutOfBoundsException: -1
> > at
> org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1009)
> > at
> org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:539)
> > at
> org.apache.nifi.web.server.JettyServer.start(JettyServer.java:939)
> > at org.apache.nifi.NiFi.(NiFi.java:157)
> > at org.apache.nifi.NiFi.(NiFi.java:71)
> > at org.apache.nifi.NiFi.main(NiFi.java:296)
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> > at
> org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner.getPartition(CorrelationAttributePartitioner.java:44)
> > at
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.getPartition(SocketLoadBalancedFlowFileQueue.java:611)
> > at
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.putAndGetPartition(SocketLoadBalancedFlowFileQueue.java:749)
> > at
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.put(SocketLoadBalancedFlowFileQueue.java:739)
> > at
> org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.loadFlowFiles(WriteAheadFlowFileRepository.java:587)
> > at
> org.apache.nifi.controller.FlowController.initializeFlow(FlowController.java:818)
> > at
> org.apache.nifi.controller.StandardFlowService.initializeController(StandardFlowService.java:1019)
> > at
> org.apache.nifi.controller.StandardFlowService.loa

Re: Problem with load balancing option

2019-03-24 Thread Koji Kawamura
Hi,

That looks similar to this one:
Occasionally FlowFiles appear to get "stuck" in a Load-Balanced Connection
https://issues.apache.org/jira/browse/NIFI-5919

If you're using NiFi 1.8.0, I recommend trying the latest 1.9.1 which
has the fix for the above issue.

Hope this helps.

Koji

On Sat, Mar 23, 2019 at 12:15 AM Jean-Sebastien Vachon
 wrote:
>
> Hi,
>
> FYI, I managed to get my node back by removing the node from the cluster, 
> deleting the local flow and restart Nifi.
>
> Hope this helps identify the issue
> 
> From: Jean-Sebastien Vachon 
> Sent: Friday, March 22, 2019 10:56 AM
> To: users@nifi.apache.org
> Subject: Re: Problem with load balancing option
>
> Hi again,
>
> I thought everything was fine but one of my node can not start..
>
> 2019-03-22 14:51:27,811 INFO [main] o.a.n.wali.SequentialAccessWriteAheadLog 
> Successfully recovered 10396 records in 367 milliseconds. Now checkpointing 
> to ensure that Write-Ahead Log is in a consistent state
> 2019-03-22 14:51:28,046 INFO [main] o.a.n.wali.SequentialAccessWriteAheadLog 
> Checkpointed Write-Ahead Log with 10396 Records and 0 Swap Files in 235 
> milliseconds (Stop-the-world time = 6 milliseconds), max Transaction ID 24370
> 2019-03-22 14:51:28,065 ERROR [main] o.a.nifi.controller.StandardFlowService 
> Failed to load flow from cluster due to: 
> org.apache.nifi.cluster.ConnectionExcepti
> on: Failed to connect node to cluster due to: 
> java.lang.ArrayIndexOutOfBoundsException: -1
> org.apache.nifi.cluster.ConnectionException: Failed to connect node to 
> cluster due to: java.lang.ArrayIndexOutOfBoundsException: -1
> at 
> org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1009)
> at 
> org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:539)
> at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:939)
> at org.apache.nifi.NiFi.<init>(NiFi.java:157)
> at org.apache.nifi.NiFi.<init>(NiFi.java:71)
> at org.apache.nifi.NiFi.main(NiFi.java:296)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at 
> org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner.getPartition(CorrelationAttributePartitioner.java:44)
> at 
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.getPartition(SocketLoadBalancedFlowFileQueue.java:611)
> at 
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.putAndGetPartition(SocketLoadBalancedFlowFileQueue.java:749)
> at 
> org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.put(SocketLoadBalancedFlowFileQueue.java:739)
> at 
> org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.loadFlowFiles(WriteAheadFlowFileRepository.java:587)
> at 
> org.apache.nifi.controller.FlowController.initializeFlow(FlowController.java:818)
> at 
> org.apache.nifi.controller.StandardFlowService.initializeController(StandardFlowService.java:1019)
> at 
> org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:991)
> ... 5 common frames omitted
>
> Any idea?
> 
> From: Jean-Sebastien Vachon
> Sent: Friday, March 22, 2019 10:34 AM
> To: Jean-Sebastien Vachon; users@nifi.apache.org
> Subject: Re: Problem with load balancing option
>
> Hi,
>
> I stopped each node one by one and the queue is now empty. Not sure if this 
> is a bug or intended but it does look strange from a user point of view
>
> Thanks
> 
> From: Jean-Sebastien Vachon 
> Sent: Friday, March 22, 2019 10:28 AM
> To: users@nifi.apache.org
> Subject: Problem with load balancing option
>
> Hi all,
>
> I've configured one of my connections to use the "partition by attribute" load
> balancing option.
> It was not working as expected and after a few tests I realized I was missing 
> some dependencies on the cluster nodes so I stopped everything (not related 
> to the load balancing or Nifi at all)
>
> Now, I stopped everything before fixing my dependency issues and the UI
> shows 1906 items in the queue for that connection but I can't list them or 
> empty the queue.
> Nifi tells me that there are no flow files in the queue when I try to list 
> them and that 0 flowfiles out of 1906 were removed from the queue.
>
> I tried connecting the destination to some other process like a LogMessage 
> processor but nothing is happening. The 1906 items are stuck and I cannot 
> delete the connection because it's not empty.
>
> Any recommendations to fix this?
>
> thanks
>


Re: How can I ExtractGrok from end-of-file?

2019-03-19 Thread Koji Kawamura
Hello Eric,

Have you found any solution for this?
If your trailers (footer?) start with a certain byte sequence, then
SplitContent may be helpful to split the content into Header+Payload
and the Trailers.
If that works, then the subsequent flow can do something creative,
probably using RouteOnAttribute, ExtractGrok, MergeContent (with the
defragment merge strategy), etc.
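
For illustration, a minimal SplitContent setup could look like the following
(the "TRAILER" marker is just a made-up example for whatever byte sequence
your trailer section starts with; please check the exact property names
against your NiFi version):

# SplitContent
Byte Sequence Format=Text
Byte Sequence=TRAILER
Keep Byte Sequence=true
Byte Sequence Location=Leading

The first split would then be Header+Payload and the second split the
Trailers, which can be parsed separately and merged back afterwards.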

Thanks,
Koji

On Fri, Mar 15, 2019 at 11:34 PM Eric Chaves  wrote:
>
> Hi folks,
>
> I'm learning how to use grok with nifi, starting with the ExtractGrok
> processor, and I'd like to use it to extract data from file headers and
> trailers. However, since the ExtractGrok processor only applies the grok
> expression to the defined buffer size (and each of my files differs in size) I
> can't evaluate trailers on every file.
>
> Any ideas on how could I apply the grok expression from the end of file 
> instead of from the beginning, or any alternative processor?
>
> Cheers,
>


Re: Convert Avro to ORC or JSON processor - retaining the data type

2019-03-10 Thread Koji Kawamura
Hi Ravi,

How about storing those as strings, and casting them into a numeric
data type (int/bigint) when you query them?
https://stackoverflow.com/questions/28867438/hive-converting-a-string-to-bigint
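
For example (using the "cpyKey" column mentioned below; the table name and
the literal are placeholders), the cast can be done on the fly in the query:

SELECT *
FROM some_table
WHERE CAST(cpyKey AS BIGINT) > 100;

That way the comparison runs on bigint values even though the column is
stored as a string.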

Thanks,
Koji

On Sat, Mar 9, 2019 at 6:10 AM Ravi Papisetti (rpapiset)
 wrote:
>
> Thanks Koji for the response. Our users want to run hiveql queries with some 
> comparators and cannot work with string type for numeric data type.
>
> Any other options?
>
> Thanks,
> Ravi Papisetti
>
> On 07/03/19, 7:14 PM, "Koji Kawamura"  wrote:
>
> Hi Ravi,
>
> I looked at following links, Hive does support some logical types like
> timestamp-millis, but not sure if decimal is supported.
> https://issues.apache.org/jira/browse/HIVE-8131
> 
> https://cwiki.apache.org/confluence/display/Hive/AvroSerDe#AvroSerDe-AvrotoHivetypeconversion
>
> If treating the number as String works in your use-case, then I'd
> recommend disabling "Use Avro Logical Types" at ExecuteSQL.
>
> Thanks,
> Koji
>
> On Fri, Mar 8, 2019 at 4:48 AM Ravi Papisetti (rpapiset)
>  wrote:
> >
> > Hi,
> >
> >
> >
> > Nifi version 1.7
> >
> >
> >
> > We have a dataflow that would get data from Oracle database and load 
> into hive tables.
> >
> >
> >
> > Data flow is something like below:
> >
> > GenerateTableFetch -> ExecuteSQL > AvrotoJson/ORC (we tried both) > 
> PutHDFS > ListHDFS > ReplaceText (to build the load data query from the file) >
> PutHiveQL.
> >
> >
> >
> > Data at source (ex: column "cpyKey" NUMBER)  in Number/INT format is 
> being written as
> >
> > 
> {"type":"record","name":"NiFi_ExecuteSQL_Record","namespace":"any.data","fields":[{"name":"cpyKey","type":["null",{"type":"bytes","logicalType":"decimal","precision":10,"scale":0}]}
> >
> >
> >
> When this is inserted into the hive table, whether data is loaded from the ORC
> (ConvertAvroToORC) file or the JSON (ConvertAvroToJSON) file, querying data from
> hive throws a parsing exception with incompatible data types.
> >
> >
> >
> > Error: java.io.IOException: java.lang.RuntimeException: ORC split 
> generation failed with exception: java.lang.IllegalArgumentException: ORC 
> does not support type conversion from file type binary (1) to reader type 
> bigint (1) (state=,code=0)
> >
> >
> >
> > Appreciate any help on this.
> >
> >
> >
> > Thanks,
> >
> > Ravi Papisetti
>
>


Re: QueryRecord and NULLs

2019-03-07 Thread Koji Kawamura
Using NULLIF can be a workaround. I was able to populate new columns with null.

SELECT
*
,NULLIF(5, 5) as unit_cerner_alias
,NULLIF(5, 5) as room_cerner_alias
,NULLIF(5, 5) as bed_cerner_alias
FROM FLOWFILE

On Fri, Mar 8, 2019 at 7:57 AM Boris Tyukin  wrote:
>
> I am struggling for an hour now with a very simple thing.
>
> I need to add 3 new fields to a record and set them to NULL but it does not 
> work.
>
> I tried null instead - same thing. I checked Calcite docs and I do not see 
> anything special about NULL. And I know you can do it in SQL.
>
> This works:
>
> SELECT
> *
> ,'' as unit_cerner_alias
> ,'' as room_cerner_alias
> ,'' as bed_cerner_alias
> FROM FLOWFILE
>
> But this does not:
>
> SELECT
> *
> ,NULL as unit_cerner_alias
> ,NULL as room_cerner_alias
> ,NULL as bed_cerner_alias
> FROM FLOWFILE
>
> Then I use LookupRecord processor to populate them or leave with NULL


Re: Convert Avro to ORC or JSON processor - retaining the data type

2019-03-07 Thread Koji Kawamura
Hi Ravi,

I looked at the following links; Hive does support some logical types like
timestamp-millis, but I'm not sure if decimal is supported.
https://issues.apache.org/jira/browse/HIVE-8131
https://cwiki.apache.org/confluence/display/Hive/AvroSerDe#AvroSerDe-AvrotoHivetypeconversion

If treating the number as String works in your use-case, then I'd
recommend disabling "Use Avro Logical Types" at ExecuteSQL.

Thanks,
Koji

On Fri, Mar 8, 2019 at 4:48 AM Ravi Papisetti (rpapiset)
 wrote:
>
> Hi,
>
>
>
> Nifi version 1.7
>
>
>
> We have a dataflow that would get data from Oracle database and load into 
> hive tables.
>
>
>
> Data flow is something like below:
>
> GenerateTableFetch -> ExecuteSQL > AvrotoJson/ORC (we tried both) > PutHDFS >
> ListHDFS > ReplaceText (to build the load data query from the file) > PutHiveQL.
>
>
>
> Data at source (ex: column "cpyKey" NUMBER)  in Number/INT format is being 
> written as
>
> {"type":"record","name":"NiFi_ExecuteSQL_Record","namespace":"any.data","fields":[{"name":"cpyKey","type":["null",{"type":"bytes","logicalType":"decimal","precision":10,"scale":0}]}
>
>
>
> When this is inserted into the hive table, whether data is loaded from the ORC
> (ConvertAvroToORC) file or the JSON (ConvertAvroToJSON) file, querying data from
> hive throws a parsing exception with incompatible data types.
>
>
>
> Error: java.io.IOException: java.lang.RuntimeException: ORC split generation 
> failed with exception: java.lang.IllegalArgumentException: ORC does not 
> support type conversion from file type binary (1) to reader type bigint (1) 
> (state=,code=0)
>
>
>
> Appreciate any help on this.
>
>
>
> Thanks,
>
> Ravi Papisetti


Re: Errors when attempting to use timestamp-millis fields with QueryRecord

2019-03-07 Thread Koji Kawamura
Hello,

I believe this is a known issue. Unfortunately, querying against a
timestamp column is not supported.
https://issues.apache.org/jira/browse/NIFI-5888

I'm working on fixing this at the Calcite project, the SQL execution
engine underneath QueryRecord.
https://issues.apache.org/jira/browse/CALCITE-1703

Thanks,
Koji

On Thu, Mar 7, 2019 at 11:11 PM Edward George  wrote:
>
> I have some input avro with some fields using the timestamp-millis 
> logicalType. I've been attempting to use them with QueryRecord to filter, or 
> otherwise operate on the fields, using timestamp operations and I get errors 
> produced.
>
> For instance the following SQL queries:
>
> SELECT * FROM FLOWFILE WHERE dt > TIMESTAMP '1984-01-01 00:00:00'
>
> SELECT * FROM FLOWFILE WHERE CAST(dt AS TIMESTAMP) > TIMESTAMP '1984-01-01 
> 00:00:00'
>
> SELECT YEAR(dt) FROM FLOWFILE
>
> All fail with the following error:
>
>  java.lang.RuntimeException: Cannot convert 2019-02-19 01:01:01.0 to long
>
> Where the date `2019-02-19 01:01:01` is from the first row in the flowfile.
>
> Is this a bug with the implementation of QueryRecord or is there something 
> wrong with my queries / expectations here?
>
> Tested on nifi v1.9.0 using the official docker image.
>
> If I instead try the following SQL:
>
> SELECT * FROM FLOWFILE WHERE dt > 1
>
> I can see that the timestamp-millis column is represented as a 
> java.sql.Timestamp object:
>
>  org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '>' to 
> arguments of type ' > '.
>
> This was reproduced using this test avro file:
>
> $ avro-utils getmeta nifi-data/out2.avro
> avro.codec  deflate
> avro.schema {"type": "record", "name": "x", "fields": [{"name": "dt", 
> "type": {"logicalType": "timestamp-millis", "type": "long"}}, {"name": "v", 
> "type": "long"}], "__fastavro_parsed": true}
>
> $ avro-utils tojson nifi-data/out2.avro
> {"dt":1550538061000,"v":1}
> {"dt":-220894171,"v":2}
> {"dt":323687349000,"v":3}
>
> $ base64 nifi-data/out2.avro
> T2JqAQQUYXZyby5jb2RlYw5kZWZsYXRlFmF2cm8uc2NoZW1h5AJ7InR5cGUiOiAicmVjb3JkIiwg
> Im5hbWUiOiAieCIsICJmaWVsZHMiOiBbeyJuYW1lIjogImR0IiwgInR5cGUiOiB7ImxvZ2ljYWxU
> eXBlIjogInRpbWVzdGFtcC1taWxsaXMiLCAidHlwZSI6ICJsb25nIn19LCB7Im5hbWUiOiAidiIs
> ICJ0eXBlIjogImxvbmcifV0sICJfX2Zhc3RhdnJvX3BhcnNlZCI6IHRydWV9AHN0YWNraHV0bnN0
> YWNrMTUGPAEWAOn/kNOptKBaAt/q/PLJgAEEkOyn1OsSBpxyDHN0YWNraHV0bnN0YWNrMTU=
>
> And more context for the stacktrace for the error above is:
> nifi_1_63870dd343fd   | java.lang.RuntimeException: Cannot convert 
> 2019-02-19 01:01:01.0 to long
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.runtime.SqlFunctions.cannotConvert(SqlFunctions.java:1460)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1616)
> nifi_1_63870dd343fd   | at Baz$1$1.moveNext(Unknown Source)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.linq4j.Linq4j$EnumeratorIterator.<init>(Linq4j.java:676)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.linq4j.Linq4j.enumeratorIterator(Linq4j.java:96)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.linq4j.AbstractEnumerable.iterator(AbstractEnumerable.java:33)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.avatica.MetaImpl.createCursor(MetaImpl.java:90)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.avatica.AvaticaResultSet.execute(AvaticaResultSet.java:184)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.jdbc.CalciteResultSet.execute(CalciteResultSet.java:64)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.jdbc.CalciteResultSet.execute(CalciteResultSet.java:43)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:573)
> nifi_1_63870dd343fd   | at 
> org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
> nifi_1_63870dd343fd   | at 
> org.apache.nifi.processors.standard.QueryRecord.query(QueryRecord.java:465)
> nifi_1_63870dd343fd   | at 
> org.apache.nifi.processors.standard.QueryRecord.onTrigger(QueryRecord.java:320)


Re: Different NiFi Node sizes within same cluster

2019-03-07 Thread Koji Kawamura
> The last thing I'm looking to understand is what Byran B brought up, do load 
> balanced connections take into consideration the load of each node?

No, load-balanced connections don't currently use the load of each
node to calculate the destination.

As for future improvement ideas:
We can implement another FlowFilePartitioner that uses QueuePartition.size().
Or add a nifi.property to specify the number of partitions each node
has. This may be helpful if the cluster consists of nodes having
different specs.

The rest is a note for some important lines of code to understand how
load balancing and partitioning works.

None of the FlowFilePartitioner implementations takes into
consideration the load of each node.
- PARTITION_BY_ATTRIBUTE: Calculates a hash from the FlowFile attribute
value, then calculates the target partition using consistent hashing. If
the attribute value doesn't distribute well, some nodes get a higher
number of FlowFiles.
- ROUND_ROBIN: We could implement another round robin strategy that
uses QueuePartition.size() to pick a destination with fewer queued
FlowFiles.
- SINGLE_NODE: Always uses partitions[0], meaning the first node
in node identifier order.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java

For example, let's use 5 node cluster.

Partitions are created using sorted node identifiers.
The number of partitions = the number of nodes.
Each node will have 5 partitions: 1 LocalPartition and 4 RemoteQueuePartitions.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java#L140,L162

Each RemoteQueuePartition registers itself to the clientRegistry.
In this case, there are 4 clients for this loop.
Each node executes this task periodically.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java#L50,L76

Interestingly, the task is created N times. N is configured by
nifi.cluster.load.balance.max.thread.count, 8 by default.
So, 8 threads loop through 4 clients?
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java#L652

Thanks,
Koji

On Thu, Mar 7, 2019 at 9:19 PM Chad Woodhead  wrote:
>
> Thanks all for the input. So from what I'm gathering, storage differences of 
> around 5 GB (125 GB vs 130 GB) should not cause any problems/load impacts. 
> Larger storage differences could have load impacts. Differences in CPU and 
> RAM could definitely have load impacts. Luckily my older nodes have the same 
> CPU and RAM counts/specs as my new nodes.
>
> The last thing I'm looking to understand is what Byran B brought up, do load 
> balanced connections take into consideration the load of each node?
>
> Thanks,
> Chad
>
> On Wed, Mar 6, 2019 at 4:50 PM Bryan Bende  wrote:
>>
>> Yea ListenTCP also doesn't handle the back-pressure with the client
>> the way it really should.
>>
>> Regarding the load balancing, I believe traditional s2s does factor in
>> the load of each node when deciding how to load balance, but I don't
>> know if this is part of load balanced connections or not. Mark P would
>> know for sure.
>>
>> On Wed, Mar 6, 2019 at 4:47 PM James Srinivasan
>>  wrote:
>> >
>> > Yup, but because of the unfortunate way the source (outside NiFi)
>> > works, it doesn't buffer for long when the connection doesn't pull or
>> > drops. It behaves far more like a 5 Mbps UDP stream really :-(
>> >
>> > On Wed, 6 Mar 2019 at 21:44, Bryan Bende  wrote:
>> > >
>> > > James, just curious, what was your source processor in this case? 
>> > > ListenTCP?
>> > >
>> > > On Wed, Mar 6, 2019 at 4:26 PM Jon Logan  wrote:
>> > > >
>> > > > What really would resolve some of these issues is backpressure on CPU 
>> > > > -- ie. let Nifi throttle itself down to not choke the machine until it 
>> > > > dies if constrained on CPU. Easier said than done unfortunately.
>> > > >
>> > > > On Wed, Mar 6, 2019 at 4:23 PM James Srinivasan 
>> > > >  wrote:
>> > > >>
>> > > >> In our case, backpressure applied all the way up to the TCP network
>> > > >> source which meant we lost data. AIUI, current load balancing is round
>> > > >> robin (and two other options prob not relevant). Would actual load
>> > > >> balancing (e.g. send to node with lowest OS load, or number of active
>> > > >> threads) be a reasonable request?
>> > > >>
>> > > >> On Wed, 6 Mar 2019 at 20:51, Joe Witt  wrote:
>> > > >> >
>> > > >> > This is generally workable (heterogenous node capabilities) in NiFi 
>> > > >> > clustering.  But you do want to leverage back-pressure and load 
>> > > >> 

Re: jolt transform spec ?

2019-03-06 Thread Koji Kawamura
Hello,

I haven't tested myself, but using EvaluateJsonPath and ReplaceText
with 'unescapeJson' EL function may be an alternative approach instead
of Jolt.
https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html#unescapejson

The idea is to use EvaluateJsonPath to extract the MSG part into a FlowFile
attribute, e.g. 'message'.
Then use ReplaceText with 'Replacement Value' set to
"${message:unescapeJson()}" to update the FlowFile content with the normal
JSON representation.
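
As a rough sketch (the 'message' attribute name is arbitrary; the property
names are taken from the standard EvaluateJsonPath/ReplaceText processors):

# EvaluateJsonPath
Destination=flowfile-attribute
(+dynamic) message=$.MSG

# ReplaceText
Replacement Strategy=Always Replace
Replacement Value=${message:unescapeJson()}

The resulting FlowFile content should then be the embedded JSON object itself.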

Thanks,
Koji

On Thu, Mar 7, 2019 at 5:18 AM l vic  wrote:
>
> I have json message that contains another json message in textual form:
> {
> "one": "one".
> "two":2,
> "MSG": "{\"src\":\"my source\",\"priority\":\"0\"}"
> }
>
> What would be transform spec to get "contained" message in json ?
> {
> "src": "my source",
> "priority": "0"
> }
>
> I've tried the following spec:
> [
>   {
> "operation": "shift",
> "spec": {
>   "MSG": {
> "*": "&"
>   }
> }
>   }
> ]
> but ended up with just text message:
> { "MSG": "{\"src\":\"my source\",\"priority\":\"0\"}"}
> How should i change it?
> Thank you
>


Re: Asymmetric push/pull throughput with S2S, possibly related to openConnectionForReceive compression?

2019-02-15 Thread Koji Kawamura
Hi Pat,

Thanks for sharing your insights.
I will try benchmarking before and after the "gzip.setExcludedPath()" change
that Mark has suggested, to see if it helps improve S2S HTTP throughput.

Koji

On Fri, Feb 15, 2019 at 9:31 AM Pat White  wrote:
>
> Hi Andy,
>
> My requirement is to use https with minimum tls v1.2, https being an approved 
> protocol.
> I haven't looked at websockets though, i need to do that, thank you for the 
> suggestion.
>
> patw
>
>
>
> On Thu, Feb 14, 2019 at 12:24 PM Andy LoPresto  wrote:
>>
>> Pat,
>>
>> Just to clarify, your connection must be HTTPS or it just must be secure? 
>> What about Websockets over TLS (wss://)?
>>
>> Andy LoPresto
>> alopre...@apache.org
>> alopresto.apa...@gmail.com
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> On Feb 14, 2019, at 9:56 AM, Pat White  wrote:
>>
>> Thanks very much folks, definitely appreciate the feedback.
>>
>> Right, required to use tls/https connections for s2s, so raw is not an 
>> option for me.
>>
>> Will look further at JettyServer and setIncludedMethods, thanks again.
>>
>> patw
>>
>> On Thu, Feb 14, 2019 at 11:07 AM Mark Payne  wrote:
>>>
>>> Pat,
>>>
>>> It appears to be hard-coded, in JettyServer (full path is
>>> nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
>>>  )
>>>
>>> Line 294 calls the gzip method, which looks like:
>>>
>>> private Handler gzip(final Handler handler) {
>>> final GzipHandler gzip = new GzipHandler();
>>> gzip.setIncludedMethods("GET", "POST", "PUT", "DELETE");
>>> gzip.setHandler(handler);
>>> return gzip;
>>> }
>>>
>>>
>>> We probably would want to add a "gzip.setExcludedPath()" call to exclude 
>>> anything that goes to the site-to-site path.
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>> On Feb 14, 2019, at 11:46 AM, Joe Witt  wrote:
>>>
>>> ...interesting.  I dont have an answer but will initiate some research.  
>>> Hopefully someone else replies if they know off-hand.
>>>
>>> Thanks
>>>
>>> On Thu, Feb 14, 2019 at 11:43 AM Pat White  
>>> wrote:

 Hi Folks,

 Could someone point me at the correct way to modify Nifi's embedded jetty 
 configuration settings? Specifically i'd like to turn off jetty's 
 automatic compression of payload.

 Reason for asking, think i've found my performance issue, uncompressed 
 input to jetty is getting automatically compressed, by jetty, causing very 
 small and fragmented packets to be sent, which pegs the cpu receive 
 thread, recombining and uncompressing the incoming packets. I'd like to 
 verify by turning off auto compress.

 This is what i'm seeing, app layer compressed data (nifi output port 
 compression=on) is accepted by jetty as-is and sent over as large, 
 complete tcp packets, which the receiver is able to keep up with (do not 
 see rcv net buffers fill up). With app layer uncompressed data (nifi 
 output port compression=off), jetty automatically wants to compress and 
 sends payload as many small fragmented packets, this causes high cpu load 
 on the receiver and fills up the net buffers, causing a great deal of 
 throttling and backoff to the sender. This is consistent in wireshark 
 traces, good case shows no throttling, bad case shows constant throttling 
 with backoff.

 I've checked the User and Admin guides, as well as looking at JettyServer 
 and web/webdefault.xml for such controls but i'm clearly missing 
 something, changes have no effect on the server behavior. Appreciate any 
 help on how to set jetty configs properly, thank you.

 patw




 On Tue, Feb 5, 2019 at 9:07 AM Pat White  wrote:
>
> Hi Mark, thank you very much for the feedback, and the JettyServer 
> reference, will take a look at that code.
>
> I'll update the thread if i get any more info. Very strange issue, and 
> hard to see what's going on in the stream due to https encryption.
> Our usecase is fairly basic, get/put flows using https over s2s, i'd 
> expect folks would have hit this if it is indeed an issue, so i tend to 
> suspect my install or config, however the behavior is very consistent, 
> across multiple clean installs, with small files as well as larger files 
> (10s of MB vs GB sized files).
>
> Thanks again.
>
> patw
>
>
> On Mon, Feb 4, 2019 at 5:18 PM Mark Payne  wrote:
>>
>> Hey Pat,
>>
>> I saw this thread but have not yet had a chance to look into it. So 
>> thanks for following up!
>>
>> The embedded server is handled in the JettyServer class [1]. I can 
>> imagine that it may automatically turn on
>> GZIP. When pushing data, though, the client would be the one supplying 
>> the stream of data, so the client is not
>> GZIP'ing the data. But when requesting from Jetty, it may well be that 

Re: problem with merging of failed response

2019-01-14 Thread Koji Kawamura
Hello,

If you're using the InvokeHTTP processor to call the REST endpoint, enabling
"Always Output Response" might be helpful: it generates a response FlowFile even
if the resulting HTTP status code is not successful.
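
A minimal sketch of that part:

# InvokeHTTP
Always Output Response=true

The response FlowFile still carries the status code in the
'invokehttp.status.code' attribute, so the merge logic (or a RouteOnAttribute)
can detect the 404 case instead of losing the FlowFile entirely.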

Thanks,
Koji

On Fri, Jan 11, 2019 at 2:31 PM l vic  wrote:
>
> I have to merge results of "original", REST call and sql query  responses 
> into one flowfile/json
> The problem happens if the REST call or sql query results in an empty resultset. For
> example, suppose I have 3 incoming connections for Merge:
> original->flowfile1/attribute1, REST->flowfile2/attribute2, SQL
> query->flowfile3/attribute3
> If the REST call fails (error 404 - record not found), or the sql call returns an empty
> resultset, the flowfiles are never merged (the original gets stuck in the input queue).
> Can I find some merge strategy to solve this problem such that only the original
> json would be let through? For example, is there something I can do so that a
> REST error would result in an empty output flowfile?
> Thank you...


Re: Wait/Notify inconsistent behavior

2019-01-08 Thread Koji Kawamura
Hi Louis,

Glad to hear that works.

There are two approaches I can think of, to accumulate and combine
multiple service results.
The goal is creating a single FlowFile containing all service call
results, right?

1. Use DistributedCache(DC) as a shared heap space
- Each part of flow calls external services and store its result into DC
- Use cache keys in something like "OriginalRequestID_serviceType"
format. For example, "OriginalIncomingFlowFileUUID_serviceA"
- Use notify as well to let Wait know progress. Use serviceType as counter name
- After Wait finishes, use LookupAttribute or LookupRecord to enrich
the original FlowFile by fetching the accumulated results from DC
using DistributedMapCacheLookupService

2. Use MergeContent processor
- Route the original FlowFile into multiple sub-flows, meaning cloning
the same FlowFile
- At each sub flow, call external service, update result FlowFile's
attribute so that MergeContent can wait and merge them:
-- fragment.identifier: some common id
-- fragment.count: the total number of sub-flows
-- fragment.index: to control merge order
- Route each resulting FlowFile into the same MergeContent processor,
use 'Defragment' Merge Strategy to merge FlowFiles based on
'fragment.*' attributes
-- Use Header, Footer and Demarcator intelligently to combine
contents, e.g. to form a valid JSON... etc
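
A rough configuration sketch for this second approach (the 'original.uuid'
attribute is a placeholder for any id shared by all clones; with two
sub-flows the indexes would be 0 and 1):

# UpdateAttribute (one per sub-flow, before MergeContent)
fragment.identifier=${original.uuid}
fragment.count=2
fragment.index=0

# MergeContent
Merge Strategy=Defragment

MergeContent then holds the fragments until all FlowFiles sharing the same
fragment.identifier have arrived, and merges them in fragment.index order.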

I'm not sure how you want to loop, but here is another example flow
doing a traditional 'for' loop in NiFi.
https://gist.github.com/ijokarumawak/01c4fd2d9291d3e74ec424a581659ca8

Hope these can be helpful.

Thanks,
Koji

On Wed, Jan 9, 2019 at 1:04 AM Luis Carmona  wrote:
>
>
> Hi Koji,
>
> tried with some manual tests and it seems to be working now, it doesn't have the
> problem I had before. Today I will try it with a massive flow and that will
> be the final check.
>
> Thanks for your help. Do you know where I can get a sample about processing
> in a loop, I mean, send things to a server, wait for the answer on that, send to
> a second server accumulating the answers of both, and all this in a finite
> loop determined by the answers, gathering all the answers in one final Json.
>
>
> Thank you very much.
>
> LC
>
>
>
> - Original Message -
> From: "Koji Kawamura" 
> To: "users" 
> Sent: Monday, January 7, 2019 22:22:12
> Subject: Re: Wait/Notify inconsistent behavior
>
> Hi Luis,
>
> Look forward to know how it goes with a consolidated single Notify instance.
Another benefit of doing so is that by increasing 'Signal Buffer Count',
the number of updates against the Distributed Cache Service can be
decreased in your case, because multiple FlowFiles share the same
> signal id. Notify processor can merge counter deltas locally then
> update the cache entry only once.
>
> Thanks,
> Koji
>
> On Tue, Jan 8, 2019 at 3:50 AM Luis Carmona  wrote:
> >
> > Hi Koji,
> >
> > thanks for taking the time to answer my question
> >
> > In the Wait processor:
> > - Signal CounterName is empty (default).
> > - Target Signal Count is set to 2
> >
> > About the Notify processors, I used two of them because previously
> > I had set ${filename} differently in the preceding
> > UpdateAttribute(s).
> >
> > I attached the image of both processors, and the template as well.
> >
> >
> > The whole point of my layout is to send things to a server, wait for the answer
> > on that, send to a second server accumulating the answers of both, and all
> > this in a finite loop determined by the answers, gathering all the answers
> > in one final Json.
> >
> > By now Im stuck in the wait/notify issue, thanks for the sample I'll look 
> > into it. Then I will see how to get the loop.
> >
> > Thanks a lot,
> >
> > Regards,
> >
> > LC
> >
> >
> > - Original Message -
> > From: "Koji Kawamura" 
> > To: "users" 
> > Sent: Sunday, January 6, 2019 23:42:56
> > Subject: Re: Wait/Notify inconsistent behavior
> >
> > The reason tu put two Notify processors is that I'm using different
> >
> > Hi Luis,
> >
> > Just a quick question, how are the "Signal Counter Name" and "Target
> > Signal Count" properties for the Wait processor configured?
> > If you'd like to wait the two sub-flow branches to complete, then:
> > "Signal Counter Name" should be blank, meaning check total count for
> > all counter names
> > "Target Signal Count" should be 2.
> >
> > If those are configured like that, then would you be able to share
> > your flow as a template for further investigation?
> >
> &

Re: Wait/Notify inconsistent behavior

2019-01-07 Thread Koji Kawamura
Hi Luis,

Look forward to know how it goes with a consolidated single Notify instance.
Another benefit of doing so is that by increasing 'Signal Buffer Count',
the number of updates against the Distributed Cache Service can be
decreased in your case, because multiple FlowFiles share the same
signal id. Notify processor can merge counter deltas locally then
update the cache entry only once.

Thanks,
Koji

On Tue, Jan 8, 2019 at 3:50 AM Luis Carmona  wrote:
>
> Hi Koji,
>
> thanks for taking the time to answer my question
>
> In the Wait processor:
> - Signal CounterName is empty (default).
> - Target Signal Count is set to 2
>
> About the Notify processors, I used two of them because previously
> I had set ${filename} differently in the preceding UpdateAttribute(s).
>
> I attached the image of both processors, and the template as well.
>
>
> The whole point of my layout is to send things to a server, wait for the answer
> on that, send to a second server accumulating the answers of both, and all
> this in a finite loop determined by the answers, gathering all the answers in
> one final Json.
>
> By now Im stuck in the wait/notify issue, thanks for the sample I'll look 
> into it. Then I will see how to get the loop.
>
> Thanks a lot,
>
> Regards,
>
> LC
>
>
> - Original Message -
> From: "Koji Kawamura" 
> To: "users" 
> Sent: Sunday, January 6, 2019 23:42:56
> Subject: Re: Wait/Notify inconsistent behavior
>
> The reason tu put two Notify processors is that I'm using different
>
> Hi Luis,
>
> Just a quick question, how are the "Signal Counter Name" and "Target
> Signal Count" properties for the Wait processor configured?
> If you'd like to wait the two sub-flow branches to complete, then:
> "Signal Counter Name" should be blank, meaning check total count for
> all counter names
> "Target Signal Count" should be 2.
>
> If those are configured like that, then would you be able to share
> your flow as a template for further investigation?
>
> One more thing, although Notify processor cares about atomicity, due
> to the underlying distributed cache mechanism, concurrent writes to
> the same cache identifier can overwrite existing signal, meaning one
> of the two notifications can be lost.
> To avoid this, using the same Notify instance at 3a and 3b in your
> flow is highly recommended.
> Here is an example flow to do that:
> https://gist.github.com/ijokarumawak/6da4bd914236e941071cad103e1186dd
>
> Thanks,
> Koji
>
> On Sat, Jan 5, 2019 at 11:28 AM Joe Witt  wrote:
> >
> > thanks for letting us know.  the lists can be a bit awkward from a user 
> > experience pov.  no worries
> >
> > On Fri, Jan 4, 2019, 9:26 PM Luis Carmona  >>
> >> I'm sorry,
> >>
> >> got messages from nifi-users saying "UNCHECKED", and reading about 
> >> understood the message did not get trough.
> >>
> >> Thanks for letting me know.
> >>
> >> LC
> >>
> >> 
> >> De: "Joe Witt" 
> >> Para: "users" 
> >> Enviados: Viernes, 4 de Enero 2019 23:23:02
> >> Asunto: Re: Wait/Notify inconsistent behavior
> >>
> >> Please avoid sending more copies of the question.  Hopefully someone 
> >> familiar with the processors in question will be available in time.
> >>
> >>
> >> Thanks
> >>
> >>
> >> On Fri, Jan 4, 2019 at 9:14 PM Luis Carmona  
> >> wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> Im having a strange behavior with Wait / Notify mechanism. Attached is
> >>> the image of the flow.
> >>> Basically I'm trying to insert in Elastic search two record,
> >>> simultaneously, and if both went ok, then insert a record on a bpm 
> >>> service.
> >>>
> >>> For that (in the image):
> >>>
> >>> - Step 1: Set the attribute fragment.identifier to 1
> >>> - Step 2: Send the flow to Wait state, and,
> >>>   for 2a I set the attribute filename to 'caso' (without the
> >>> quotes) just before the POST to ElasticSearch
> >>>   for 2b I set the attribute filename to 'doc'  (without the
> >>> quotes) just before the other POST to ElasticSearch
> >>> - Step 3: On 3a, once the insert is finished, I'm expecting the notify
> >>> sends the signal to the wait state, and same thing for 3b.
> >>> - Step 4: HERE is my problem:
> >>>   Sometimes the WAIT has count.caso = 1, and count.doc=1 so
> >>> everything goes well on the RouteAttribute after step 5.
> >>>   Some other, it just receives one of the Nitifications, either
> >>> the 'doc' one, or the 'caso' one, so as the other doesn't come,
> >>>   the firts to arrive gets Queued. I 've checked and the two
> >>> Elastic insertions work almost inmediatly, so thatś not the problem.
> >>> - Setp 5: Is the expected path unless there was a real fail, but it is
> >>> not what is happening.
> >>>
> >>> Please any help, or tip would be very preciated.
> >>>
> >>> Regards,
> >>>
> >>> LC
> >>>
> >>


Re: Wait/Notify inconsistent behavior

2019-01-06 Thread Koji Kawamura
Hi Luis,

Just a quick question, how are the "Signal Counter Name" and "Target
Signal Count" properties for the Wait processor configured?
If you'd like to wait for the two sub-flow branches to complete, then:
"Signal Counter Name" should be blank, meaning check total count for
all counter names
"Target Signal Count" should be 2.

If those are configured like that, then would you be able to share
your flow as a template for further investigation?

One more thing, although Notify processor cares about atomicity, due
to the underlying distributed cache mechanism, concurrent writes to
the same cache identifier can overwrite existing signal, meaning one
of the two notifications can be lost.
To avoid this, using the same Notify instance at 3a and 3b in your
flow is highly recommended.
Here is an example flow to do that:
https://gist.github.com/ijokarumawak/6da4bd914236e941071cad103e1186dd

Thanks,
Koji

On Sat, Jan 5, 2019 at 11:28 AM Joe Witt  wrote:
>
> thanks for letting us know.  the lists can be a bit awkward from a user 
> experience pov.  no worries
>
> On Fri, Jan 4, 2019, 9:26 PM Luis Carmona >
>> I'm sorry,
>>
>> got messages from nifi-users saying "UNCHECKED", and reading about 
>> understood the message did not get trough.
>>
>> Thanks for letting me know.
>>
>> LC
>>
>> 
>> De: "Joe Witt" 
>> Para: "users" 
>> Enviados: Viernes, 4 de Enero 2019 23:23:02
>> Asunto: Re: Wait/Notify inconsistent behavior
>>
>> Please avoid sending more copies of the question.  Hopefully someone 
>> familiar with the processors in question will be available in time.
>>
>>
>> Thanks
>>
>>
>> On Fri, Jan 4, 2019 at 9:14 PM Luis Carmona  wrote:
>>>
>>> Hi everyone,
>>>
>>> Im having a strange behavior with Wait / Notify mechanism. Attached is
>>> the image of the flow.
>>> Basically I'm trying to insert in Elastic search two record,
>>> simultaneously, and if both went ok, then insert a record on a bpm service.
>>>
>>> For that (in the image):
>>>
>>> - Step 1: Set the attribute fragment.identifier to 1
>>> - Step 2: Send the flow to Wait state, and,
>>>   for 2a I set the attribute filename to 'caso' (without the
>>> quotes) just before the POST to ElasticSearch
>>>   for 2b I set the attribute filename to 'doc'  (without the
>>> quotes) just before the other POST to ElasticSearch
>>> - Step 3: On 3a, once the insert is finished, I'm expecting the notify
>>> sends the signal to the wait state, and same thing for 3b.
>>> - Step 4: HERE is my problem:
>>>   Sometimes the WAIT has count.caso = 1, and count.doc=1 so
>>> everything goes well on the RouteAttribute after step 5.
>>>   Some other, it just receives one of the Nitifications, either
>>>   Other times, it just receives one of the Notifications, either
>>> the 'doc' one, or the 'caso' one, so as the other doesn't come,
>>>   the first to arrive gets Queued. I've checked and the two
>>> Elastic insertions work almost immediately, so that's not the problem.
>>> - Step 5: Is the expected path unless there was a real fail, but it is
>>>
>>> Please any help, or tip would be very preciated.
>>>
>>> Regards,
>>>
>>> LC
>>>
>>


Re: SimpleCsvFileLookupService with LookupAttribute

2018-12-17 Thread Koji Kawamura
Hi Ryan,

With following settings:

# LookupAttribute
(+dynamic) lookedUp=${someAttribute}

# SimpleCsvFileLookupService
CSV File=data.csv
Lookup Key Column=id
Lookup Value Column=value

# data.csv
id,value,desc
one,1,the first number
two,2,the 2nd number

If a FlowFile having a 'someAttribute' attribute of 'two' is processed
by LookupAttribute,
then the FlowFile will have an additional 'lookedUp' attribute of '2'.

Unfortunately, LookupAttribute can only look up a single value at a
time. You can only look up 'value' or 'desc' with the above example,
not both.
In order to look up multiple values, LookupRecord may be helpful. But
it enriches the FlowFile content instead of attributes.

Hope this helps.

Thanks,
Koji
On Tue, Dec 18, 2018 at 3:54 AM Ryan Hendrickson
 wrote:
>
> Hi all,
>I'm trying to use the SimpleCsvFileLookupService with the LookupAttribute 
> processor.  I'm having some trouble understanding what the Controller Service 
> is doing with the Lookup Key/Value attributes and what the LookupAttribute is 
> doing with the Dynamic Files.
>
>Ultimately, I have a CSV file with an alphanumeric ID in it, I want to key 
> off-of that field, and return back the others in the row, all as nifi 
> attributes.
>
> Thanks,
> Ryan


Re: SQL Result to Attributes

2018-11-21 Thread Koji Kawamura
Hi Nick,

I thought there was a discussion about adding a general database
lookup service, but I can't find a LookupService implementation available that can
fetch values from external databases.
If there were such a controller service, you could use the LookupAttribute processor.
(wondering if you're interested in contributing that..)

As a workaround, you may be able to construct a flow like this:

- Pass the input FlowFile into two branches, 'main' and 'lookup'
- At 'lookup' branch, fetch data with ExecuteSQL, then extract the
required data from the result and put it to a cache using
PutDistributedMapCache
- At 'main' branch, use LookupAttribute with
DistributedMapCacheLookupService to lookup the value stored by
'lookup' branch
- 'main' branch may need some retry & timeout mechanism
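
A rough sketch of the cache part (the 'lookup.key' attribute is a placeholder
for whatever id both branches share, e.g. the original FlowFile uuid):

# PutDistributedMapCache ('lookup' branch)
Cache Entry Identifier=${lookup.key}

# LookupAttribute ('main' branch, with DistributedMapCacheLookupService)
(+dynamic) lookedUp=${lookup.key}

The 'main' FlowFile then gets a 'lookedUp' attribute holding whatever content
the 'lookup' branch stored under that key.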

Or if the dataset is relatively small, you can download and save it as
a CSV file periodically, then lookup using LookupAttribute with
SimpleCsvFileLookupService.

Thanks and Happy Thanksgiving!
Koji
On Thu, Nov 22, 2018 at 5:58 AM Nick Carenza
 wrote:
>
> I would like to add a sql query result to attributes but I don't see a way to 
> do this currently. Anyone know of a processor I am missing or have a 
> workaround?
>
> Preferably the result columns would become attributes.
>
> I think I could use ExtractText to move the current flowfile contents
> into an attribute, run the query with ExecuteSQL, convert to json from avro,
> ExtractJsonPath the columns I want, then use ReplaceText to bring the original
> flowfile back... but that seems excessive and it requires putting the larger
> of the 2 contents in an attribute.
>
> Thanks and Happy Thanksgiving Nifi-Users,
> Nick


Re: Syntax to access freeSpace from REST API

2018-11-20 Thread Koji Kawamura
Hi Jim,

The reason for 'contentRepositoryStorageUsage' being an array is that
you can configure multiple content repository paths.
In that case, each content repository can be mapped to a different disk
partition and have its own free space.
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#file-system-content-repository-properties

If you have only one content repo dir, then you can use the following syntax.
$.systemDiagnostics.aggregateSnapshot.contentRepositoryStorageUsage[0].freeSpace

Thanks,
Koji
On Wed, Nov 21, 2018 at 3:47 AM James McMahon  wrote:
>
> Hello. In the json that is returned by a RESTful API call to system 
> diagnostics, I can access the free space for the flowFile repo something like 
> this:
>
> status 
> $.systemDiagnostics.aggregateSnapshot.flowFileRepositoryStorageUsage.freeSpace
>
> But the content repo stats seem to be buried deeper in the structure, and are 
> also enclosed by a [,] that is not found in the flow file repo stats. Using a 
> syntax like that I've shown above does not return the value to me. What is 
> the syntax to use in an UpdateAttribute processor to grab that content repo 
> free space value, so that I can save it to an attribute?
>
> Here is the structure returned by the API call:
>
> {"systemDiagnostics":{"aggregateSnapshot":
>
>   {"totalNonHeap":"338.38 MB",
>
>"totalNonHeapBytes":354816000,
>
>"usedNonHeap":"326.16 MB",
>
>"usedNonHeapBytes":34264,
>
>"freeNonHeap":"12.22 MB",
>
>"freeNonHeapBytes":12815936,
>
>"maxNonHeap":"-1 bytes",
>
>"maxNonHeapBytes":-1,
>
>"totalHeap":"6 GB",
>
>"totalHeapBytes":6442450944,
>
>"usedHeap":"3.17 GB",
>
>"usedHeapBytes":3407422960,
>
>"freeHeap":"2.83 GB",
>
>"freeHeapBytes":3035027984,
>
>"maxHeap":"6 GB",
>
>"maxHeapBytes":6442450944,
>
>"heapUtilization":"53.0%",
>
>"availableProcessors":8,
>
>"processorLoadAverage":0.36,
>
>"totalThreads":99,
>
>"daemonThreads":53,
>
>"flowFileRepositoryStorageUsage":
>
>{"freeSpace":"44.39 GB",
>
> "totalSpace":"79.99 GB",
>
> "usedSpace":"35.59 GB",
>
> "freeSpaceBytes":47666999296,
>
> "totalSpaceBytes":85885063168,
>
> "usedSpaceBytes":38218063872,
>
> "utilization":"44.0%"},
>
> "contentRepositoryStorageUsage":[
>
>   {"identifier":"default","freeSpace":"44.39 GB",
>
>"totalSpace":"79.99 GB","usedSpace":"35.59 GB",
>
>"freeSpaceBytes":47666999296,
>
>"totalSpaceBytes":85885063168,
>
>"usedSpaceBytes":38218063872,
>
>"utilization":"44.0%"}],
>
>   "garbageCollection":[
>
> {"name":"G1 Young Generation",
>
>  "collectionCount":2172338,
>
>  "collectionTime":"132:38:54.540",
>
>  "collectionMillis":477534540},
>
> {"name":"G1 Old Generation","collectionCount":0,
>
>  "collectionTime":"00:00:00.000",
>
>  "collectionMillis":0}],
>
> "statsLastRefreshed":"03:05:01 UTC"}
>
>  }
>
> }
>
>
> Thanks in advance for your help. -Jim


Re: Multiple NiFi clusters with 1 NiFi Rigistry

2018-11-20 Thread Koji Kawamura
I agree with Bryan and Kevin. This is a good feature request.
Filed a JIRA for further discussion.
https://issues.apache.org/jira/browse/NIFIREG-212

In the meantime, as a work-around I'd deploy a reverse proxy (Nginx)
in front of NiFi Registry to only pass mutation requests
(POST/PUT/DELETE) from dev NiFi hosts.
https://serverfault.com/questions/152745/nginx-proxy-by-request-method
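
A very rough sketch of that idea in nginx (hosts, ports and the allowed
address are placeholders):

location / {
    # only the dev NiFi host may send non-GET (mutating) requests
    limit_except GET {
        allow 10.0.0.10;
        deny  all;
    }
    proxy_pass https://nifi-registry-host:18443;
}

Cert and Prod NiFi would go through the proxy and effectively get read-only
access, while Dev NiFi can still commit new flow versions.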

Thanks,
Koji
On Tue, Nov 20, 2018 at 11:33 PM Kevin Doran  wrote:
>
> I think Bryan’s correct that this makes a good feature request for Registry.
>
>
>
> One idea is if you are able to set separate policies for production NiFi and 
> non-production NiFi, then you could limit the user policies to read only for 
> the NiFi canvas / process group and only allow a service account or admin 
> have write access to that NiFi.
>
>
>
> NiFi registry would still technically be writable by those users, but it 
> would prevent them from doing something like making a change in the 
> production NiFi and then saving that version to Registry. It would be a safe 
> guard to make sure changes get introduced to the Registry copy from 
> non-production NiFis.
>
>
>
> Regards,
>
> Kevin
>
>
>
> From: Bryan Bende 
> Reply-To: "users@nifi.apache.org" 
> Date: Tuesday, November 20, 2018 at 08:28
> To: "users@nifi.apache.org" 
> Subject: Re: Multiple NiFi clusters with 1 NiFi Rigistry
>
>
>
> I think we would need to build some type of feature into registry to truly 
> support this. Possibly a more specific policy for proxies so that we could 
> say Dev NiFi can proxy read and write requests, and prod NiFi can only proxy 
> read requests. Currently it would only really work if you had separate 
> Kerberos domains, or weren’t using ranger.
>
>
>
> On Tue, Nov 20, 2018 at 6:25 AM Woodhead, Chad  wrote:
>
> Hi Koji,
>
> Unfortunately all of my NiFi clusters use the same Kerberos domain, which is 
> making this harder.
>
> Using NiFi identity mappings to map the same Kerberos principal to 
> environment aware ones seems like a good idea, but I’m thinking there will 
> then be a disconnect for Ranger (used for NiFi authorization) and NiFi 
> Registry. I say this because we use ldap user/group sync for Ranger and ldap 
> sync for NiFi Registry using ldap-user-group-provider.
>
> So in Ranger and NiFi Registry, the users are just listed as ‘bob’ and not 
> ‘bob@dev’. That means I would manually have to add users to Ranger and NiFi 
> Registry to add the ‘@dev’ part right? Or is there a way to customize that 
> too?
>
> Hope I’m not overcomplicating this!
>
> Thanks,
> Chad
>
> On 11/19/18, 8:26 PM, "Koji Kawamura"  wrote:
>
> *External Message* - Use caution before opening links or attachments
>
> Hi Chad,
>
> NiFi Registry uses NiFi user's identity to authorize request.
> Registry also checks NiFi instance's identity to authorize proxying
> user requests, but this can only authorize proxy capability. In order
> to control access such as bucket read/write, Registry uses NiFi user's
> identity.
>
> I assume the kerberos identities are different among those
> environments (Dev, Cert and Prod). And Registry will have different
> user identities for those.
> E.g. for user Bob, Registry would have bob@dev, bob@cert and bob@prod
> Then you can define dev, cert and prod groups at Registry to give
> certain access to buckets.
> E.g. give read/write access to dev group, and give only read access to
> cert and prod groups for a bucket.
>
> In case your NiFi clusters use the same Kerberos domain, you can use
> NiFi identity mapping to map the same Kerberos principal to
> environment aware ones, so that the above authorization can be
> configured at NiFi Registry.
> 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__nifi.apache.org_docs_nifi-2Ddocs_html_administration-2Dguide.html-23identity-2Dmapping-2Dproperties=DwIFaQ=gJN2jf8AyP5Q6Np0yWY19w=MJ04HXP0mOz9-J4odYRNRx3ln4A_OnHTjJvmsZOEG64=lFWN8OtWhTL6eW3O-K1-lwIDlC0ZViDDrxxFSys2-Lw=_4v5XehCxEQZKSAr800935KuCtiECO-BQGkPknz5Gg4=
>
> Thanks,
> Koji
>
>
> On Tue, Nov 20, 2018 at 1:38 AM Woodhead, Chad  
> wrote:
> >
> > I am standing up 3 new HDF 3.2 clusters (Dev, Cert, and Prod) and we 
> will be focusing on NiFi (1.7.0) + NiFi Registry (0.2.0). We are using git as 
> our FlowPersistenceProvider. My plan is to use 1 NiFi Registry (the Prod NiFi 
> registry) for all 3 clusters, rather than having 3 NiFi Registries and trying 
> to keep the DB’s in sync between the 3 NiFi Registry instances.
> >
> >
> >
> > Is there a way to im

Re: Multiple NiFi clusters with 1 NiFi Rigistry

2018-11-19 Thread Koji Kawamura
Hi Chad,

NiFi Registry uses the NiFi user's identity to authorize requests.
Registry also checks the NiFi instance's identity to authorize proxying
user requests, but this can only authorize the proxy capability. In order
to control access such as bucket read/write, Registry uses the NiFi user's
identity.

I assume the kerberos identities are different among those
environments (Dev, Cert and Prod). And Registry will have different
user identities for those.
E.g. for user Bob, Registry would have bob@dev, bob@cert and bob@prod.
Then you can define dev, cert and prod groups at Registry to give
certain access to buckets.
E.g. give read/write access to dev group, and give only read access to
cert and prod groups for a bucket.

In case your NiFi clusters use the same Kerberos domain, you can use
NiFi identity mapping to map the same Kerberos principal to
environment aware ones, so that the above authorization can be
configured at NiFi Registry.
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#identity-mapping-properties
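
For example, something like the following in nifi.properties on the Dev
cluster (the 'kerb' suffix is arbitrary) would map bob@EXAMPLE.COM to bob@dev;
the other clusters would use '@cert' / '@prod' instead:

nifi.security.identity.mapping.pattern.kerb=^(.*?)@(.*?)$
nifi.security.identity.mapping.value.kerb=$1@dev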

Thanks,
Koji


On Tue, Nov 20, 2018 at 1:38 AM Woodhead, Chad  wrote:
>
> I am standing up 3 new HDF 3.2 clusters (Dev, Cert, and Prod) and we will be 
> focusing on NiFi (1.7.0) + NiFi Registry (0.2.0). We are using git as our 
> FlowPersistenceProvider. My plan is to use 1 NiFi Registry (the Prod NiFi 
> registry) for all 3 clusters, rather than having 3 NiFi Registries and trying 
> to keep the DB’s in sync between the 3 NiFi Registry instances.
>
>
>
> Is there a way to implement some type of authorization so that users can only 
> PUSH/PULL changes from Dev NiFi to Prod NiFi Registry, and only PULL from 
> Cert and Prod NiFi from Prod NiFi Registry?
>
>
>
> NiFi and NiFi Registry both use the ‘kerberos-identity-provider’ for 
> authentication, and the Prod NiFi Registry authenticates with git via a ssh 
> access key.
>
>
>
> Thanks,
>
> Chad


Re: CaptureChangeMySQL - throwing Binlog connector communications failure

2018-11-19 Thread Koji Kawamura
Another possibility is that the command 'SHOW MASTER STATUS' and the
ROTATE event just return the correct position, but the binlog file
itself contains some issue.
I'd suggest checking the MySQL master server-side logs for any disk-related
issue. In that case, some data loss is inevitable.

I tried different setups such as changing sync-binlog or rotating logs
forcefully via 'flush logs' statement, but haven't been able to
reproduce the error.
On Mon, Nov 19, 2018 at 3:17 PM Anand Dev  wrote:
>
> > Where/How did you get the initial binlog filename and position...
> I get it by querying "SHOW MASTER STATUS"
>
> I'm doubting that the ROTATE event is giving some invalid binlog position 
> because in the code I see for ROTATE case it is setting the binlog file and 
> position given in the ROTATE message.
>
> and just after ROTATE we get FORMAT_DESC but we don't do anything for that 
> event and right after that we get the ERROR saying could not find next log
>
> What are your views on this?
>
> On Mon, Nov 19, 2018 at 11:28 AM Koji Kawamura  wrote:
>>
>> > I face this scenario in every 2-3 days. And surprisingly, the binlog file 
>> > never changed still received ROTATE events, do you know any reason why 
>> > MySQL sends ROTATE and FOR_DES events even when there is no change in 
>> > binlog file?
>>
>> MySQL rotates binlog by expiration setting, mysql-log-rotate or when
>> logs are flushed such as taking backup ... etc
>> https://dev.mysql.com/doc/refman/8.0/en/log-file-maintenance.html
>>
>> Where/How did you get the initial binlog filename and position you set
>> at the CaptureChangeMySQL "Initial Binlog Filename" and position?
>>
>>
>> On Sat, Nov 17, 2018 at 12:50 AM Anand Dev  wrote:
>> >
>> > Yes, it fails with same message.
>> >
>> > And I noticed that it happened in the sequence, ROTATE event followed by 
>> > FORMAT_DESCRIPTION event followed by above ERROR
>> > When I reset with same binlog position, it again follows same order and 
>> > fails. I then have to reset with latest binlog pos but that costs data 
>> > loss.
>> >
>> > I have also seen instances when it has successfully processed these events 
>> > but not sure why it fails sometimes and everytime it fails after 
>> > FORMAT_DESCRIPTION.
>> >
>> > Error is same:
>> >  INFO [Process Cluster Protocol Request-3] 
>> > o.a.n.c.p.impl.SocketProtocolListener Finished processing request 
>> > b731e751-c486-4820-bffd-2eb543240457 (type=HEARTBEAT, length=2781 bytes) 
>> > from xx:5001 in 1 millis
>> >  INFO [blc-172.16.2.223:5101] c.g.shyiko.mysql.binlog.BinaryLogClient 
>> > Connected to :5324 at  (sid:65535, cid:239876)
>> >  INFO [Timer-Driven Process Thread-2] 
>> > o.a.n.c.m.processors.CaptureChangeMySQL 
>> > CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Got message 
>> > event type: ROTATE
>> >  INFO [Timer-Driven Process Thread-2] 
>> > o.a.n.c.m.processors.CaptureChangeMySQL 
>> > CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Got message 
>> > event type: FORMAT_DESCRIPTION
>> >  ERROR [Timer-Driven Process Thread-7] 
>> > o.a.n.c.m.processors.CaptureChangeMySQL 
>> > CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Binlog 
>> > connector communications failure: could not find next log; the first event 
>> > 'bin_5101.13' at 2343174, the last event read from 
>> > '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, the last 
>> > byte read from '/u01/mysql/var/log/binlogs/bin_5101.13' at 2343260.: 
>> > com.github.shyiko.mysql.binlog.network.ServerException: could not find 
>> > next log; the first event 'bin_5101.13' at 2343174, the last event 
>> > read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, 
>> > the last byte read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' 
>> > at 2343260.
>> > com.github.shyiko.mysql.binlog.network.ServerException: could not find 
>> > next log; the first event 'bin_5101.13' at 2343174, the last event 
>> > read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, 
>> > the last byte read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' 
>> > at 2343260.
>> > at 
>> > com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
>> >     at 
>> > com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
>> >  

Re: CaptureChangeMySQL - throwing Binlog connector communications failure

2018-11-18 Thread Koji Kawamura
> I face this scenario in every 2-3 days. And surprisingly, the binlog file 
> never changed still received ROTATE events, do you know any reason why MySQL 
> sends ROTATE and FOR_DES events even when there is no change in binlog file?

MySQL rotates the binlog based on the expiration setting, mysql-log-rotate, or when
logs are flushed, such as when taking a backup ... etc.
https://dev.mysql.com/doc/refman/8.0/en/log-file-maintenance.html

Where/How did you get the initial binlog filename and position you set
at the CaptureChangeMySQL "Initial Binlog Filename" and position?


On Sat, Nov 17, 2018 at 12:50 AM Anand Dev  wrote:
>
> Yes, it fails with same message.
>
> And I noticed that it happened in the sequence, ROTATE event followed by 
> FORMAT_DESCRIPTION event followed by above ERROR
> When I reset with same binlog position, it again follows same order and 
> fails. I then have to reset with latest binlog pos but that costs data loss.
>
> I have also seen instances when it has successfully processed these events 
> but not sure why it fails sometimes and everytime it fails after 
> FORMAT_DESCRIPTION.
>
> Error is same:
>  INFO [Process Cluster Protocol Request-3] 
> o.a.n.c.p.impl.SocketProtocolListener Finished processing request 
> b731e751-c486-4820-bffd-2eb543240457 (type=HEARTBEAT, length=2781 bytes) from 
> xx:5001 in 1 millis
>  INFO [blc-172.16.2.223:5101] c.g.shyiko.mysql.binlog.BinaryLogClient 
> Connected to :5324 at  (sid:65535, cid:239876)
>  INFO [Timer-Driven Process Thread-2] o.a.n.c.m.processors.CaptureChangeMySQL 
> CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Got message event 
> type: ROTATE
>  INFO [Timer-Driven Process Thread-2] o.a.n.c.m.processors.CaptureChangeMySQL 
> CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Got message event 
> type: FORMAT_DESCRIPTION
>  ERROR [Timer-Driven Process Thread-7] 
> o.a.n.c.m.processors.CaptureChangeMySQL 
> CaptureChangeMySQL[id=879f5c10-0166-1000--5772bb34] Binlog connector 
> communications failure: could not find next log; the first event 
> 'bin_5101.13' at 2343174, the last event read from 
> '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, the last byte 
> read from '/u01/mysql/var/log/binlogs/bin_5101.13' at 2343260.: 
> com.github.shyiko.mysql.binlog.network.ServerException: could not find next 
> log; the first event 'bin_5101.13' at 2343174, the last event read from 
> '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, the last byte 
> read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260.
> com.github.shyiko.mysql.binlog.network.ServerException: could not find next 
> log; the first event 'bin_5101.13' at 2343174, the last event read from 
> '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260, the last byte 
> read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 2343260.
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I face this scenario in every 2-3 days. And surprisingly, the binlog file 
> never changed still received ROTATE events, do you know any reason why MySQL 
> sends ROTATE and FOR_DES events even when there is no change in binlog file?
>
> P.S. using NiFi 1.4
>
> On Thu, Nov 15, 2018 at 11:02 AM Koji Kawamura  wrote:
>>
>> > Sometimes it fails (I ensure that the state is cleared), and then I have 
>> > to restart it with latest binlog position.
>>
>> Does it fail with the same error message?
>> com.github.shyiko.mysql.binlog.network.ServerException: could not find
>> next log; the first event 'bin_8101.01' at 142923964, the last
>> event read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at
>> 142924061, the last byte read from
>> '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.
>>
>> Excuse me if I'm not understanding it correctly, but you mean when you
>> clear the processor state and manually put "Initial Binlog Filename"
>> and "Initial Binlog Position" the processor fails to start with the
>> above error message? If so, where did you get the initial binlog
>> filename and position?
>>
>> On Wed, Nov 14, 2018 at 8:50 PM Anand Dev  wrote:
>> >
>> > When I restart the Processor from the binlog pos where it stopped, not 
>> > everytime it starts successfully.
>> >
>> > Sometimes it fails (I ensure that the state is cleared), and then I have to
>> > restart it with latest binlog position.

Re: NiFi 1.7.1 remote group not connecting when added through restful api until nifi restarted

2018-11-15 Thread Koji Kawamura
Hi William,

> I do see the error message when I do a restart and there are existing remote 
> process groups but they do seem to connect eventually

This is a known behavior for RPGs connecting to the same NiFi. During
the start-up process, RemoteProcessGroups try to get the remote info before
the NiFi API is ready.
They connect eventually. But again, you shouldn't have to restart to
make RPGs work after you instantiate them via the REST API.

> It's only the initial create of  the first remote process group that seems to 
> be acting weird.

Could you elaborate on how it misbehaves? If the added RPG doesn't
recognize the existing ports, then I'd add some wait logic, as Chris
suggested.
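
A minimal sketch of such wait logic, assuming plain requests against an unsecured NiFi at a placeholder URL; the 'contents'/'inputPorts' field names are my reading of the remote-process-groups endpoint and should be verified:

# Minimal sketch: after creating the RPG via the REST API, poll it until the
# remote input/output ports have been discovered before wiring up connections.
import time
import requests

NIFI_API = "http://localhost:8080/nifi-api"      # placeholder
RPG_ID = "id-returned-when-the-rpg-was-created"  # placeholder

def remote_ports_known(rpg_id, timeout_sec=120, interval_sec=5):
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        entity = requests.get(f"{NIFI_API}/remote-process-groups/{rpg_id}").json()
        contents = entity.get("component", {}).get("contents", {})
        if contents.get("inputPorts") or contents.get("outputPorts"):
            return True
        time.sleep(interval_sec)
    return False

if not remote_ports_known(RPG_ID):
    raise RuntimeError("Remote ports were not discovered in time")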

Thanks,
Koji

On Fri, Nov 16, 2018 at 2:03 AM William Gosse
 wrote:
>
> I don't believe I see any error messages on the initial remote process group 
> that gets created.
>
> I do see the error message when I do a restart and there are existing remote 
> process groups but they do seem to connect eventually
>
> It's only the initial create of  the first remote process group that seems to 
> be acting weird.
>
> -----Original Message-----
> From: Koji Kawamura 
> Sent: Thursday, November 15, 2018 1:05 AM
> To: users@nifi.apache.org
> Subject: Re: NiFi 1.7.1 remote group not connecting when added through 
> restful api until nifi restarted
>
> [CAUTION: This email originated from outside of Kodak Alaris. Do not click 
> links or open attachments unless you recognize the sender and know the 
> content is safe.] 
>
> Hello William,
>
> > fails to connect to the existing input port until I do a restart of
> > NiFi
> Is there any error message when it fails? Connection refused?
> It should not require a NiFi restart to establish connection.
>
> Thanks,
> Koji
>
>
>
> On Thu, Nov 15, 2018 at 1:38 AM William Gosse  
> wrote:
> >
> > I'm using NiFi's restful api to create a process group from a template
> > that contains a file folder listener and a remote group. I also have
> > an existing input port going to a process group that has the file
> > fetch and everything else for processing the file that was fetched.
> > This is not added by a restful api but is loaded from a template after
> > the initial startup of Nifi. What I wind up with is a list fetch
> > pattern that should cluster ready/
> >
> > My problem is that the first time I use the rest api to add the listener 
> > process group the remote group that's inside of it fails to connect to the 
> > existing input port until I do a restart of NiFi. At that point all 
> > subsequent listeners I add seem to connect just fine.
> >
> > Is there way that I can cause the initial connection to occur without 
> > having to perform a restart?
> >
> >


Re: Edit QueryCassandra processor using REST API.

2018-11-14 Thread Koji Kawamura
Hi Dnyaneshwar,

You can forcefully terminate the remaining threads by sending a DELETE
request to /processors/{id}/threads.
https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
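
A minimal sketch of that call with a simple poll afterwards, assuming plain requests against an unsecured NiFi at a placeholder URL:

# Minimal sketch: force-terminate a stuck processor's threads, then poll until
# NiFi reports it as STOPPED before trying to start it again.
import time
import requests

NIFI_API = "http://localhost:8080/nifi-api"              # placeholder
PROCESSOR_ID = "6a4bf199-4071-3823-04c1-fa4fe9fba79b"    # the stuck processor

# Terminate any threads still running for the processor.
requests.delete(f"{NIFI_API}/processors/{PROCESSOR_ID}/threads").raise_for_status()

# Wait until the run status settles on STOPPED.
for _ in range(30):
    entity = requests.get(f"{NIFI_API}/processors/{PROCESSOR_ID}").json()
    if entity["component"]["state"] == "STOPPED":
        break
    time.sleep(2)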

Thanks,
Koji
On Thu, Nov 15, 2018 at 4:14 PM Dnyaneshwar Pawar
 wrote:
>
> Hi
>
>   We are trying to edit the QueryCassandra processor using the REST API; this
> requires the processor to be stopped programmatically and started again after the changes are done.
> In this process, initially we had an issue with the processor taking time
> to stop. We added a wait time of 10 sec and that worked. However, we recently
> observed that the processor sometimes gets stuck in the STOPPING state and never goes
> down within the 10 sec wait.
>
>   This prevents the processor from being started, and it remains in
> the stopped/stopping state. PFB the error log:
>
> 6a4bf199-4071-3823-04c1-fa4fe9fba79b cannot be started because it is not 
> stopped. Current state is STOPPING
>
>
>
> We know we can get the processor state via the REST URL; however, sometimes the
> processor gets stuck in the STOPPING state. Is there any way to force-stop this
> processor, or a maximum time limit after which the processor will go down?
>
>
>
>
>
>
>
>
>
>
>
> Regards,
>
> Dnyaneshwar Pawar
>
>


Re: NiFi 1.7.1 remote group not connecting when added through restful api until nifi restarted

2018-11-14 Thread Koji Kawamura
Hello William,

> fails to connect to the existing input port until I do a restart of NiFi
Is there any error message when it fails? Connection refused?
It should not require a NiFi restart to establish connection.

Thanks,
Koji



On Thu, Nov 15, 2018 at 1:38 AM William Gosse
 wrote:
>
> I'm using NiFi's restful api to create a process group from a template that 
> contains a file folder listener and a remote group. I also have an existing 
> input port going to a process group that has the file fetch and everything 
> else for processing the file that was fetched. This is not added by a restful 
> api but is loaded from a template after the initial startup of Nifi. What I 
> wind up with is a list fetch pattern that should cluster ready/
>
> My problem is that the first time I use the rest api to add the listener 
> process group the remote group that's inside of it fails to connect to the 
> existing input port until I do a restart of NiFi. At that point all 
> subsequent listeners I add seem to connect just fine.
>
> Is there way that I can cause the initial connection to occur without having 
> to perform a restart?
>
>


Re: CaptureChangeMySQL - throwing Binlog connector communications failure

2018-11-14 Thread Koji Kawamura
> Sometimes it fails (I ensure that the state is cleared), and then I have to 
> restart it with latest binlog position.

Does it fail with the same error message?
com.github.shyiko.mysql.binlog.network.ServerException: could not find
next log; the first event 'bin_8101.01' at 142923964, the last
event read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at
142924061, the last byte read from
'/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.

Excuse me if I'm not understanding it correctly, but you mean when you
clear the processor state and manually put "Initial Binlog Filename"
and "Initial Binlog Position" the processor fails to start with the
above error message? If so, where did you get the initial binlog
filename and position?

On Wed, Nov 14, 2018 at 8:50 PM Anand Dev  wrote:
>
> When I restart the Processor from the binlog pos where it stopped, not 
> everytime it starts successfully.
>
> Sometimes it fails (I ensure that the state is cleared), and then I have to 
> restart it with latest binlog position.
>
> Is there any thing apart from state to be taken care while restarting the 
> processor from any binlog position?
>
>
> On Tue, Nov 13, 2018 at 7:27 AM Koji Kawamura  wrote:
>>
>> Hi Anand,
>>
>> I'm not sure what caused the error, but I believe the error is MySQL error 
>> 1236.
>> The error can happen with different reasons, you may find this article
>> informative.
>> https://www.percona.com/blog/2014/10/08/mysql-replication-got-fatal-error-1236-causes-and-cures/
>>
>> Also, there is a JIRA that may be related to the issue.
>> "Make Change Data Capture (CDC) processor for MySQL refer to GTID"
>> https://issues.apache.org/jira/browse/NIFI-5739
>>
>> Did your MySQL server cluster undergo master change recently?
>>
>> If the error keeps happening and you can't get events from the binlog
>> any more, I'd stop the CaptureChangeMySQL processor and clear its
>> state, then restart capturing to recover from that situation. If your
>> use-case allows that..
>> This will do the same thing with the one described in the preceding
>> percona article.
>> > The solution would be to move the slave thread to the next available 
>> > binary log and initialize slave thread with the first available position 
>> > on binary log as below:
>>
>> Thanks,
>> Koji
>> On Mon, Nov 12, 2018 at 10:28 PM Anand Chandrashekhar Tirthgirikar
>>  wrote:
>> >
>> > Thanks Koji for the resolution of previous issue.
>> >
>> >
>> >
>> > But I see below error again and again once in a day. There is nothing else 
>> > in the logs apart from below error.
>> >
>> >
>> >
>> > Can you please tell what could be the problem here? Is it from server side 
>> > or from processor or any network issue? We are not able to reproduce it.
>> >
>> >
>> >
>> >
>> >
>> > 2018-11-12 11:46:31,952 ERROR [Timer-Driven Process Thread-9] 
>> > o.a.n.c.m.processors.CaptureChangeMySQL 
>> > CaptureChangeMySQL[id=df913697-f589-121f-b681-da192917092a] Binlog 
>> > connector communications failure: could not find next log; the first event 
>> > 'bin_8101.01' at 142923964, the last event read from 
>> > '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061, the last 
>> > byte read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 
>> > 142924061.: com.github.shyiko.mysql.binlog.network.ServerException: could 
>> > not find next log; the first event 'bin_8101.01' at 142923964, the 
>> > last event read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 
>> > 142924061, the last byte read from 
>> > '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.
>> >
>> > com.github.shyiko.mysql.binlog.network.ServerException: could not find 
>> > next log; the first event 'bin_8101.01' at 142923964, the last event 
>> > read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061, 
>> > the last byte read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' 
>> > at 142924061.
>> >
>> > at 
>> > com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
>> >
>> > at 
>> > com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
>> >
>> > at 
>> > com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
>> >
>> > at java.lang.Thread.run(Thread.java:745)

Re: Parsing a template to identify processor names

2018-11-13 Thread Koji Kawamura
Hello,

I'm not sure if this is what you're looking for, but I wrote a test
case before that loads a template and uses the flow information to automate
tests.
The code may serve as a reference:
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java#L100
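
If instantiating the template only for a name check feels heavy, another option is to parse the exported template XML directly; a minimal sketch, assuming the template format uses 'processors' elements with 'name' and 'type' children:

# Minimal sketch: collect (type, name) pairs for every processor in a template
# XML file and report duplicate names among processors of the same type.
from collections import Counter
import xml.etree.ElementTree as ET

tree = ET.parse("my_template.xml")               # placeholder path
pairs = []
for proc in tree.getroot().iter("processors"):   # one element per processor
    name = proc.findtext("name")
    ptype = proc.findtext("type")
    if name and ptype:
        pairs.append((ptype, name))

for (ptype, name), count in Counter(pairs).items():
    if count > 1:
        print(f"Duplicate processor name '{name}' ({count}x) for type {ptype}")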

Thanks,
Koji
On Wed, Nov 14, 2018 at 7:23 AM Vitaly Krivoy
 wrote:
>
> If a process group template contains multiple processors of the same type, I 
> need to make sure that they have different names. Is there an easier way of 
> parsing a template outside of instantiating it and then getting all 
> processors from a resulting process group? I do already have the code to do 
> the latter, but I am curious if there is another way. Thanks.
>
>
>
> STATEMENT OF CONFIDENTIALITY The information contained in this email message 
> and any attachments may be confidential and legally privileged and is 
> intended for the use of the addressee(s) only. If you are not an intended 
> recipient, please: (1) notify me immediately by replying to this message; (2) 
> do not use, disseminate, distribute or reproduce any part of the message or 
> any attachment; and (3) destroy all copies of this message and any 
> attachments.


Re: CaptureChangeMySQL - throwing Binlog connector communications failure

2018-11-12 Thread Koji Kawamura
Hi Anand,

I'm not sure what caused the error, but I believe the error is MySQL error 1236.
The error can happen for different reasons; you may find this article
informative.
https://www.percona.com/blog/2014/10/08/mysql-replication-got-fatal-error-1236-causes-and-cures/

Also, there is a JIRA that may be related to the issue.
"Make Change Data Capture (CDC) processor for MySQL refer to GTID"
https://issues.apache.org/jira/browse/NIFI-5739

Did your MySQL server cluster undergo master change recently?

If the error keeps happening and you can't get events from the binlog
any more, I'd stop the CaptureChangeMySQL processor and clear its
state, then restart capturing to recover from that situation, if your
use-case allows that.
This does the same thing as the approach described in the preceding
Percona article:
> The solution would be to move the slave thread to the next available binary 
> log and initialize slave thread with the first available position on binary 
> log as below:
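
Separately, the stop / clear-state part of that recovery can be scripted; a minimal sketch, assuming plain requests against an unsecured NiFi at a placeholder URL and the state/clear-requests endpoint of the REST API:

# Minimal sketch: stop CaptureChangeMySQL and clear its stored state (binlog
# filename/position) so capturing can be restarted from a fresh position.
import requests

NIFI_API = "http://localhost:8080/nifi-api"              # placeholder
PROCESSOR_ID = "df913697-f589-121f-b681-da192917092a"    # CaptureChangeMySQL id

# Fetch the current revision, then stop the processor.
entity = requests.get(f"{NIFI_API}/processors/{PROCESSOR_ID}").json()
stop_request = {
    "revision": entity["revision"],
    "component": {"id": PROCESSOR_ID, "state": "STOPPED"},
}
requests.put(f"{NIFI_API}/processors/{PROCESSOR_ID}", json=stop_request).raise_for_status()

# Clear the processor's component state.
requests.post(f"{NIFI_API}/processors/{PROCESSOR_ID}/state/clear-requests").raise_for_status()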

Thanks,
Koji
On Mon, Nov 12, 2018 at 10:28 PM Anand Chandrashekhar Tirthgirikar
 wrote:
>
> Thanks Koji for the resolution of previous issue.
>
>
>
> But I see below error again and again once in a day. There is nothing else in 
> the logs apart from below error.
>
>
>
> Can you please tell what could be the problem here? Is it from server side or 
> from processor or any network issue? We are not able to reproduce it.
>
>
>
>
>
> 2018-11-12 11:46:31,952 ERROR [Timer-Driven Process Thread-9] 
> o.a.n.c.m.processors.CaptureChangeMySQL 
> CaptureChangeMySQL[id=df913697-f589-121f-b681-da192917092a] Binlog connector 
> communications failure: could not find next log; the first event 
> 'bin_8101.01' at 142923964, the last event read from 
> '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061, the last byte 
> read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.: 
> com.github.shyiko.mysql.binlog.network.ServerException: could not find next 
> log; the first event 'bin_8101.01' at 142923964, the last event read from 
> '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061, the last byte 
> read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.
>
> com.github.shyiko.mysql.binlog.network.ServerException: could not find next 
> log; the first event 'bin_8101.01' at 142923964, the last event read from 
> '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061, the last byte 
> read from '/u01/mysql/8101/var/log/binlogs/bin_8101.01' at 142924061.
>
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
>
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
>
> at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> From: Koji Kawamura 
> Sent: 23 October 2018 12:02
> To: users@nifi.apache.org
> Subject: Re: CaptureChangeMySQL - throwing Binlog connector communications 
> failure
>
>
>
> Hello,
>
> Thanks for reporting the issue and the detailed analysis.
> I was able to reproduce by setting short wait_timeout as 30 sec.
>
> I'm not aware of any work-around to make keep alive CaptureChangeMySQL
> at the moment.
> But you can write a script to stop/start CaptureChangeMySQL processor
> using NiFi API to refresh underlying JDBC connection.
> NipyAPI can be useful to write such scripts.
> https://github.com/Chaffelson/nipyapi
>
> I submit a JIRA for this issue.
> https://issues.apache.org/jira/browse/NIFI-5739
>
> Thanks,
> Koji
>
>
> On Wed, Oct 10, 2018 at 9:42 PM Anand Dev  wrote:
> >
> > In logs, below error is reported
> >
> > 2018-10-10 00:16:19,766 ERROR [Timer-Driven Process Thread-1] 
> > o.a.n.c.m.processors.CaptureChangeMySQL 
> > CaptureChangeMySQL[id=3c3f6e89-8401-3a67-a49d-3af14abf13dc] Error in 
> > getting DDL Table info create table  : 
> > com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No 
> > operations allowed after connection closed.
> >
> > Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The 
> > last packet successfully received from the server was 58,006,344 
> > milliseconds ago. The last packet sent successfully to the server was 
> > 58,006,345 milliseconds ago. is longer than the server configured value of 
> > 'wait_timeout'. You should consider either expiring and/or testing 
> > connection validity before use in your application, increasing the server 
> > configured values for client timeouts, or using the Connector/J connection 
> > property 'autoReconnect=true' to avoid this problem.

Re: Problem of connection in Remote Process Group

2018-11-08 Thread Koji Kawamura
Hi Jean,

If you haven't, please take a look at this documentation. There are a few
example configurations and deployment diagrams you can refer to:
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#site_to_site_reverse_proxy_properties

Also, here are some Nginx configurations that I used to develop and test
the routing feature.
https://github.com/ijokarumawak/nifi-reverseproxy/tree/master/nginx

The S2S routing capability was introduced in NiFi 1.7.0.

Hope this helps.

Thanks,
Koji

On Fri, Nov 9, 2018 at 6:38 AM GASCHLER, Jean 
wrote:

> Hi
>
>
>
> I have some difficulties to put in place a working infrastructure with at
> least two secured NIFI(s) : the first one calling a remoteProcessGroup
> linked with the second one which is behind a Nginx reverse-proxy.
>
> I am looking for someone who has experience with such configuration
> because the NIFI documentation is not clear about this.
>
>
>
> What should I change in the NIFI configuration of the site3 if I want this
> other infrastructure?
>
>
>
> Thanks a lot
>
>
>
> *--Jean Gaschler*
> This message contains information that may be privileged or confidential
> and is the property of the Capgemini Group. It is intended only for the
> person to whom it is addressed. If you are not the intended recipient, you
> are not authorized to read, print, retain, copy, disseminate, distribute,
> or use this message or any part thereof. If you receive this message in
> error, please notify the sender immediately and delete all copies of this
> message.
>


Re: Nulls in input data throwing exceptions when using QueryRecord

2018-11-08 Thread Koji Kawamura
Hi Mandeep,

Thanks for reporting the issue and detailed explanation. That's very helpful!
I was able to reproduce the issue and found a possible solution.
Filed a JIRA, a PR will be submitted shortly to fix it.
https://issues.apache.org/jira/browse/NIFI-5802

Thanks,
Koji
On Wed, Nov 7, 2018 at 8:54 PM Mandeep Gill  wrote:
>
> Hi,
>
> We're hitting a couple of issues working with nulls when using QueryRecord 
> using both NiFi 1.7.1 and 1.8.0.
>
> Things work as expected for strings, however when using other primitive types 
> as defined by the avro schema, such as boolean, long, and double, null values 
> in the input data aren't converted to NULLs within the SQL engine / Calcite. 
> Instead they appear to remain as java null values and throw NPEs when 
> attempting to use them within a query or simply return them as the output.
>
> To give some examples, given the following record data and schema (tested 
> using both JSON and Avro record reader/writers)
>
> [ {  "str_test" : "hello1",  "bool_test" : true }, {  "str_test" : null,  
> "bool_test" : null } ]
>
> {
>   "type": "record",
>   "name": "schema",
>   "fields": [
> {
>   "name": "str_test",
>   "type": [ "string", "null" ],
>   "default": null
> },
> {
>   "name": "bool_test",
>   "type": [ "boolean", "null" ],
>   "default": null
> }
>   ]
> }
>
> The following queries return the empty resultset,
>
> select 'res' as res from FLOWFILE where bool_test IS NULL
> select 'res' as res from FLOWFILE where bool_test IS UNKNOWN
>
> and the query below returns a resultset of count 2,
>
> select 'res' from FLOWFILE where bool_test IS NOT NULL
>
> The query below works as expected, suggesting things work fine for strings
>
> select 'res' as res from FLOWFILE where str_test IS NULL
>
> However, finally the following query throws a NullPointerException (see [1]) 
> on trying to convert the null to a boolean within the output writer
>
> select * from FLOWFILE where bool_test IS NOT NULL
>
> The null values for these types seem to be treated as distinct to the NULLs 
> within the SQL engine, as the following query returns the empty resultset.
>
> select 'res' as res from FLOWFILE where CAST(NULL as boolean) IS DISTINCT 
> FROM bool_test
>
> and the following query gives an RuntimeException (see [2]),
>
> select (COALESCE(bool_test, TRUE)) as res from flowfile
>
> Given all this we're unable to make use of datasets with nulls, are nulls 
> only supported for strings or is there perhaps something we're doing wrong 
> here in our setup/config. One thing we've noticed when running a simple 
> "SELECT * from FLOWFILE" returns a nullable type for strings in the output 
> avro schema but not for other primitives, even if they were nullable in the 
> input schema - which could be related.
>
> Cheers,
> Mandeep
>
> [1] org.apache.nifi.processor.exception.ProcessException: IOException thrown 
> from QueryRecord[id=43ee29ff-0166-1000-28bd-06dd07c1425d]: 
> java.io.IOException: 
> org.apache.avro.file.DataFileWriter$AppendWriteException: 
> java.lang.NullPointerException: null of boolean in field bool_test of 
> org.apache.nifi.nifiRecord
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2667)
> at 
> org.apache.nifi.processors.standard.QueryRecord.onTrigger(QueryRecord.java:309)
> at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: 
> org.apache.avro.file.DataFileWriter$AppendWriteException: 
> java.lang.NullPointerException: null of boolean in field bool_test of 
> org.apache.nifi.nifiRecord
> at 
> org.apache.nifi.processors.standard.QueryRecord$1.process(QueryRecord.java:327)
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2648)
> ... 12 common frames omitted
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: 
> java.lang.NullPointerException: null of boolean in field bool_test of 
> org.apache.nifi.nifiRecord
> at 

Re: Can I define variables using other variables (or expression language)?

2018-11-06 Thread Koji Kawamura
Hi Krzysztof,

Currently, NiFi variables cannot refer to other variables. But you may be
able to achieve the expected result if you combine multiple variables
within a single Expression Language expression.
For example, if the "scratch_path" is the one you need to use at
ListFile processor's "Input directory", then you can set an EL there:
- Variables
-- env=staging
-- scratch_path=scratch
- ListFile Input directory = /${env}/${scratch_path}

Having said that, I agree with you that the ability to reference other
variables would be nice to have.
Please feel free to file a JIRA.
https://issues.apache.org/jira/projects/NIFI

Thanks,
Koji
On Tue, Nov 6, 2018 at 8:01 PM Krzysztof Zarzycki  wrote:
>
> Hi community,
> I'm using Nifi Variable Registry (through UI). I would like to define one 
> variable using value from other variable, like:
> env = staging
> scratch_path=/${env}/scratch
>
> I tried exactly that approach, but it didn't work, the variable scratch_path 
> gets value literally /${env}/scratch.
>
> Do you know if it should or should not be possible? Maybe it's a good idea 
> for a feature request?
>
> Thanks,
> Krzysztof


Re: Available SQL fn in QueryRecord??

2018-10-28 Thread Koji Kawamura
Hi Dano,

Since QueryRecord uses Apache Calcite, it should support the SQL functions
supported by Calcite.
Please check the Calcite documentation:
https://calcite.apache.org/docs/reference.html#arithmetic-operators-and-functions

Thanks,
Koji
On Sat, Oct 27, 2018 at 5:45 AM dan young  wrote:
>
> Hello,
>
> Is there a list or somewhere we can find all the available SQL functions 
> available in QueryRecord?
>
> Regards
>
> Dano


Re: how to merge attributes?

2018-10-23 Thread Koji Kawamura
Hello,

InvokeHttp creates a new FlowFile for the "Result" relationship from the
incoming FlowFile. That means the "Result" FlowFile carries copies of all
the attributes that the incoming one has. You just need to connect
the "Result" relationship to ReplaceText. "Original" can be
auto-terminated in this case.

Thanks,
Koji
On Wed, Oct 24, 2018 at 5:37 AM l vic  wrote:
>
> I have to create sql query from results of rest call (InvokeHttp ) and 
> original request parameters...
>
> Any idea how i can merge attributes from both in single flowfile for passing 
> it to "ReplaceText"? If I pass both "Original" and "Result" from "InvokeHttp" 
> to "ReplaceText" I end up with one "valid" flowfile using attributes from 
> both and one "junk" one with empty placeholders... If I try use 
> "MergeContent" to  merge flows I end up only with junk only like the following
>
>
> update joint_table set create_date = 1540321085127, filter = '', 
> data_start_time =  + 1532016495 - , execute_date = 1532016495 + where id = 
> '1749462c-ed2b-4a34-9332-3687a60c1e1c'
>
>
> Thank you


Re: CaptureChangeMySQL - throwing Binlog connector communications failure

2018-10-23 Thread Koji Kawamura
Hello,

Thanks for reporting the issue and the detailed analysis.
I was able to reproduce it by setting a short wait_timeout of 30 sec.

I'm not aware of any work-around to keep the CaptureChangeMySQL
connection alive at the moment.
But you can write a script that stops/starts the CaptureChangeMySQL processor
using the NiFi API to refresh the underlying JDBC connection.
NipyAPI can be useful to write such scripts.
https://github.com/Chaffelson/nipyapi
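
A minimal sketch of such a script using NipyAPI, assuming its canvas helpers (get_processor / schedule_processor) and a placeholder URL; check the names against the NipyAPI documentation:

# Minimal sketch: bounce CaptureChangeMySQL so its JDBC connection is
# re-established before MySQL's wait_timeout closes it.
import time
import nipyapi

nipyapi.config.nifi_config.host = "http://localhost:8080/nifi-api"  # placeholder

proc = nipyapi.canvas.get_processor("CaptureChangeMySQL")  # look up by name
nipyapi.canvas.schedule_processor(proc, False)             # stop
time.sleep(10)                                             # let threads wind down
nipyapi.canvas.schedule_processor(proc, True)              # start again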

I submitted a JIRA for this issue.
https://issues.apache.org/jira/browse/NIFI-5739

Thanks,
Koji


On Wed, Oct 10, 2018 at 9:42 PM Anand Dev  wrote:
>
> In logs, below error is reported
>
> 2018-10-10 00:16:19,766 ERROR [Timer-Driven Process Thread-1] 
> o.a.n.c.m.processors.CaptureChangeMySQL 
> CaptureChangeMySQL[id=3c3f6e89-8401-3a67-a49d-3af14abf13dc] Error in getting 
> DDL Table info create table  : 
> com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No 
> operations allowed after connection closed.
>
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last 
> packet successfully received from the server was 58,006,344 milliseconds ago. 
>  The last packet sent successfully to the server was 58,006,345 milliseconds 
> ago. is longer than the server configured value of 'wait_timeout'. You should 
> consider either expiring and/or testing connection validity before use in 
> your application, increasing the server configured values for client 
> timeouts, or using the Connector/J connection property 'autoReconnect=true' 
> to avoid this problem.
>
> It happened because the connection got stale / dead after 8 hours.
>
> How to keep alive CaptureChangeMySQL processor connection with DB ?
> One way would be increasing 'wait_timeout' to high value but the error can 
> still occur post that time.
> How to keep alive connection forever unless processor is stopped?
>
> On Wed, Oct 10, 2018 at 4:41 PM Anand Dev  wrote:
>>
>> When I start the CaptureChangeMySQL processor, it worked fine till the point 
>> when DDL event occured for Creating a table.
>>
>> Processor captured the CREATE table event successfully.
>> Next, when record was inserted in the newly created table, it threw below 
>> error and stopped capturing further events.
>>
>> ERROR
>> ===
>> Binlog connector communications failure: could not find next log; the first 
>> event 'bin_5101.13' at 921485, the last event read from 
>> '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 921565, the last byte 
>> read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 921565
>>
>> com.github.shyiko.mysql.binlog.network.ServerException: could not find next 
>> log; the first event 'bin_5101.13' at 921485, the last event read from 
>> '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 921565, the last byte 
>> read from '/u01/mysql/5101/var/log/binlogs/bin_5101.13' at 921565
>> 
>>
>> Can someone suggest what's going wrong here?


Re: Issue with PutSQL

2018-10-22 Thread Koji Kawamura
Hi Vyshali,

I was able to connect to MySQL Server 8.0.12 with
mysql-connector-java-8.0.12.jar successfully, using username/password.

Here are the SQLs I used to create a MySQL user:
CREATE USER 'nifi'@'%' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON * . * TO 'nifi'@'%';

Does your connection url have any configuration? Which NiFi version
are you using?

Thanks,
Koji


On Mon, Oct 22, 2018 at 1:11 PM N, Vyshali  wrote:
>
> Hi,
>
>
>
> I'm using MySQL server(8.0.12) for my development and PutSQL processor in 
> Nifi for inserting records in DB. I have downloaded 
> mysql-connector-java-8.0.12.jar and pointed it in the DBCPConnectionPool 
> setting but I'm getting an error,
>
>
>
>  "Cannot create PoolableConnectionFactory( client doesn't support 
> authentication protocol supported by user, consider upgrading MySQL client) "
>
>
>
> I guess there is no need to upgrade the client as I have downloaded the 
> suitable version of jar file. Also, I have given full permission for the 
> folder containing the jar file. Please let me know where I have gone wrong.
>
>
>
> Thanks
>
> Vyshali
>
>


Re: Cluster Peer Lists

2018-09-27 Thread Koji Kawamura
Hi Peter,

The Site-to-Site client refreshes the remote peer list every 60 secs.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L60

The address configured when setting up an S2S client is used to get the remote
peer list initially.
After that, the client knows node01, 02 and 03 are available peers, so
when it refreshes the peer list, even if it fails to access node01,
it should retrieve the updated peer list from node02 or 03. However,
as long as node01 stays in the remote cluster (until it is removed from the
cluster, node02 and 03 still think it is a part of the cluster), the
returned peer list contains node01.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L383

Another thing to note is that the S2S client calculates the destinations for
the next 128 transactions in advance.
So, if your client does not make transactions often, it may take
longer before the next destinations are recalculated.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L159

To avoid having a single host address in the S2S client configuration, you
can use multiple addresses delimited by commas.
With this, the S2S client can connect when it's restarted even if node01 is down.
E.g. http://node01:8080,http://node02:8080,http://node03:8080

Alternatively, round robin DNS name or Reverse Proxy for the bootstrap
node address can be used similarly.

Thanks,
Koji


On Fri, Sep 28, 2018 at 4:30 AM Peter Wicks (pwicks)  wrote:
>
> Hi NiFi team,
>
>
>
> We had one of the nodes in our cluster go offline today. We eventually 
> resolved the issue, but it exposed some issues in our configuration across 
> our edge NiFi instances.
>
>
>
> Right now we have non-clustered instances of NiFi distributed around the 
> world, pushing data back to a three node cluster via Site-to-Site. All of 
> these instances use the name of the first node (node01), and pull back the 
> peer list and weights from it. But node01 is the node that went offline 
> today, and while some site-to-site connections appeared to use cached data 
> and continued uploading data to node02 and node03, many of the site-to-site 
> connections went down because they were not able to pull the peer list from 
> the cluster, which makes perfect sense to me.
>
>
>
> One question that I was curious about, how long is a peer list cached for if 
> an updated list can’t be retrieved  from the cluster?
>
>
>
> What are the best practices for fixing this? We were throwing around ideas of 
> using a load balancer or round robin DNS name as the entry point for 
> site-to-site, but I figured others have probably already tackled this problem 
> before and could share some ideas.
>
>
>
> Thanks,
>
>   Peter


Re: Listing S3

2018-09-24 Thread Koji Kawamura
Hi Martijn,

I'm not an expert on Jython, but if you already have a python script
using boto3 working fine, then I'd suggest using ExecuteStreamCommand
instead.
For example:
- you can design the python script to print out a JSON-formatted string
describing the listed files (a sketch follows below)
- then connect the outputs to SplitJson
- and use EvaluateJsonPath to extract the required values to FlowFile attributes
- finally, use FetchS3Object
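
A minimal sketch of the first step, assuming boto3 with a placeholder bucket and prefix; ExecuteStreamCommand would run the script and pass its stdout on as the FlowFile content:

# Minimal sketch: list objects under a prefix and print them as a JSON array
# so a downstream SplitJson ($.*) can split them into one FlowFile per file.
import json
import boto3

BUCKET = "my-bucket"        # placeholder
PREFIX = "incoming/"        # placeholder

s3 = boto3.client("s3")
listed = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        listed.append({
            "bucket": BUCKET,
            "key": obj["Key"],
            "size": obj["Size"],
            "lastModified": obj["LastModified"].isoformat(),
        })

# ExecuteStreamCommand captures stdout as the outgoing FlowFile content.
print(json.dumps(listed))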

Thanks,
Koji


Re: High volume data with ExecuteSQL processor

2018-09-24 Thread Koji Kawamura
Hello,

Did you try setting 'Max Rows Per Flow File' on the ExecuteSQL processor?
If the OOM happened because NiFi wrote all results into a single
FlowFile, then that property can help by breaking the result set into
several FlowFiles to avoid that.

Thanks,
Koji
On Fri, Sep 21, 2018 at 3:56 PM Dnyaneshwar Pawar
 wrote:
>
> Hi,
>
>
>
> How to execute/process High volume data with ExecuteSQL processor:
>
>
>
> We tried to execute query for db2 database which has around 10 lakh records. 
> While executing this query
>
> we are getting OutOfMemory error and that request(flowfile) is stuck in 
> queue. When we restart nifi, it still stuck in queue and as soon as we start 
> nifi,
>
> we are again getting same error as it is stuck in queue. Is there any way to 
> configure retry for queue(connection to 2 processor).
>
>
>
> We also tried to change property for Flow File repository in nifi.properties 
> (nifi.flowfile.repository.implementation) to 
> 'org.apache.nifi.controller.repository.VolatileFlowFileRepository',
>
> This is removing flowfile in query while restarting nifi. But it has risk of 
> data loss in the event of power/machine failure for other processes.
>
> So please suggest how to execute high volume data query execution or any 
> retry mechanism available for queued flowfile.
>
>
>
>
>
> Regards,
>
> Dnyaneshwar Pawar
>
>


Re: Unable to see Nifi data lineage in Atlas

2018-07-29 Thread Koji Kawamura
Hi Mohit,

From the log message, I assume that you are using an existing
atlas-application.properties copied from somewhere (most likely from an
HDP environment) and that PLAINTEXTSASL is used in it.
PLAINTEXTSASL is not supported by the ReportLineageToAtlas.

As a work-around, please set 'Create Atlas Configuration File' to true
and let the reporting task generate atlas-application.properties
instead.
SASL_PLAINTEXT is identical to PLAINTEXTSASL.
You may need to restart NiFi to take effect.

Hope this helps,
Koji

On Thu, Jul 26, 2018 at 7:12 PM, Mohit  wrote:
> Hi,
>
>
>
> While looking at the logs, I found  out that ReportingLineageToAtlas is not
> able to construct KafkaProducer.
>
> It throws the following logs –
>
>
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:335)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:188)
>
> at
> org.apache.atlas.kafka.KafkaNotification.createProducer(KafkaNotification.java:286)
>
> at
> org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:207)
>
> at
> org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84)
>
> at
> org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:133)
>
> at
> org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:118)
>
> at
> org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:171)
>
> at
> org.apache.nifi.atlas.NiFiAtlasHook.commitMessages(NiFiAtlasHook.java:150)
>
> at
> org.apache.nifi.atlas.reporting.ReportLineageToAtlas.lambda$consumeNiFiProvenanceEvents$6(ReportLineageToAtlas.java:721)
>
> at
> org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.consumeEvents(ProvenanceEventConsumer.java:204)
>
> at
> org.apache.nifi.atlas.reporting.ReportLineageToAtlas.consumeNiFiProvenanceEvents(ReportLineageToAtlas.java:712)
>
> at
> org.apache.nifi.atlas.reporting.ReportLineageToAtlas.onTrigger(ReportLineageToAtlas.java:664)
>
> at
> org.apache.nifi.controller.tasks.ReportingTaskWrapper.run(ReportingTaskWrapper.java:41)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.IllegalArgumentException: No enum constant
> org.apache.kafka.common.protocol.SecurityProtocol.PLAINTEXTSASL
>
> at java.lang.Enum.valueOf(Enum.java:238)
>
> at
> org.apache.kafka.common.protocol.SecurityProtocol.valueOf(SecurityProtocol.java:28)
>
> at
> org.apache.kafka.common.protocol.SecurityProtocol.forName(SecurityProtocol.java:89)
>
> at
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:277)
>
> ... 20 common frames omitted
>
>
>
> Thanks,
>
> Mohit
>
>
>
> From: Mohit 
> Sent: 25 July 2018 17:46
> To: users@nifi.apache.org
> Subject: Unable to see Nifi data lineage in Atlas
>
>
>
> Hi all,
>
>
>
> I have configured ReportingLineageToAtlas reporting task to send Nifi flow
> information to Atlas. Nifi is integrated with Ranger.
>
> I am able to see all the information in the Atlas except the lineage. When I
> search for hdfs_path or hive_table, I can only see the hive side
> information. I can’t figure out anything wrong in the configuration.
>
> Is there something in the Ranger configuration that I’m missing?
>
>
>
> Regards,
>
> Mohit
>
>
>
>


Re: RPG S2S Error

2018-07-29 Thread Koji Kawamura
Hi Faisal,

Adding a ControlRate processor before sending FlowFiles via the RPG lets you
throttle the rate at which data is sent; that should help reduce the
probability of the receiving side becoming full.

If the current overall throughput is acceptable for your use-case, and
you don't see any data loss, then you should be able to ignore the
message.
You can filter messages by log level, configured in conf/logback.xml.
By adding the following line, you can filter out the EndpointConnectionPool
warning messages:

<logger name="org.apache.nifi.remote.client.socket.EndpointConnectionPool" level="ERROR"/>

The SocketRemoteSiteListener log message you want to filter is at ERROR level, so
I think you need to write a custom logback filter class to filter it.
https://logback.qos.ch/manual/filters.html

Thanks,
Koji

On Fri, Jul 20, 2018 at 3:11 PM, Faisal Durrani  wrote:
> Hi Joe/Koji,
>
> I cant seem to figure out a way to reduce the back pressure or to find the
> root cause of the errors
>
> 1.Unable to communicate with remote instance Peer [] due to
> java.io.EOFException; closing connection
> 2.indicates that port 37e64bd0-5326-3c3f-80f4-42a828dea1d5's destination is
> full; penalizing peer
>
> I have tried increasing the rate of delivery of the data by increasing the
> concurrent tasks, increasing the back pressure thresholds , replacing the
> puthbasejson processor with puthbaserecord(the slowest part of our data
> flow) etc. While i have seen some  improvement , I can't seem to get rid of
> the above errors. I also changed various settings in the Nifi config like
>
> nifi.cluster.node.protocol.threads =50
> JVM =4096
> nifi.cluster.node.max.concurrent.requests=400
> nifi.cluster.node.protocol.threads=50
> nifi.web.jetty.threads=400
>
> Would it be safe to ignore these error as they fill up the API logs or do I
> need to investigate further? If we can ignore these then is there any way to
> stop them from appearing in the log file?
>
>
>
> On Fri, Jul 13, 2018 at 10:42 AM Joe Witt  wrote:
>>
>> you can allow for larger backlogs by increasing the backpressure
>> thresholds OR you can add additional nodes OR you can expire data.
>>
>> The whole point of the backpressure and pressure release features are to
>> let you be in control of how many resources are dedicated to buffering data.
>> However, in the most basic sense if rate of data arrival always exceeds rate
>> of delivery then delivery must be made faster or data must be expired at
>> some threshold age.
>>
>> thanks
>>
>> On Thu, Jul 12, 2018, 9:34 PM Faisal Durrani  wrote:
>>>
>>> Hi Koji,
>>>
>>> I moved onto another cluster of Nifi nodes , did the same configuration
>>> for S2S there and boom.. the same error message all over the logs.(nothing
>>> on the bulletin board)
>>>
>>> Could it be because of the back pressure as i also get the  error
>>> -(indicates that port 8c77c1b0-0164-1000--052fa54c's destination is
>>> full; penalizing peer) at the same time i see the closing connection error.
>>> I don't see a way to resolve the back pressure as we get continue stream of
>>> data from the kafka which is then inserted into Hbase( the slowest part of
>>> the data flow) which eventually causes the back pressure.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jul 6, 2018 at 4:55 PM Koji Kawamura 
>>> wrote:
>>>>
>>>> Hi Faisai,
>>>>
>>>> I think both error messages indicating the same thing, that is network
>>>> communication is closed in the middle of a Site-to-Site transaction.
>>>> That can be happen due to many reasons, such as freaky network, or
>>>> manually stop the port or RPG while some transaction is being
>>>> processed. I don't think it is a configuration issue, because NiFi was
>>>> able to initiate S2S communication.
>>>>
>>>> Thanks,
>>>> Koji
>>>>
>>>> On Fri, Jul 6, 2018 at 4:16 PM, Faisal Durrani 
>>>> wrote:
>>>> > Hi Koji,
>>>> >
>>>> > In the subsequent tests the above error did not come but now we are
>>>> > getting
>>>> > errors on the RPG :
>>>> >
>>>> >
>>>> > RemoteGroupPort[name=1_pk_ip,targets=http://xx.prod.xx.local:9090/nifi/]
>>>> > failed to communicate with remote NiFi instance due to
>>>> > java.io.IOException:
>>>> > Failed to confirm transaction with
>>>> > Peer[url=nifi://xxx-x.prod.xx.local:5001] due to
>>>> > java.io.IOException:
>>>> > Connection reset by peer

Re: How many threads does Jetty use?

2018-07-18 Thread Koji Kawamura
I'd take a NiFi thread dump and analyze it with a thread dump analyzing tool.
fastThread can be used to count the number of threads grouped by thread
group. Pretty handy.
http://fastthread.io/
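
If you only need a rough count without an external tool, a minimal sketch that tallies thread names from a dump, assuming a jstack-style format where each thread entry starts with its quoted name:

# Minimal sketch: group threads in a dump file by name prefix (trailing digits
# stripped) to see, for example, how many "NiFi Web Server" threads exist.
import re
import sys
from collections import Counter

counts = Counter()
with open(sys.argv[1]) as dump:   # e.g. a jstack output file
    for line in dump:
        match = re.match(r'"([^"]+)"', line)
        if match:
            # "NiFi Web Server-123" and "NiFi Web Server-124" count as one group.
            counts[re.sub(r"[-\s]*\d+$", "", match.group(1))] += 1

for name, count in counts.most_common():
    print(f"{count:4d}  {name}")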

Thanks,
Koji

On Thu, Jul 19, 2018 at 6:41 AM, Peter Wicks (pwicks)  wrote:
> I know the default thread count for Jetty is 200, but is there a way to tell
> how many are actually being used and if I need to make adjustments?
>
>
>
> nifi.web.jetty.threads=200
>
>
>
> Thanks,
>
>   Peter


Re: RPG S2S Error

2018-07-06 Thread Koji Kawamura
Hi Faisal,

I think both error messages indicate the same thing: the network
communication is closed in the middle of a Site-to-Site transaction.
That can happen for many reasons, such as a flaky network, or the port
or RPG being stopped manually while a transaction is being processed.
I don't think it is a configuration issue, because NiFi was
able to initiate the S2S communication.

Thanks,
Koji

On Fri, Jul 6, 2018 at 4:16 PM, Faisal Durrani  wrote:
> Hi Koji,
>
> In the subsequent tests the above error did not come but now we are getting
> errors on the RPG :
>
> RemoteGroupPort[name=1_pk_ip,targets=http://xx.prod.xx.local:9090/nifi/]
> failed to communicate with remote NiFi instance due to java.io.IOException:
> Failed to confirm transaction with
> Peer[url=nifi://xxx-x.prod.xx.local:5001] due to java.io.IOException:
> Connection reset by peer
>
> The transport protocol is RAW while the URLs mentioned while setting up the
> RPG is one of the node of the (4)node cluster.
>
> nifi.remote.input.socket.port = 5001
>
> nifi.remote.input.secure=false
>
> nifi.remote.input.http.transaction.ttl=60 sec
>
> nifi.remote.input.host=
>
> Please let me  know if there is any configuration changes that we need to
> make.
>
>
>
>
> On Fri, Jul 6, 2018 at 9:48 AM Faisal Durrani  wrote:
>>
>> Hi Koji ,
>>
>> Thank you for your reply. I updated the logback.xml and ran the test
>> again. I can see an additional error in the app.log which is as below.
>>
>> o.a.nifi.remote.SocketRemoteSiteListener
>> java.io.EOFException: null
>>  at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340)
>>  at java.io.DataInputStream.readUTF(DataInputStream.java:589)
>>  at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>>  at
>> org.apache.nifi.remote.protocol.RequestType.readRequestType(RequestType.java:36)
>>  at
>> org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol.getRequestType(SocketFlowFileServerProtocol.java:147)
>>  at
>> org.apache.nifi.remote.SocketRemoteSiteListener$1$1.run(SocketRemoteSiteListener.java:253)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I notice this error is reported against not just one node but different
>> nodes in the cluster. Would you be able infer the root cause of the issue
>> from this information?
>>
>> Thanks.
>>
>> On Thu, Jul 5, 2018 at 3:34 PM Koji Kawamura 
>> wrote:
>>>
>>> Hello,
>>>
>>> 1. The error message sounds like the client disconnects in the middle
>>> of Site-to-Site communication. Enabling debug log would show more
>>> information, by adding >> level="DEBUG"/> at conf/logback.xml.
>>>
>>> 2. I'd suggest checking if your 4 nodes receive data evenly (well
>>> distributed). Connection status history, 'Queued Count' per node may
>>> be useful to check. If not evenly distributed, I'd lower Remote Port
>>> batch settings at sending side.
>>> Then try to find a bottle neck in downstream flow. Increasing
>>> concurrent tasks at such bottle neck processor can help increasing
>>> throughput in some cases. Adding more node will also help.
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Thu, Jul 5, 2018 at 11:12 AM, Faisal Durrani 
>>> wrote:
>>> > Hi, I've got two questions
>>> >
>>> > 1.We are using Remote Process Group with Raw transport protocol to
>>> > distribute the data across four node cluster. I see the nifi app log
>>> > has a
>>> > lot of instance of the below error
>>> >
>>> > o.a.nifi.remote.SocketRemoteSiteListener Unable to communicate with
>>> > remote
>>> > instance Peer[url=nifi://xxx-xx.prod.xx.:59528]
>>> >
>>> > (SocketFlowFileServerProtocol[CommsID=0bf887ed-acb3-4eea-94ac-5abf53ad0bf1])
>>> > due to java.io.EOFException; closing connection
>>> >
>>> > These error do not show on the bulletin board and nor do I see any data
>>> > loss. I was curious to know if there is some bad configuration that is
>>> > causing this to happen.
>>> >
>>> > 2. The app log also has the below error
>>> >
>>> > o.a.n.r.c.socket.EndpointConnectionPool EndpointConnectionPool[Cluster
>>> > URL=[http://xxx-xx.prod.xx.local:9090/nifi-api]]
>>> > Peer[url=nifi://ins-btrananifi107z.prod.jp.local:5001] indicates that
>>> > port
>>> > 417e3d23-5b1a-1616-9728-9d9d1a462646's destination is full; penalizing
>>> > peer
>>> >
>>> > The data flow consume a high volume data and there is back pressure on
>>> > almost all the connections. So probably that is what causing it. I
>>> > guess
>>> > there isn't much we can do here and once the back pressure resolve ,the
>>> > error goes away on its own.Please let me know of your view.
>>> >
>>> >


Re: RPG S2S Error

2018-07-05 Thread Koji Kawamura
Hello,

1. The error message sounds like the client disconnected in the middle
of the Site-to-Site communication. Enabling debug logging would show more
information, by adding <logger name="org.apache.nifi.remote" level="DEBUG"/> at conf/logback.xml.

2. I'd suggest checking whether your 4 nodes receive data evenly (are well
distributed). The connection status history's 'Queued Count' per node may
be useful to check. If the data is not evenly distributed, I'd lower the
Remote Port batch settings on the sending side.
Then try to find a bottleneck in the downstream flow. Increasing the
concurrent tasks of such a bottleneck processor can help increase
throughput in some cases. Adding more nodes will also help.

Thanks,
Koji

On Thu, Jul 5, 2018 at 11:12 AM, Faisal Durrani  wrote:
> Hi, I've got two questions
>
> 1.We are using Remote Process Group with Raw transport protocol to
> distribute the data across four node cluster. I see the nifi app log has a
> lot of instance of the below error
>
> o.a.nifi.remote.SocketRemoteSiteListener Unable to communicate with remote
> instance Peer[url=nifi://xxx-xx.prod.xx.:59528]
> (SocketFlowFileServerProtocol[CommsID=0bf887ed-acb3-4eea-94ac-5abf53ad0bf1])
> due to java.io.EOFException; closing connection
>
> These error do not show on the bulletin board and nor do I see any data
> loss. I was curious to know if there is some bad configuration that is
> causing this to happen.
>
> 2. The app log also has the below error
>
> o.a.n.r.c.socket.EndpointConnectionPool EndpointConnectionPool[Cluster
> URL=[http://xxx-xx.prod.xx.local:9090/nifi-api]]
> Peer[url=nifi://ins-btrananifi107z.prod.jp.local:5001] indicates that port
> 417e3d23-5b1a-1616-9728-9d9d1a462646's destination is full; penalizing peer
>
> The data flow consume a high volume data and there is back pressure on
> almost all the connections. So probably that is what causing it. I guess
> there isn't much we can do here and once the back pressure resolve ,the
> error goes away on its own.Please let me know of your view.
>
>


Re: Issue with http-notification service

2018-06-24 Thread Koji Kawamura
Hi Raman,

Since you're using an 'https' endpoint, I believe you need to configure
the HttpNotificationService with truststore settings.
The NullPointerException can happen if the OkHttpClient is null when the
notification service tries to send notifications, which can happen if
the OkHttpClient was not initialized correctly. I assume there is another
error log message prior to the NPE.

Please check this documentation for the properties you need to set,
such as 'Truststore Filename':
https://nifi.apache.org/docs.html

Thanks,
Koji


On Fri, Jun 22, 2018 at 4:15 AM, Ramaninder Singh Jhajj
 wrote:
> Hi Andy,
>
> I am using version 1.6.0 as a single instance on my local MacBook pro.
>
> The configuration of bootstrap-notification-services.xml file is:
>
>  <service>
>
>     <id>http-notification</id>
>
>     <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
>
>     <property name="URL">https://hooks.slack.com/services/ID_OF_THE_SLAC_WEBHOOK</property>
>
>  </service>
>
>
>
> Can it be due to slack url being https?
>
> I am still not able to figure it out. So any help will be really useful.
>
> Kind Regards,
> Raman
>
>
> On Tue, Jun 12, 2018 at 5:30 PM Andy LoPresto  wrote:
>>
>> Hi Raman,
>>
>> What version of Apache NiFi are you using? In current master, that line
>> (HttpNotificationService.java:228) is a blank line [1]. Can you also share
>> the contents of your bootstrap-notification-services.xml file (redacting
>> sensitive information if necessary)?
>>
>>
>> [1]
>> https://github.com/apache/nifi/blob/master/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/http/HttpNotificationService.java#L228
>>
>> Andy LoPresto
>> alopre...@apache.org
>> alopresto.apa...@gmail.com
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> On Jun 12, 2018, at 2:46 PM, Ramaninder Singh Jhajj
>>  wrote:
>>
>> Hello NiFi Team,
>>
>> I am trying to setup NiFi lifecycle notifications in the bootstrap.conf
>> file. But instead of sending email notifications I want to send
>> notifications on slack so I thought of using http-notification.
>>
>> I have configured the bootstrap-notification-services.xml with the slack
>> webhook url but it fails to send notifications. When I mention url with
>> https it throws the following error:
>>
>> 2018-06-12 21:29:04,791 ERROR [Notification Service Dispatcher]
>> o.a.n.b.NotificationServiceManager Failed to send notification of type
>> NIFI_STOPPED to HttpNotificationService[id=http-notification] with Subject
>> NiFi Stopped on Host nifi (127.0.1.1) due to
>> org.apache.nifi.bootstrap.notification.NotificationFailedException: Failed
>> to send Http Notification. Will
>> 2018-06-12 21:29:04,792 ERROR [Notification Service Dispatcher]
>> o.a.n.b.NotificationServiceManager
>> org.apache.nifi.bootstrap.notification.NotificationFailedException: Failed
>> to send Http Notification
>> at
>> org.apache.nifi.bootstrap.notification.http.HttpNotificationService.notify(HttpNotificationService.java:239)
>> at
>> org.apache.nifi.bootstrap.NotificationServiceManager$2.run(NotificationServiceManager.java:206)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NullPointerException: null
>> at
>> org.apache.nifi.bootstrap.notification.http.HttpNotificationService.notify(HttpNotificationService.java:228)
>> ... 8 common frames omitted
>>
>>
>> if I change the url to http then it throws a 400 error:
>>
>> 2018-06-12 21:33:29,346 ERROR [Notification Service Dispatcher]
>> o.a.n.b.NotificationServiceManager
>> org.apache.nifi.bootstrap.notification.NotificationFailedException: Failed
>> to send Http Notification. Received an unsuccessful status code response
>> '400'. The message was 'Bad Request'
>> at
>> org.apache.nifi.bootstrap.notification.http.HttpNotificationService.notify(HttpNotificationService.java:233)
>> at
>> org.apache.nifi.bootstrap.NotificationServiceManager$2.run(NotificationServiceManager.java:206)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> 

Re: Only get file when a set exists.

2018-06-06 Thread Koji Kawamura
JFYI, updated the template on Gist.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

I personally prefer the 2nd work-around, setting 'Releasable FlowFile
Count' to 0, because the resulting FlowFiles will have more informative
attributes as evidence.

On Wed, Jun 6, 2018 at 4:50 PM, Koji Kawamura  wrote:
> Hi Martijn,
>
> Thanks for sharing new information.
> Here are couple of things to help debugging.
>
> # Debug Notify branch
> 1. Stop Wait branch, to debug solely Notify branch function. Wait
> processor deletes cache entry when it thinks no need to keep it any
> longer.
> 2. Make sure Notify has 'success' and 'failure' relationship. Connect
> both relationships to LogAttribute or something stopped, to keep
> FlowFiles in the queue between that and the Notify.
> 3. Confirm every arrived FlowFile is passed to the 'success'
> relationship. This confirms Notify actually sent every notification
> with expected notification identifier.
> 4. Check all expected keys exist in Redis
>
> # Debug Notify branch 2
> If you can't stop Wait branch for some reason, add
> FetchDistributedMapCache right after Notify. This ensures that a
> signal is successfully written in Redis.
>
> If we can confirm Notify branch works without issue, then I'd suspect
> the once written key gets deleted somehow by Wait processors.
>
> There is a race condition between Wait and Notify.
> My hypothesis is:
> Assuming 'type1.ext1' is routed to Wait branch, and other 7 types are
> to Notify branch.
> 1. Notify receives 'type1.ext2', signal becomes {type1.ext2=1}
> 2. Wait for 'type1.ext2' gets a signal, which has counts as {type1.ext2=1}
> 3. Simultaneously, Notify for 'type2.ext1' notifies, the signal is
> updated to {type1.ext2=1, type2.ext1=1}
> 4. The Wait for 'type1.ext2' processes the signal; since the count for
> 'type1.ext2' has reached the target of 1, it decrements that count to 0.
> Then it deletes the key, because it thinks the signal is done, as it no
> longer has any count in it.
> 5. The Wait for 'type2.ext1' fetches the key, but the entry has already
> been deleted. And so it gets stuck in the 'wait' relationship.
>
> If that's the case, changing 'Signal Identifier' from groupId to
> groupId.type can avoid the conflict.
> Alternatively, setting 'Releasable FlowFile Count' to 0 can stop Wait
> to delete cache key.
>
> Thanks,
> Koji
>
>
>
> On Tue, Jun 5, 2018 at 9:23 PM, Martijn Dekkers  
> wrote:
>> Hi Koji,
>>
>> Some more information from debugging.
>>
>> I have today deployed Redis since that gives me an easy interface to check
>> the existence of keys, and found that for files that end up stuck in the
>> wait queues, the provenance in the Notify queue shows the relevant flowfile
>> as having arrived, but the relevant key in Redis shows as (nil)
>>
>> Files that have been processed successfully show a "good" key in Redis.
>>
>> Thanks,
>>
>> Martijn
>>
>> On 5 June 2018 at 06:27, Martijn Dekkers  wrote:
>>>
>>> Hello Koji,
>>>
>>> Many thanks, apologies for the delay in responding - I had to work on some
>>> different tasks.
>>>
>>> I have followed your advice and have configured a flow accordingly, and on
>>> the whole the logic works. However, I still see the issue where a set will
>>> be stuck in the wait queue. I have tracked it down to the instance where
>>> there is a longer delay between the arrival of ext1 and ext2 files. If I
>>> pause the appropriate processor that gates the ext2 files, that set will get
>>> stuck. If all files come through roughly at a similar time, we see no
>>> issues, and the flow happily runs.
>>>
>>> I am not quite sure about the best way to debug this. I have looked at the
>>> attributes in provenance, and notice that the relevant counter for the
>>> specific wait processor isn't updated. I am not sure how I can check the
>>> status of the distributed map cache to see if this might be responsible.
>>>
>>> I can share my flowfile, but would have to email it to you directly,
>>> unfortunately I cannot share the flowfile publicly, and sanitising it to the
>>> extent that I can publicly share it would be difficult.
>>>
>>> Oh, we are using 1.6
>>>
>>> Many thanks,
>>>
>>> Martijn
>>>
>>> On 31 May 2018 at 09:57, Koji Kawamura  wrote:
>>>>
>>>> BTW, which version are you using? I hope it is 1.4.0 or higher. There
>>>> was an issue having effects to your usage.
>>>> https://issues.apache.org/jira/browse/NIFI-4028

Re: Only get file when a set exists.

2018-06-06 Thread Koji Kawamura
Hi Martijn,

Thanks for sharing new information.
Here are couple of things to help debugging.

# Debug Notify branch
1. Stop the Wait branch, to debug the Notify branch function in isolation.
The Wait processor deletes a cache entry when it thinks there is no need to
keep it any longer.
2. Make sure Notify has 'success' and 'failure' relationship. Connect
both relationships to LogAttribute or something stopped, to keep
FlowFiles in the queue between that and the Notify.
3. Confirm every arrived FlowFile is passed to the 'success'
relationship. This confirms Notify actually sent every notification
with expected notification identifier.
4. Check all expected keys exist in Redis

# Debug Notify branch 2
If you can't stop the Wait branch for some reason, add
FetchDistributedMapCache right after Notify. This verifies that the
signal has actually been written to Redis.
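
Purely as an illustration (the attribute name 'wait.notify.signal' is just an
example, and this assumes the signal identifier is ${groupID} as elsewhere in
this thread), that debug step could look like:

FetchDistributedMapCache (debug only):
  Distributed Cache Service    - (the same cache client the Wait/Notify use)
  Cache Entry Identifier       - ${groupID}
  Put Cache Value In Attribute - wait.notify.signal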

If we can confirm the Notify branch works without issue, then I'd suspect
that the key, once written, is somehow deleted by the Wait processors.

There is a race condition between Wait and Notify.
My hypothesis is:
Assuming 'type1.ext1' is routed to Wait branch, and other 7 types are
to Notify branch.
1. Notify receives 'type1.ext2', signal becomes {type1.ext2=1}
2. Wait for 'type1.ext2' gets a signal, which has counts as {type1.ext2=1}
3. Simultaneously, Notify for 'type2.ext1' notifies, the signal is
updated to {type1.ext2=1, type2.ext1=1}
4. The Wait for 'type1.ext2' processes the signal; since the count for
'type1.ext2' has reached the target of 1, it decrements that count to 0.
Then it deletes the key, because it thinks the signal is done, as it no
longer has any count in it.
5. The Wait for 'type2.ext1' fetches the key, but the entry has already
been deleted. And so it gets stuck in the 'wait' relationship.

If that's the case, changing 'Signal Identifier' from groupId to
groupId.type can avoid the conflict.
Alternatively, setting 'Releasable FlowFile Count' to 0 can stop Wait
from deleting the cache key.
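
As an illustration only (the exact values depend on how groupID and the type
are extracted in your flow), the two work-arounds could look roughly like:

Work-around 1, one signal key per type:
  Notify - Release Signal Identifier: ${groupID}.${counterName}
  Wait   - Release Signal Identifier: ${groupID}.type2.ext1
           (one Wait per type, with the type hard-coded, so no Wait can
           delete another type's key)

Work-around 2, keep the shared key:
  Wait   - Releasable FlowFile Count: 0
           (Wait no longer deletes the cache key)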

Thanks,
Koji



On Tue, Jun 5, 2018 at 9:23 PM, Martijn Dekkers  wrote:
> Hi Koji,
>
> Some more information from debugging.
>
> I have today deployed Redis since that gives me an easy interface to check
> the existence of keys, and found that for files that end up stuck in the
> wait queues, the provenance in the Notify queue shows the relevant flowfile
> as having arrived, but the relevant key in Redis shows as (nil)
>
> Files that have been processed successfully show a "good" key in Redis.
>
> Thanks,
>
> Martijn
>
> On 5 June 2018 at 06:27, Martijn Dekkers  wrote:
>>
>> Hello Koji,
>>
>> Many thanks, apologies for the delay in responding - I had to work on some
>> different tasks.
>>
>> I have followed your advice and have configured a flow accordingly, and on
>> the whole the logic works. However, I still see the issue where a set will
>> be stuck in the wait queue. I have tracked it down to the instance where
>> there is a longer delay between the arrival of ext1 and ext2 files. If I
>> pause the appropriate processor that gates the ext2 files, that set will get
>> stuck. If all files come through roughly at a similar time, we see no
>> issues, and the flow happily runs.
>>
>> I am not quite sure about the best way to debug this. I have looked at the
>> attributes in provenance, and notice that the relevant counter for the
>> specific wait processor isn't updated. I am not sure how I can check the
>> status of the distributed map cache to see if this might be responsible.
>>
>> I can share my flowfile, but would have to email it to you directly,
>> unfortunately I cannot share the flowfile publicly, and sanitising it to the
>> extent that I can publicly share it would be difficult.
>>
>> Oh, we are using 1.6
>>
>> Many thanks,
>>
>> Martijn
>>
>> On 31 May 2018 at 09:57, Koji Kawamura  wrote:
>>>
>>> BTW, which version are you using? I hope it is 1.4.0 or higher. There
>>> was an issue having effects to your usage.
>>> https://issues.apache.org/jira/browse/NIFI-4028
>>>
>>> On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura 
>>> wrote:
>>> > HI Martijn,
>>> >
>>> > I used the filename increment pattern based on your first filename
>>> > example.
>>> > file_123_456_ab.ex1
>>> > I increment the 456 part. If it changed, that's fine.
>>> >
>>> > Your current configurations look like below:
>>> > - Given a filename: file_123_type3.ext1
>>> > - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
>>> > - groupID will be: 123_ (including the underscore)
>>> > - counterName will be: type3
>>> >
>>> > I was suggesting include the extension to the counterName.

Re: Only get file when a set exists.

2018-05-31 Thread Koji Kawamura
BTW, which version are you using? I hope it is 1.4.0 or higher. There
was an issue that affects your usage pattern.
https://issues.apache.org/jira/browse/NIFI-4028

On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura  wrote:
> HI Martijn,
>
> I used the filename increment pattern based on your first filename example.
> file_123_456_ab.ex1
> I increment the 456 part. If it changed, that's fine.
>
> Your current configurations look like below:
> - Given a filename: file_123_type3.ext1
> - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
> - groupID will be: 123_ (including the underscore)
> - counterName will be: type3
>
> I was suggesting include the extension to the counterName.
> How about changing the RegEx as:
> - RegEx: ^file_(\d+)_(\w+\.ext\d)$
> - groupID will be: 123
> - counterName will be: type3.ext1
>
> Then you can route type1.ext1 to Wait branch and other 7 to Notify.
> In Wait branch, you need 7 Wait processors.
>
> It would fast to debug if you can share your flow template..
>
> Thanks,
> Koji
>
> On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers  
> wrote:
>> Thank you Koji,
>>
>> I have tried once again, using your updated example. Unfortunately, things
>> still get stuck at the first Wait processors' wait queue.
>> I did notice that the format of the files your example generates is
>> different. I will try to clarify:
>>
>> - 8 files in total:
>>
>> -- file_123_type1.ext1
>> -- file_123_type1.ext2
>>
>> -- file_123_type2.ext1
>> -- file_123_type2.ext2
>>
>> -- file_123_type3.ext1
>> -- file_123_type3.ext2
>>
>> -- file_123_type4.ext1
>> -- file_123_type4.ext2
>>
>> For each set of 8 files, "file_123" increments, so the first set of 8 is
>> "file_123", and the next set is "file_124" and so on.
>>
>> When I look at your example, I notice that at the final step (LogAttribute
>> after the FetchFile set) the filenames are file_123_> number>.ex(1|2)
>>
>> My UpdateAttribute before the Notify branch is configured as:
>> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>> counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
>>
>> The UpdateAttribute before the Wait branch is configured as:
>> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>>
>> The 4 Wait processors in the Wait branch are configured as:
>> All Wait processors:
>> Release Signal Identifier - ${groupID}
>>
>> For each individual Wait processor:
>> Signal Counter Name - type1
>> Signal Counter Name - type2
>> Signal Counter Name - type3
>> Signal Counter Name - type4
>>
>> I am a bit stumped. The best success we had was a configuration with a
>> RouteonAttribute sending each of type1|type2|type3|type4 to their own Wait
>> processor, and a similar config for the Notify branch, followed by a final
>> Wait/Notify pair that simply ensures we have the correct amount of sets.
>>
>> This configuration did exactly what we want, but unfortunately we had random
>> flowfiles stuck in the waitqueue for no apparent reason.
>>
>> Thanks,
>>
>> Martijn
>>
>>
>>
>> On 31 May 2018 at 05:23, Koji Kawamura  wrote:
>>>
>>> The order of arrival does not matter.
>>>
>>> Wait processor has 'Expiration Duration' configuration, defaults to 10
>>> min. Please adjust it according to your needs, the longest period to
>>> wait for a delayed file.
>>> If a FlowFile exceeds the duration, it will be sent to 'expired'
>>> relationship, and can be treated differently, e.g. write ERROR log
>>>
>>> > If we have a longer wait for a file, we'd like processing for the next
>>> > groupid to still be able to continue.
>>>
>>> In order to achieve that, use Wait Mode = 'Transfer to wait
>>> relationship', and the 'wait' relationship should be configured to use
>>> FirstInFirstOutPrioritizer.
>>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
>>> processed again while it blocks other FlowFiles.
>>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
>>> re-queued at the end of wait queue.
>>>
>>> I've updated my example to make it more realistic, by adding delay for
>>> certain set and type.
>>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
>>>  wrote

Re: Only get file when a set exists.

2018-05-31 Thread Koji Kawamura
Hi Martijn,

I used the filename increment pattern based on your first filename example.
file_123_456_ab.ex1
I increment the 456 part. If it changed, that's fine.

Your current configurations look like below:
- Given a filename: file_123_type3.ext1
- matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
- groupID will be: 123_ (including the underscore)
- counterName will be: type3

I was suggesting including the extension in the counterName.
How about changing the RegEx to:
- RegEx: ^file_(\d+)_(\w+\.ext\d)$
- groupID will be: 123
- counterName will be: type3.ext1

Then you can route type1.ext1 to the Wait branch and the other 7 to Notify.
In the Wait branch, you need 7 Wait processors.
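
For example (assuming the file name is available in the 'filename' attribute;
adjust the attribute name to whatever your flow uses), the UpdateAttribute(s)
could set:

  groupID     - ${filename:replaceFirst('^file_(\d+)_(\w+\.ext\d)$','$1')}
  counterName - ${filename:replaceFirst('^file_(\d+)_(\w+\.ext\d)$','$2')}

so 'file_123_type3.ext1' yields groupID=123 and counterName=type3.ext1.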

It would be faster to debug if you could share your flow template.

Thanks,
Koji

On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers  wrote:
> Thank you Koji,
>
> I have tried once again, using your updated example. Unfortunately, things
> still get stuck at the first Wait processors' wait queue.
> I did notice that the format of the files your example generates is
> different. I will try to clarify:
>
> - 8 files in total:
>
> -- file_123_type1.ext1
> -- file_123_type1.ext2
>
> -- file_123_type2.ext1
> -- file_123_type2.ext2
>
> -- file_123_type3.ext1
> -- file_123_type3.ext2
>
> -- file_123_type4.ext1
> -- file_123_type4.ext2
>
> For each set of 8 files, "file_123" increments, so the first set of 8 is
> "file_123", and the next set is "file_124" and so on.
>
> When I look at your example, I notice that at the final step (LogAttribute
> after the FetchFile set) the filenames are file_123_ number>.ex(1|2)
>
> My UpdateAttribute before the Notify branch is configured as:
> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
> counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
>
> The UpdateAttribute before the Wait branch is configured as:
> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>
> The 4 Wait processors in the Wait branch are configured as:
> All Wait processors:
> Release Signal Identifier - ${groupID}
>
> For each individual Wait processor:
> Signal Counter Name - type1
> Signal Counter Name - type2
> Signal Counter Name - type3
> Signal Counter Name - type4
>
> I am a bit stumped. The best success we had was a configuration with a
> RouteonAttribute sending each of type1|type2|type3|type4 to their own Wait
> processor, and a similar config for the Notify branch, followed by a final
> Wait/Notify pair that simply ensures we have the correct amount of sets.
>
> This configuration did exactly what we want, but unfortunately we had random
> flowfiles stuck in the waitqueue for no apparent reason.
>
> Thanks,
>
> Martijn
>
>
>
> On 31 May 2018 at 05:23, Koji Kawamura  wrote:
>>
>> The order of arrival does not matter.
>>
>> Wait processor has 'Expiration Duration' configuration, defaults to 10
>> min. Please adjust it according to your needs, the longest period to
>> wait for a delayed file.
>> If a FlowFile exceeds the duration, it will be sent to 'expired'
>> relationship, and can be treated differently, e.g. write ERROR log
>>
>> > If we have a longer wait for a file, we'd like processing for the next
>> > groupid to still be able to continue.
>>
>> In order to achieve that, use Wait Mode = 'Transfer to wait
>> relationship', and the 'wait' relationship should be configured to use
>> FirstInFirstOutPrioritizer.
>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
>> processed again while it blocks other FlowFiles.
>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
>> re-queued at the end of wait queue.
>>
>> I've updated my example to make it more realistic, by adding delay for
>> certain set and type.
>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>
>> Thanks,
>> Koji
>>
>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
>>  wrote:
>> > Cool, that will make things a lot simpler. Does it matter that the ext2
>> > files arrive in random order? Sometimes there can be a very long delay
>> > in
>> > some of them showing up, and we have some concerns about the overall
>> > flow
>> > blocking. If we have a longer wait for a file, we'd like processing for
>> > the
>> > next groupid to still be able to continue.
>> >
>> > Thank you for your help (and for writing Wait/Notify!!)
>> >
>> > Martijn
>> >
>> > On 31 May 2018 at 03:49, Koji Kawamura  wrote:
>> >>
>> >> Glad to hear that was helpful.

Re: Only get file when a set exists.

2018-05-30 Thread Koji Kawamura
The order of arrival does not matter.

The Wait processor has an 'Expiration Duration' configuration, which defaults
to 10 min. Please adjust it according to your needs, i.e. the longest period
to wait for a delayed file.
If a FlowFile exceeds that duration, it will be sent to the 'expired'
relationship, where it can be treated differently, e.g. by writing an ERROR log.

> If we have a longer wait for a file, we'd like processing for the next 
> groupid to still be able to continue.

In order to achieve that, use Wait Mode = 'Transfer to wait
relationship', and the 'wait' relationship should be configured to use
FirstInFirstOutPrioritizer.
If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
processed again while it blocks other FlowFiles.
With FirstInFirstOutPrioritizer, the processed FlowFile will be
re-queued at the end of wait queue.

I've updated my example to make it more realistic, by adding delay for
certain set and type.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

Thanks,
Koji

On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
 wrote:
> Cool, that will make things a lot simpler. Does it matter that the ext2
> files arrive in random order? Sometimes there can be a very long delay in
> some of them showing up, and we have some concerns about the overall flow
> blocking. If we have a longer wait for a file, we'd like processing for the
> next groupid to still be able to continue.
>
> Thank you for your help (and for writing Wait/Notify!!)
>
> Martijn
>
> On 31 May 2018 at 03:49, Koji Kawamura  wrote:
>>
>> Glad to hear that was helpful.
>>
>> "4 same type for each extension", can be treated as "8 distinct types"
>> if an extension is included in a type.
>> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>>
>> Then route only 'ab.ex1' (or whichever but just 1 of them) to the Wait
>> branch, and the rest to Notify branch.
>> That will simplify the flow, if I'm not missing any other requirement.
>>
>> Thanks!
>> Koji
>>
>> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
>>  wrote:
>> > Hi Koji, Many thanks for your continued assistance!
>> >
>> >>
>> >> - 1 file per second is relatively low in terms of traffic, it should
>> >> be processed fine with 1 thread
>> >> - A flow like this, which is stateful across different parts of the
>> >> flow works at best with single thread, because using multiple threads
>> >> would cause race condition or concurrency issue if there's any
>> >> implementation error
>> >
>> >
>> > Yes, we had similar thoughts.
>> >
>> >>
>> >> - Based on above, I strongly recommend to NOT increase "concurrent
>> >> tasks". If you see FlowFiles staying in a wait queue, then there must
>> >> be different issue
>> >
>> >
>> > We don't see many flowfiles stuck in a wait queue, I ran a test over a
>> > few
>> > hours yesterday that simulates the way in which these files would appear
>> > (we
>> > would have 4 of "ext1" show up every second, and the "ext2" can show up
>> > a
>> > few seconds later, and not always in the same order) and we found
>> > perhaps 6
>> > flowfiles stuck in a wait queue.
>> >
>> >>
>> >> - Also, using concurrent tasks number like 400 is too much in general
>> >> for all processors. I recommend to increment it as 2, 3, 4 .. up to 8
>> >> or so, only if you see the clear benefit by doing so
>> >
>> >
>> > Indeed, thanks for the suggestion. Once we have the logic finished and
>> > tested we will have to optimise this Flow. The next step is to try to
>> > load
>> > the required processors into MiNiFy, as this will be running on many
>> > systems
>> > with limited capacity. If we don't manage with MiNiFy, we will still be
>> > good, but we prefer to have the smaller footprint and ease of management
>> > we
>> > can obtain with MiNiFy.
>> >
>> >>
>> >> - The important part of this flow is extracting 'groupId' and 'type'
>> >> from file names. Regular Expression needs to be configured properly.
>> >> - I recommend using https://regex101.com/ to test your Regular
>> >> Expression to see whether it can extract correct groupId and type
>> >
>> >
>> > Yes, we have tested our RegExes for this extensively
>> >
>> >>
>> >>
>> >> Lastly, regardless of how many files should be there for 

Re: Only get file when a set exists.

2018-05-30 Thread Koji Kawamura
ntent using FetchFile, and process it
>
>
> Besides the "4 same types for each extension", this is configured as you
> describe.
>
>>
>> I hope it helps.
>>
>
> It does, thanks. I will extract this portion of the flow, sanitise, and send
> it along - easier to see than to describe :)
>
>
>>
>> Thanks,
>> Koji
>
>
> Thank you so much once again!
>
> Martijn
>
>
>
>>
>>
>> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers 
>> wrote:
>> > Hey Pierre,
>> >
>> > Yes, we suspected as much, but we are only seeing this with the Wait
>> > processor. Possibly because that is the only "blocking" we have in this
>> > flow.
>> >
>> > Thanks for the clarification, much appreciated!
>> >
>> > Martijn
>> >
>> > On 30 May 2018 at 10:30, Pierre Villard 
>> > wrote:
>> >>
>> >> I'll let Koji give more information about the Wait/Notify, he is
>> >> clearly
>> >> the expert here.
>> >>
>> >> I'm just jumping in regarding your "and when viewing the queue, the
>> >> dialog
>> >> states that the queue is empty.". You're seeing this behavior because,
>> >> even
>> >> though the UI shows some flow files in the queue, the flow files are
>> >> currently locked in the session of the running processor and you won't
>> >> see
>> >> flow files currently processed in a session when listing a queue. If
>> >> you
>> >> stop the processor, the session will be closed and you'll be able to
>> >> list
>> >> the queue and see the flow files.
>> >>
>> >> I recall discussions in the past to improve the UX for this. Not sure
>> >> we
>> >> have a JIRA for it though...
>> >>
>> >> Pierre
>> >>
>> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers :
>> >>>
>> >>> Hi Koji,
>> >>>
>> >>> Thank you for responding. I had adjusted the run schedule to closely
>> >>> mimic our environment. We are expecting about 1 file per second or so.
>> >>> We are also seeing some random "orphans" sitting in a wait queue every
>> >>> now and again that don't trigger a debug message, and when viewing the
>> >>> queue, the dialog states that the queue is empty.
>> >>>
>> >>> We found the random "no signal found" issue to be significantly
>> >>> decreased
>> >>> when we increase the "concurrent tasks" to something large - currently
>> >>> set
>> >>> to 400 for all wait and notify processors.
>> >>>
>> >>> I do need to mention that our requirements had changed since you made
>> >>> the
>> >>> template, in that we are looking for a set of 8 files - 4 x "ext1" and
>> >>> 4 x
>> >>> "ext2" both with the same pattern: .ext1
>> >>> or ext2
>> >>>
>> >>> We found that the best way to make this work was to add another
>> >>> wait/notify pair, each processor coming after the ones already there,
>> >>> with a
>> >>> simple counter against the groupID.
>> >>>
>> >>> I will export a template for you, many thanks for your help - I just
>> >>> need
>> >>> to spend some time sanitising the varies fields etc.
>> >>>
>> >>> Many thanks once again for your kind assistance.
>> >>>
>> >>> Martijn
>> >>>
>> >>> On 30 May 2018 at 08:14, Koji Kawamura  wrote:
>> >>>>
>> >>>> Hi Martjin,
>> >>>>
>> >>>> In my template, I was using 'Run Schedule' as '5 secs' for the Wait
>> >>>> processors to avoid overusing CPU resource. However, if you expect
>> >>>> more throughput, it should be lowered.
>> >>>> Changed Run Schedule to 0 sec, and I passed 100 group of files (400
>> >>>> files because 4 files are 1 set in my example), they reached to the
>> >>>> expected goal of the flow without issue.
>> >>>>
>> >>>> If you can share your flow and example input file volume (hundreds of
>> >>>> files were fine in my flow), I may be able to provide more useful
>> >

Re: Only get file when a set exists.

2018-05-30 Thread Koji Kawamura
Hi Martijn,

Thanks for elaborating on your requirement. Here are a few comments:

- 1 file per second is relatively low in terms of traffic, it should
be processed fine with 1 thread
- A flow like this, which is stateful across different parts of the
flow, works best with a single thread, because using multiple threads
would cause race conditions or concurrency issues if there is any
implementation error
- Based on the above, I strongly recommend NOT increasing "concurrent
tasks". If you see FlowFiles staying in a wait queue, then there must
be a different issue
- Also, a concurrent tasks setting like 400 is far too high in general
for any processor. I recommend incrementing it as 2, 3, 4 .. up to 8
or so, and only if you see a clear benefit from doing so
- The important part of this flow is extracting 'groupId' and 'type'
from file names. Regular Expression needs to be configured properly.
- I recommend using https://regex101.com/ to test your Regular
Expression to see whether it can extract correct groupId and type

Lastly, regardless of how many files make up a set for 'ext1' and
'ext2', the flow structure is simple, as below.
Let's say there should be 8 files before processing can start:
4 x ext1 and 4 x ext2 in your case, but let's think of them as 8 file types.
And I assume the types are known, meaning static, not dynamically changing.
So the rule is: "a set consists of 8 files, and a set should
wait to be processed until all 8 files are ready", that's all.

Then, the flow should be designed like below:
1. List files, each file will be sent as a FlowFile
2. Extract groupId and type from filename
3. Route FlowFiles into two branches, let's call these 'Notify' branch
and 'Wait' branch, and pass only 1 type for a set to Wait-branch, and
the rest 7 types to Notify-branch

At Notify branch (for the rest 7 types FlowFile, e.g. type 2, 3, 4 ... 8)
1. Notify that the type for a group has arrived.
2. Discard the FlowFile, because there's nothing to do with it in this branch

At Wait branch (for the type 1 FlowFile)
1. Wait for type 2 for the groupId.
2. Wait for type 3 for the groupId, type 4, 5 and so on
3. After passing Wait for type 8, it can guarantee that all 8 files
are available (unless there is any other program deleting those)
4. Get actual file content using FetchFile, and process it
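
As a rough sketch of the per-type configuration described above (property
names as they appear on Wait/Notify; the attribute and counter names are
illustrative and must match what the Notify branch writes):

Notify (Notify branch, i.e. types 2..8):
  Release Signal Identifier - ${groupId}
  Signal Counter Name       - ${type}

Wait #2 .. Wait #8 (Wait branch, chained one after another):
  Release Signal Identifier - ${groupId}
  Signal Counter Name       - type2   (type3 on the next Wait, and so on)
  Target Signal Count       - 1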

I hope it helps.

Thanks,
Koji


On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers  wrote:
> Hey Pierre,
>
> Yes, we suspected as much, but we are only seeing this with the Wait
> processor. Possibly because that is the only "blocking" we have in this
> flow.
>
> Thanks for the clarification, much appreciated!
>
> Martijn
>
> On 30 May 2018 at 10:30, Pierre Villard  wrote:
>>
>> I'll let Koji give more information about the Wait/Notify, he is clearly
>> the expert here.
>>
>> I'm just jumping in regarding your "and when viewing the queue, the dialog
>> states that the queue is empty.". You're seeing this behavior because, even
>> though the UI shows some flow files in the queue, the flow files are
>> currently locked in the session of the running processor and you won't see
>> flow files currently processed in a session when listing a queue. If you
>> stop the processor, the session will be closed and you'll be able to list
>> the queue and see the flow files.
>>
>> I recall discussions in the past to improve the UX for this. Not sure we
>> have a JIRA for it though...
>>
>> Pierre
>>
>> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers :
>>>
>>> Hi Koji,
>>>
>>> Thank you for responding. I had adjusted the run schedule to closely
>>> mimic our environment. We are expecting about 1 file per second or so.
>>> We are also seeing some random "orphans" sitting in a wait queue every
>>> now and again that don't trigger a debug message, and when viewing the
>>> queue, the dialog states that the queue is empty.
>>>
>>> We found the random "no signal found" issue to be significantly decreased
>>> when we increase the "concurrent tasks" to something large - currently set
>>> to 400 for all wait and notify processors.
>>>
>>> I do need to mention that our requirements had changed since you made the
>>> template, in that we are looking for a set of 8 files - 4 x "ext1" and 4 x
>>> "ext2" both with the same pattern: .ext1 or ext2
>>>
>>> We found that the best way to make this work was to add another
>>> wait/notify pair, each processor coming after the ones already there, with a
>>> simple counter against the groupID.
>>>
>>> I will export a template for you, many thanks for your help - I just need
>>> to spend some tim

Re: Hive connection Pool error

2018-05-29 Thread Koji Kawamura
Hello,

Although I have encountered various Kerberos-related errors, I haven't
encountered that one.
I tried to reproduce the same error by changing Kerberos-related
configuration, but to no avail.
I recommend enabling the Kerberos debug option for further debugging.

You can add the option at nifi/conf/bootstrap.conf:
java.arg.19=-Dsun.security.krb5.debug=true

Then debug logs are written to nifi/logs/nifi-bootstrap.log

Thanks,
Koji

On Tue, May 29, 2018 at 10:31 PM, Vishal Dutt  wrote:
> Hi ,
>
>
>
> We  are getting below error on randomly for few minutes and then goes away,
> its coming in PUThiveql
>
>
>
>
>
> 2018-05-29 01:01:07,279 INFO [Timer-Driven Process Thread-95]
> org.apache.hive.jdbc.HiveConnection Will try to open client transport with
> JDBC Uri:
> jdbc:hive2://ctcl-hdpmaster2.msoit.com:1/default;principal=hive/_h...@msoit.com
>
> 2018-05-29 01:01:07,281 ERROR [Timer-Driven Process Thread-95]
> o.apache.thrift.transport.TSaslTransport SASL negotiation failure
>
> javax.security.sasl.SaslException: GSS initiate failed
>
> at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>
> at
> org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
>
> at
> org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
>
> at
> org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
>
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
>
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
>
> at
> org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:204)
>
> at
> org.apache.hive.jdbc.HiveConnection.(HiveConnection.java:176)
>
> at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
>
> at
> org.apache.commons.dbcp.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:38)
>
> at
> org.apache.commons.dbcp.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:582)
>
> at
> org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1148)
>
> at
> org.apache.commons.dbcp.PoolingDataSource.getConnection(PoolingDataSource.java:106)
>
> at
> org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044)
>
> at
> org.apache.nifi.dbcp.hive.HiveConnectionPool.lambda$getConnection$0(HiveConnectionPool.java:355)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>
> at
> org.apache.nifi.dbcp.hive.HiveConnectionPool.getConnection(HiveConnectionPool.java:355)
>
> at sun.reflect.GeneratedMethodAccessor393.invoke(Unknown Source)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
>
> at com.sun.proxy.$Proxy97.getConnection(Unknown Source)
>
> at
> org.apache.nifi.processors.hive.PutHiveQL.lambda$new$1(PutHiveQL.java:191)
>
> at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:96)
>
> at
> org.apache.nifi.processors.hive.PutHiveQL.lambda$onTrigger$6(PutHiveQL.java:274)
>
> at
> org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
>
> at
> org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
>
> at
> org.apache.nifi.processors.hive.PutHiveQL.onTrigger(PutHiveQL.java:274)
>
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1147)
>
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:175)
>
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
> at
> 

Re: Only get file when a set exists.

2018-05-27 Thread Koji Kawamura
Hi Martin,

Alternative approach is using Wait/Notify processors.
I have developed similar flow using those before, and it will work
with your case I believe.
A NiFi flow template is available here.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

Hope this helps,
Koji


On Sun, May 27, 2018 at 11:48 PM, Andrew Grande  wrote:
> Martijn,
>
> Here's an idea you could explore. Have the ListFile processor work as usual
> and create a custom component (start with a scripting one to prototype)
> grouping the filenames as needed. I don't know of the number of files in a
> set is different every time, so trying to be more robust.
>
> Once you group and count the set, you can transfer the names to the success
> relationship. Ignore otherwise and wait until the set is full.
>
> Andrew
>
>
> On Sun, May 27, 2018, 7:29 AM Martijn Dekkers 
> wrote:
>>
>> Hello all,
>>
>> I am trying to work out an issue with little success.
>>
>> I need to ingest files generated by some application. I can only ingest
>> these files when a specific set exists. For example:
>>
>> file_123_456_ab.ex1
>> file_123_456_cd.ex1
>> file_123_456_ef.ex1
>> file_123_456_gh.ex1
>> file_123_456.ex2
>>
>> Only when a set like that exists should I pick them up into the Flow. The
>> parts I am looking for to "group" would "ab.ex1", "cd.ex1", "ef.ex1",
>> "gh.ex1", ".ex2".
>>
>> I tried to do this with some expression, but couldn't work it out.
>>
>> What would be the best way to achieve this?
>>
>> Many thanks!


Re: MonitorActivity to PutSlack

2018-05-08 Thread Koji Kawamura
Hi Nick,

You may find ExtractText useful for extracting a string from the FlowFile
content into a FlowFile attribute.
E.g. MonitorActivity -> ExtractText -> PutSlack
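
A rough sketch (the dynamic property name 'monitor.message' and the regex are
just illustrations; ExtractText's 'Maximum Capture Group Length' may need to
be raised for longer content):

ExtractText - add a dynamic property:
  monitor.message - (?s)(.*)

PutSlack:
  Webhook Text - ${monitor.message}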

Thanks,
Koji

On Wed, May 9, 2018 at 9:03 AM, Nick Carenza
 wrote:
> Hey all,
>
> I can't find a way to send messages with PutSlack from the MonitorActivity
> processor. MonitorActivity only writes to content and PutSlack only reads
> from attributes and expression language doesn't provide a way to bridge the
> two. What is the best way to make these processors communicate?
>
> Thanks,
> Nick


Re: Nifi Remote Process Group FlowFile Distribution among nodes

2018-05-08 Thread Koji Kawamura
Hi Mohit,

NiFi RPG batches multiple FlowFiles into the same Site-to-Site
transaction, and the default batch settings are configured for higher
throughput.
If you prefer more granular distribution, you can lower the batch
configuration from the "Manage Remote Ports" context menu of a
RemoteProcessGroup.
The batch size configuration has been available in the UI since NiFi 1.2.0,
and the JIRA can serve as a reference:
https://issues.apache.org/jira/browse/NIFI-1202
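
For example, in the batch settings of the remote input port (values are
illustrative; a count of 1 trades throughput for per-FlowFile distribution):

  Count    - 1
  Size     - (leave empty)
  Duration - (leave empty)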

Thanks,
Koji

On Tue, May 8, 2018 at 2:24 PM, Mohit  wrote:
> Hi,
>
>
>
> I need some clarity on how flowfile is distributed among different nodes in
> a Nifi cluster.
>
>
>
> I have a flow where I’m using GenerateTableFetch  to fetch the data from
> database. Source table has around 40 million records. I tried with different
> partition size which led to create different number of flowfiles.
>
> When there are less number of flowfiles(~40), RPG sends it to only one
> node(in a 4 node cluster) but when there are large number of
> flowfiles(~400), it distribute the flowfile among all the nodes.
>
> Are there some rules or best practices to fine tune the flow, so that the
> flowfiles are evenly distributed across the nodes even if there are less
> number of flowfiles.
>
>
>
> Regards,
>
> Mohit


Re: problem with Nifi / Atlas integration - has anyone some experience with this integration ?

2018-04-26 Thread Koji Kawamura
Hi Dominique,

Thank you for your interest in NiFI and Atlas integration.
I have some experience with that; in fact, I wrote the NiFi reporting task.

I have two things in mind that could be related to your situation.
One is NIFI-4971, which is under review now. It fixes a lineage
reporting issue when the 'complete path' strategy is used.
If you are using 'complete path', I'd recommend trying 'simple path'
to see if that's the case.

The other one is Atlas not being able to catch up fast enough to
consume all messages from the Kafka topic.
This happens when lots of messages are sent to the Atlas hook topic
from NiFi, particularly when many different files are written to or
retrieved from the file system and NiFi tries to report them, as those
entities are reported individually.
The following command can be helpful to see how Atlas consumes messages.
If there is a lot of LAG, those messages are still waiting to be consumed
and processed by Atlas.

# Sometimes Atlas consumer is not catching up and entities are not
created even if NiFi reported as expected
KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server server:port
--describe --group atlas
GROUP  TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    OWNER
atlas  ATLAS_HOOK  0          24944           31897           6953

Thanks,
Koji


On Thu, Apr 26, 2018 at 6:50 PM, Dominique De Vito  wrote:
> Hi,
>
> I have defined a simple pipeline in Nifi:
>
> GetFile => some processor doing a dummy transformation => PublishInKafka
>
> ...with Atlas integration for lineage purposes
>
> Versions:
> -- Atlas 0.8.0 (Stack : HDP 2.6.4)
> -- Nifi 1.5.0
>
> and I have put some (dummy) file into the input directory, and it went up to
> the end of the pipeline.
>
> Results:
>
> * a "nifi_flow" entity and a "nifi_flow_path" entity were defined in Atlas
> <= good
>
> * PROBLEM_1: the "nifi_flow_path" entity has no input, neither output.
>
> But I see in the Nifi logs a trace stating that Nifi has sent a
> "ENTITY_PARTIAL_UPDATE" json to Atlas HOOK topic, with correct input and
> output.
>
> So, something looks like broken in Nifi<=>Atlas link, or within Atlas.
>
> * PROBLEM_2 (but Atlas related): when I use the GUI, Atlas says it can't
> found the "nifi_flow" entity while it's available through the REST api:
>
> 2018-04-24 05:48:14,317 ERROR - [pool-2-thread-5 -
> 3076c14e-9bb4-44a7-8299-d56476f3ec89:] ~ graph rollback due to exception
> AtlasBaseException:Instance nifi_flow with unique attribute
> {qualifiedName=76d4acd9-0162-1000-257a-7393e17b3a16@mycluster5} does not
> exist (GraphTransactionInterceptor:73)
>
> >
>
> So my questions:
>
> 1) Did anyone meet such problems ?
>
> 2) Does anyone have had some (good) experience integrating Nifi with Atlas ?
>
> Thanks.
>
> Dominique
>


Re: Execute multiple HQL statements in PutHiveQL or SelectHiveQL

2018-04-19 Thread Koji Kawamura
Hello Dejan,

I tested SET property statements bundled with an INSERT statement in a single
FlowFile passed to PutHiveQL.
The same warning message is logged as you reported. However, the actual INSERT
was successful; I confirmed new rows were inserted.
Please let us know if that is not the case for you.
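
For reference, a FlowFile content shaped roughly like this (table and column
names are placeholders) can be sent to PutHiveQL, which splits it on the
Statement Delimiter (';' by default):

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE target_tbl PARTITION (dt) SELECT col1, col2, dt FROM source_tbl;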

Although the warning is harmless, I agree it can be misleading.
I submitted a JIRA and a PR to suppress the warning logs for such SET
commands.
https://issues.apache.org/jira/browse/NIFI-5095

Thanks,
Koji


On Tue, Apr 17, 2018 at 5:03 PM, Krstic, Dejan 
wrote:

> Matt,
>
>
>
> No, there are no comments at all in this script.
>
>
>
> Br
>
> Dejan
>
>
>
> From: Matt Burgess [mailto:mattyb...@apache.org]
> Sent: Monday, April 16, 2018 18:38
> To: users@nifi.apache.org
> Subject: Re: Execute multiple HQL statements in PutHiveQL or
> SelectHiveQL
>
>
>
> Dejan,
>
>
>
> Are there any comments in your Hive "script"? If so I believe you need to
> remove them [1].  When I've seen that issue, it's always been because of
> comments before the SET line.
>
>
>
> Regards,
>
> Matt
>
>
>
> [1] https://issues.apache.org/jira/browse/HIVE-1166
>
>
>
>
>
> On Mon, Apr 16, 2018 at 12:29 PM, Krstic, Dejan 
> wrote:
>
> Hello,
>
>
>
> I have Hive table from which I am trying to load data into other Hive
> table (same server) and to do some transformations in between. The problem
> is, I need to have two SET statements before I actually do INSERT statement
> and if I try to separate everything with semicolons (as configured in
> processor) I am getting following error message:
>
>
>
> 2018-04-16 16:11:59,368 WARN [Timer-Driven Process Thread-1]
> o.apache.nifi.processors.hive.PutHiveQL 
> PutHiveQL[id=9d9290d2-2196-3e7b-a522-ab0e2a46db11]
> Failed to parse hiveQL: SET hive.exec.dynamic.partition = true due to
> org.apache.hadoop.hive.ql.parse.ParseException: line 1:4 missing KW_ROLE
> at 'hive' near 'hive'
>
> line 1:8 missing EOF at '.' near 'hive':
>
>
>
> The two SET statements at the beginning:
>
>
>
> Set hive.exec.dynamic.partition = true;
>
> set hive.exec.dynamic.partition.mode=nonstrict;
>
>
>
> I also tried the same thing with SelectHiveQL but it seems not to accept
> multiple queries. Which way would be the best to fix this problem and what
> processor is the best to use in this situation?
>
>
>
> Thank you.
>
>
>
> Best regards
>
> Dejan Krstic
>
>
>
>
>
> *T-Mobile Austria GmbH*
>
> Dejan Krstic
>
> Data Manager
>
>
>
> Rennweg 97-99
> 
> A-1030 Wien
>
> E-Mail: mailto:dejan.krs...@t-mobile.at 
> *www.t-mobile.at *
>
> *DAS VERBINDET UNS.*
>
>
>

Re: MergeRecord

2018-04-13 Thread Koji Kawamura
Hi,

Just FYI,
If I replace the schema doc comments using UpdateAttribute, I am able to
merge records:
${inferred.avro.schema:replaceAll('"Type inferred from [^"]+"', '""')}
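
i.e. one possible placement is an UpdateAttribute between InferAvroSchema and
MergeRecord, with a dynamic property that overwrites the attribute:

  inferred.avro.schema - ${inferred.avro.schema:replaceAll('"Type inferred from [^"]+"', '""')}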

I looked at InferAvroSchema and the underlying Kite source code, but
unfortunately there is no option to suppress the doc comment when inferring
the schema.

Thanks,
Koji

On Fri, Apr 13, 2018 at 4:11 PM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
> Hi,
>
> I've tested InferAvroSchema and MergeRecord scenario.
> As you described, records are not merged as expected.
>
> The reason in my case is, InferAvroSchema generates schema text like this:
> inferred.avro.schema
> { "type" : "record", "name" : "example", "doc" : "Schema generated by
> Kite", "fields" : [ { "name" : "Key", "type" : "long", "doc" : "Type
> inferred from '4'" }, { "name" : "Value", "type" : "string", "doc" :
> "Type inferred from 'four'" } ] }
>
> And, MergedRecord uses that schema text as groupId even if
> 'Correlation Attribute' is specified.
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java#L348
>
> So, even if schema is the same, if actual values vary, merging group
> id will be different.
> If you can use SchemaRegistry, it should work as expected.
>
> Thanks,
> Koji
>
> On Fri, Apr 13, 2018 at 2:45 PM, DEHAY Aurelien
> <aurelien.de...@faurecia.com> wrote:
>>
>> Hello.
>>
>> Thanks for the answer.
>>
>> The 20k is just the last test, I’ve tested with 100,1000, with an input 
>> queue of 10k, and it doesn’t change anything.
>>
>> I will try to simplify the test case and to not use the inferred schema.
>>
>> Regards
>>
>>> Le 13 avr. 2018 à 04:50, Koji Kawamura <ijokaruma...@gmail.com> a écrit :
>>>
>>> Hello,
>>>
>>> I checked your template. Haven't run the flow since I don't have
>>> sample input XML files.
>>> However, when I looked at the MergeRecord processor configuration, I found 
>>> that:
>>> Minimum Number of Records = 2
>>> Max Bin Age = 10 sec
>>>
>>> By briefly looked at MergeRecord source code, it expires a bin that is
>>> not complete after Max Bin Age.
>>> Do you have 20,000 records to merge always within 10 sec window?
>>> If not, I recommend to lower the minimum number of records.
>>>
>>> I haven't checked actual MergeRecord behavior so I may be wrong, but
>>> worth to change the configuration.
>>>
>>> Hope this helps,
>>> Koji
>>>
>>>
>>> On Fri, Apr 13, 2018 at 12:26 AM, DEHAY Aurelien
>>> <aurelien.de...@faurecia.com> wrote:
>>>> Hello.
>>>>
>>>> Please see the template attached. The problem we have is that, however any 
>>>> configuration we can set in the mergerecord, we can't manage it to 
>>>> actually merge record.
>>>>
>>>> All the record are the same format, we put an inferschema not to have to 
>>>> write it down ourselves. The only differences between schemas is then that 
>>>> the doc="" field are different. Is it possible for it to prevent the 
>>>> merging?
>>>>
>>>> Thanks for any pointer or info.
>>>>
>>>>
>>>> Aurélien DEHAY
>>>>
>>>>
>>>>
>>


Re: MergeRecord

2018-04-13 Thread Koji Kawamura
Hi,

I've tested InferAvroSchema and MergeRecord scenario.
As you described, records are not merged as expected.

The reason in my case is, InferAvroSchema generates schema text like this:
inferred.avro.schema
{ "type" : "record", "name" : "example", "doc" : "Schema generated by
Kite", "fields" : [ { "name" : "Key", "type" : "long", "doc" : "Type
inferred from '4'" }, { "name" : "Value", "type" : "string", "doc" :
"Type inferred from 'four'" } ] }

And MergeRecord uses that schema text as the groupId even if a
'Correlation Attribute' is specified.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java#L348

So even if the schema structure is the same, when the actual values vary,
the merging group id will be different.
If you can use SchemaRegistry, it should work as expected.
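
For example (a rough sketch using the inferred schema above minus the doc
comments; the schema name 'example' is illustrative), an AvroSchemaRegistry
entry plus matching reader/writer settings gives every FlowFile identical
schema text:

AvroSchemaRegistry - add a property:
  example - { "type": "record", "name": "example", "fields": [ { "name": "Key", "type": "long" }, { "name": "Value", "type": "string" } ] }

Record Reader / Writer used by MergeRecord:
  Schema Access Strategy - Use 'Schema Name' Property
  Schema Registry        - (the AvroSchemaRegistry above)
  Schema Name            - example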

Thanks,
Koji

On Fri, Apr 13, 2018 at 2:45 PM, DEHAY Aurelien
<aurelien.de...@faurecia.com> wrote:
>
> Hello.
>
> Thanks for the answer.
>
> The 20k is just the last test, I’ve tested with 100,1000, with an input queue 
> of 10k, and it doesn’t change anything.
>
> I will try to simplify the test case and to not use the inferred schema.
>
> Regards
>
>> Le 13 avr. 2018 à 04:50, Koji Kawamura <ijokaruma...@gmail.com> a écrit :
>>
>> Hello,
>>
>> I checked your template. Haven't run the flow since I don't have
>> sample input XML files.
>> However, when I looked at the MergeRecord processor configuration, I found 
>> that:
>> Minimum Number of Records = 2
>> Max Bin Age = 10 sec
>>
>> By briefly looked at MergeRecord source code, it expires a bin that is
>> not complete after Max Bin Age.
>> Do you have 20,000 records to merge always within 10 sec window?
>> If not, I recommend to lower the minimum number of records.
>>
>> I haven't checked actual MergeRecord behavior so I may be wrong, but
>> worth to change the configuration.
>>
>> Hope this helps,
>> Koji
>>
>>
>> On Fri, Apr 13, 2018 at 12:26 AM, DEHAY Aurelien
>> <aurelien.de...@faurecia.com> wrote:
>>> Hello.
>>>
>>> Please see the template attached. The problem we have is that, however any 
>>> configuration we can set in the mergerecord, we can't manage it to actually 
>>> merge record.
>>>
>>> All the record are the same format, we put an inferschema not to have to 
>>> write it down ourselves. The only differences between schemas is then that 
>>> the doc="" field are different. Is it possible for it to prevent the 
>>> merging?
>>>
>>> Thanks for any pointer or info.
>>>
>>>
>>> Aurélien DEHAY
>>>
>>>
>>>
>


Re: Fine-grained control over when a NiFi processor can run

2018-04-13 Thread Koji Kawamura
Hello Tim,

If I understand your requirement correctly, I'd first try the
following flow using the existing processors:

FetchDistributedMapCache
-> RouteOnAttribute
  --  permit --> InvokeHTTP
  --  unmatched --> WhateverAlternativeRoute

This assumes input FlowFiles have a timestamp attribute denoting when the
request was initiated.
First, FetchDistributedMapCache or another Fetch processor can be
used to retrieve the available time window for making requests against the
target HTTP service, e.g. as an epoch timestamp (long) representing the end
of the window.
Then RouteOnAttribute can be used to route input FlowFiles to the
'permit' relationship with an EL expression something like
${requestTimestamp:lt(${permitRequestUntil})}.
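
A rough sketch of that first part (the cache key, attribute names, and the
timestamp format are all assumptions):

FetchDistributedMapCache:
  Distributed Cache Service    - (your cache client service)
  Cache Entry Identifier       - http-service-window
  Put Cache Value In Attribute - permitRequestUntil

RouteOnAttribute - add a dynamic property (becomes the 'permit' relationship):
  permit - ${requestTimestamp:lt(${permitRequestUntil})}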

This way, you can control service availability outside of this part of the
flow by updating the time window with a corresponding processor, e.g.
PutDistributedMapCache.

The remaining concern is FlowFiles that have passed RouteOnAttribute but are
still queued in front of InvokeHTTP in the above example; how do we limit
FlowFiles from entering that state?
I would configure the 'permit' connection's Back Pressure Object
Threshold as 1 (or greater, but keep it low based on InvokeHTTP
throughput).
By doing so, RouteOnAttribute will not be scheduled until the queued
FlowFile gets processed, thus no more FlowFiles can proceed.

Hope this helps,
Koji


On Fri, Apr 13, 2018 at 1:20 PM, Tim Dean  wrote:
> Hello,
>
> I have a custom NiFi processor that invokes an external HTTP endpoint. That
> endpoint will be hosted by services running at customer sites, and those
> customer sites require the ability to define when the service can be called
> by my processor. Their goal is to prevent calls from coming in during their
> peak hours so that they only have to process my requests during a
> configurable set of off-peak hours.
>
> Additionally, we have a goal of keeping the code for making the HTTP request
> separate from the code for checking whether or not it is currently in a time
> window that requests are allowed. This is not a strict requirement, but we
> have many different scenarios that would like to use the HTTP request
> processor without any schedule restrictions and still other scenarios that
> would like to check schedule restrictions before running other processors.
>
> My first idea for this was to implement 2 different custom processors, one
> to make the HTTP request and another to check the current time against the
> configured schedule restrictions. Flow files would first enter the schedule
> restriction processor, and transfer to a “success” relationship only if the
> request is currently permitted against the schedule. That success
> relationship would then be connected to the HTTP request processor.
>
> The potential problem I see with this is that flow files could back up for
> some reason between the schedule restriction check processor and the HTTP
> requests. So a flow file could pass the schedule restriction check, wait for
> a while until the HTTP request processor picks up the work, and then end up
> sending an HTTP request outside of the permitted schedule window.
>
> I could avoid this problem completely by combining the logic into a single
> processor, but that makes it more difficult to reuse these processors in
> different combinations for the other scenarios mentioned above.
>
> I’m looking for other options to consider for addressing this workflow. I
> have a couple of thoughts:
>
> Implement the HTTP processor independently and then a second processor that
> subclasses the first to add schedule restrictions. This keeps the two bits
> of code separate but doesn’t give as much flexibility as I’d like
> Just implement this as 2 separate processors and try to figure out some way
> in the flow to prevent flow files from backing up between these 2 processors
> (not sure if this is possible)
> Implement the schedule restriction as a particular implementation of a
> controller service interface, and have the HTTP request processor depend on
> an instance of that controller. Alternate implementations of that controller
> service interface could be created that exclude the schedule restriction
> check.
>
>
> Any thoughts on these approaches? Do any alternatives come to mind that I am
> missing?
>
> Thanks in advance
>
> -Tim


Re: MergeRecord

2018-04-12 Thread Koji Kawamura
Hello,

I checked your template. Haven't run the flow since I don't have
sample input XML files.
However, when I looked at the MergeRecord processor configuration, I found that:
Minimum Number of Records = 2
Max Bin Age = 10 sec

From a brief look at the MergeRecord source code, it expires a bin that
is not complete after Max Bin Age.
Do you always have 20,000 records to merge within a 10 sec window?
If not, I recommend lowering the minimum number of records.

I haven't checked the actual MergeRecord behavior so I may be wrong,
but it is worth changing the configuration.
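For example, a configuration along these lines (just a sketch, the
numbers are illustrative and depend on your data rate) lets a bin reach
its minimum well within the 10 sec window:

Minimum Number of Records = 1000
Maximum Number of Records = 20000
Max Bin Age = 10 sec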

Hope this helps,
Koji


On Fri, Apr 13, 2018 at 12:26 AM, DEHAY Aurelien
 wrote:
> Hello.
>
> Please see the template attached. The problem we have is that, however any 
> configuration we can set in the mergerecord, we can't manage it to actually 
> merge record.
>
> All the record are the same format, we put an inferschema not to have to 
> write it down ourselves. The only differences between schemas is then that 
> the doc="" field are different. Is it possible for it to prevent the merging?
>
> Thanks for any pointer or info.
>
>
> Aurélien DEHAY
>
>
>
> This electronic transmission (and any attachments thereto) is intended solely 
> for the use of the addressee(s). It may contain confidential or legally 
> privileged information. If you are not the intended recipient of this 
> message, you must delete it immediately and notify the sender. Any 
> unauthorized use or disclosure of this message is strictly prohibited.  
> Faurecia does not guarantee the integrity of this transmission and shall 
> therefore never be liable if the message is altered or falsified nor for any 
> virus, interception or damage to your system.


Re: Multi Domains Nifi connection and UI acces

2018-04-12 Thread Koji Kawamura
Hello,

NiFi 1.6.0 has been released, and it adds a new nifi.properties
property to whitelist multiple hosts so that NiFi can be accessed via
different hostnames.
Please see NIFI-4761 for details. I recommend updating to 1.6.0.
https://issues.apache.org/jira/browse/NIFI-4761
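If I remember correctly, the new property is nifi.web.proxy.host
(please double-check the exact name against the 1.6.0 Admin Guide); a
sketch with placeholder hostnames:

# nifi.properties, comma-separated list of additional allowed host[:port] values
nifi.web.proxy.host=eth1.example.com:8443,eth2.example.com:8443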

Thanks,
Koji

On Wed, Apr 11, 2018 at 4:07 PM, Abdou B  wrote:
>
> Hello,
>
>  I use Nifi 1.5 and I would like to use the functionnality that enables Nifi
> to use various network interface.
> So as stated in the offficial documentation, i created the following
> properties
>
> nifi.web.https.network.interface.default=0.0.0.0
>
> nifi.web.https.network.interface.eth0=eth0
> nifi.web.https.network.interface.eth1=eth1
> nifi.web.https.network.interface.eth2=eth2
> nifi.web.https.network.interface.eth3=eth3
> nifi.web.https.network.interface.eth4=eth4
>
> I use a certificate with SAN, the SAN includes : FQDN eth0 +  FQDN eth1 +
> FQDN eth2  + FQDN eth3 +  FQDN eth4
>
> I have to use eth1 to connect to the UI
> so i set up  :
> nifi.web.https.host=the FQDN of eth0
> but I have an error stating that
> "Cluster is still in the process of voting on the appropriate Data Flow. "
>
> if I use the  eth0, I can't connect anymore (I got a message about header
> not allowed) but the cluster seems to work.
> I saw that jetty seems to be more restrictive than before and allow only
> host that use the  nifi.web.https.host to connect to UI
>
>
> would you have any advice ?
>
> Best Regards
>


Re: PutHiveQL (1.5.0) throws un-necessary NullPointerException when parsing the query

2018-03-12 Thread Koji Kawamura
(dropped dev)

Hi Amit,

Thanks for sharing that.

The query parsing was added by NIFI-4545 in NiFi 1.5.0 so that
Hive-related processors can extract table names into outgoing FlowFile
attributes. Those attributes are used by the ReportLineageToAtlas
reporting task to report to Atlas which processor interacts with which
DataSet.
https://issues.apache.org/jira/browse/NIFI-4545

Parsing the query at the NiFi processor is not necessary for executing
the query itself, so the processor just logs the exception while the
query still completes.

Do you have a complete stacktrace for the NullPointerException? Also,
do you mind filing a JIRA?

Thanks,
Koji




On Fri, Mar 9, 2018 at 11:27 AM, Ranjan, Amit | Amy | DPD
 wrote:
> Hi Team,
>
> While working with NiFi 1.5.0, we observed that for few queries, PutHiveQL
> throws NullPointerException while parsing the query. However, the query does
> complete the execution and flow file is routes to success afterwards.
>
> From the observation made till now, we found that the queries containing
> back-tick(s) (`) are not getting parsed. This was not the case in NiFi
> 1.4.0. Will it be possible for you to confirm whether this is the desired
> behavior of NiFi 1.5.0.
>
> I’ve attached a file which contains a portion of the nifi-app.log showing
> the WARN messages of parsing exception.
>
> Regards,
> Amit


Re: Create nested records

2018-02-13 Thread Koji Kawamura
Hi Charlie,

Thanks for sharing the template.
Following configurations for UpdateRecord did the flat to nested mapping:

- Replacement Value Strategy: Record Path Value
- Dynamic property: "/phone" = "/"

It maps the flat record into the /phone child record.
Fields that are not included in the outgoing schema will be discarded.
But it works only if the nested fields have the same names.

Example result:
[ {
  "person" : "john",
  "gender" : "m",
  "phone" : {
"phone1" : "123-456-7890",
"phone1type" : "mobile",
"phone2" : "234-567-8901",
"phone2type" : "home"
  }
} ]
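For reference, the Record Writer's schema needs to declare /phone as a
nested record. A minimal sketch (field names taken from the example
above, everything else is illustrative):

{
  "type": "record", "name": "person", "fields": [
    {"name": "person", "type": "string"},
    {"name": "gender", "type": "string"},
    {"name": "phone", "type": {
      "type": "record", "name": "phoneDetails", "fields": [
        {"name": "phone1", "type": ["null", "string"], "default": null},
        {"name": "phone1type", "type": ["null", "string"], "default": null},
        {"name": "phone2", "type": ["null", "string"], "default": null},
        {"name": "phone2type", "type": ["null", "string"], "default": null}
      ]
    }}
  ]
}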

If the mapping gets more complicated than that, then I'd go with
ConvertRecord -> JoltTransformJSON.

Hope this helps,
Koji

On Sat, Feb 10, 2018 at 1:31 AM, Charlie Frasure
 wrote:
> Hi,
>
> I'm including a template here that mimics the problem I'm having.  I
> generate a one record csv and try to convert it to a JSON file with a nested
> section for phones.  The phone section of the JSON file converts as null
> instead of having the phone attributes.
> If we can resolve this problem, I would also like to map phone1 and phone2
> attributes to phone[0] ... phone[n].
>
> Can this be completed with the record writers?
>
> On Thu, Feb 8, 2018 at 10:25 PM, Charlie Frasure 
> wrote:
>>
>> Hi,
>>
>> I'm having trouble taking a flat record and building an avro schema that
>> lets me nest it.
>> I found this example schema, but after using a ConvertRecord or
>> UpdateRecord, I receive the equivalent of [{"parent" = null}]
>>
>> {"type": "record", "name": "CustomerInput", "namespace":
>> "org.apache.example", "fields": [
>> {"name": "id", "type": "string"},
>> {"name": "companyName", "type": ["null", "string"], "default":
>> null},
>> {"name": "revenue", "type": ["null", "string"], "default": null},
>> {"name" : "parent", "type" : [ "null",
>> {"type" : "record", "name" : "parent", "fields" : [
>> {"name" : "name", "type" : ["null", "string"], "default" :
>> null},
>> {"name" : "id", "type" : "string"}
>> ] }
>> ], "default" : null }
>> ]}
>>
>> This example found online is meant to union a person schema with an
>> address schema, creating an "Address" type, but this resulted in a "Could
>> not parse incoming data" error.
>>
>> [
>>   {"type": "record", "name": "Address", "fields": [
>>   {"name": "streetaddress", "type": "string"},
>>   {"name": "city", "type": "string"}
>>   ]},
>>   {"type": "record", "name": "person", "fields": [
>>   {"name": "firstname","type": "string"},
>>   {"name": "lastname", "type": "string"},
>>   {"name": "address", "type": "Address"}
>>   ]}
>> ]
>>
>> I would also like to be able to include multiples of these sub-records as
>> well, such that I could have 0 to n addresses with distinct address types
>> associated to one person.
>> Is this possible with the record processors, or do I need to focus on Jolt
>> / ConvertAvro?
>>
>> Thanks,
>> Charlie
>>
>>
>


Re: Secure NiFi 1.5 Behind NGINX/HAProxy

2018-02-07 Thread Koji Kawamura
Hi Ryan,

Although I am not sure why you'd want to use http between the clients
and Nginx, I was able to setup similar environment.
I used LDAP provider instead of OpenID, but OpenID should work as well.
The key is NOT to provide any client certificate from the clients
(browser/API) or from Nginx to NiFi, so that NiFi will ask for username
and password.

I wrote a Gist entry including Nginx configuration. I hope it will be
helpful for you.
https://gist.github.com/ijokarumawak/d14e5b28a16d363d6c001a92b7e73fe4
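In case the Gist moves, the essential part of the Nginx configuration
is roughly the following (a sketch with placeholder hostnames; see the
Gist for the complete, tested version):

server {
  listen 80;
  location / {
    # forward plain-HTTP requests to the secured NiFi instance
    proxy_pass https://nifi.example.com:8443;
    # optionally verify NiFi's server certificate
    # proxy_ssl_trusted_certificate /etc/nginx/nifi-ca.pem;
    # tell NiFi how the client reached the proxy
    proxy_set_header X-ProxyScheme http;
    proxy_set_header X-ProxyHost nginx.example.com;
    proxy_set_header X-ProxyPort 80;
    proxy_set_header X-ProxyContextPath /;
  }
}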

Thanks,
Koji

On Thu, Feb 8, 2018 at 6:55 AM, Ryan H
 wrote:
> Hi All,
>
> This may trivial, but I'm asking anyways for clarity. I am setting up a
> secure instance of NiFi behind NGINX for reverse proxy capabilities. I have
> a certain requirement that traffic coming in will hit NGINX as HTTP on port
> 80. NGINX will need to forward the request to the secure instance as HTTPS
> on port 8443.
>
> So: browser/API -> http -> NGINX -> https -> Secure NiFi
>
> Currently I am using the tls-toolkit in client/server mode for the secure
> instance to get its certs. I plan to have an OpenID provider configured for
> AuthN.
>
> From what I understand I will need to place the client key and certificate
> as well as server key and certificate on NGINX. This may be a bad
> assumption, but it's where I'm at, at this point.
>
> My question is: what would act as each of the key/certificates for both the
> client/server to be placed on NGINX based on what is generated from the
> tls-toolkit (which keys/certs would be extracted from each of the generated
> files/stores)? Is what I'm doing feasible (I'm assuming it is, but open to
> being wrong). I've tried a few different extractions from the keystore and
> truststore, but this is a weaker area of expertise for me and would rather
> be clear on what I'm doing.
>
> Any help is greatly appreciated.
>
> Cheers,
>
> Ryan H


Re: Is it possible to join multiple columns to a record using single lookup service

2018-01-30 Thread Koji Kawamura
Hi Sangavi,

Good question, I thought it could be a nice example to illustrate how
to use LookupService.
I wrote a simple Gist entry with a NiFi template file to do what you
are looking for.

"Join, Enrich multiple columns by looking up an external CSV file"
https://gist.github.com/ijokarumawak/b9c95a0d0c86c97ffeaeb5ef95320b8b

I hope you find it helpful.

Thanks,
Koji

On Tue, Jan 30, 2018 at 9:43 PM, Sangavi Eswaramoorthi
 wrote:
> Hi,
>
> I would like to join multiple columns to input record using single lookup
> service.Is it possible in LookupRecord processor??
>
> For Example,
>
> I have an input csv file with columns.
>
> File 1:
>
> Emp_Id,Name,Address,Mobile No
>
> And another file which is used as input for lookup service,
>
> File 2:
>
> Emp_Id,Salary,Department
>
> I am trying to join salary and department to file 1.
>
> Expected result:
>
> Emp_Id,Name,Address,Mobile No, Salary, Department
>
> Could anyone please help me to achieve this using LookupRecord processor.
>
> Thanks in advance.
>
>
> Regards,
> Sangavi E


Re: adding a filename column to a csv to insert into a table

2018-01-30 Thread Koji Kawamura
Hi Austin,

I think there are a couple of ways to do that:
1. UpdateRecord with CSVReader and CSVWriter, update a column with a
Record Path and Expression Language, e.g. Add a dynamic property,
key=/filename, value=${filename}
2. Use SplitText to split each CSV record into its own FlowFile, then
use ReplaceText to prepend, append or replace text to insert the
filename; you can use Expression Language here, too

I recommend #1 above as it performs better than #2. It may take time
for you to get familiar with Record, Record Path, Reader, Writer,
Schema, etc. if you haven't played with those yet, but once you know
how it works, you can apply Record processing at various processors.
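A sketch of option #1 (the reader/writer details depend on how you
manage schemas; 'filename' as the new column name is an assumption):

UpdateRecord
  Record Reader = CSVReader
  Record Writer = CSVRecordSetWriter (writer schema includes an extra 'filename' column)
  Replacement Value Strategy = Literal Value
  /filename = ${filename}   (dynamic property; the EL reads the FlowFile attribute)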

Hope this helps,
Koji


On Wed, Jan 31, 2018 at 6:44 AM, Austin Duncan  wrote:
> all,
> I am trying to take a csv, add a column to it that contains the filename of
> the csv, and then insert that record into postgres using the
> putdatabaserecord processor. Any idea what would be the best way to go about
> doing this?
>
> --
> Austin Duncan
> Researcher
>
> PYA Analytics
> 2220 Sutherland Avenue
> Knoxville, TN 37919
> 423-260-4172


Re: all of our schedule tasks not running/being scheduled....

2018-01-29 Thread Koji Kawamura
Hi Dan,

If all available Timer Driven Threads are being used (or hang
unexpectedly for some reason), then no processor can be scheduled.
The number at the top left of the NiFi UI, under the NiFi logo, shows
the number of threads currently working.
If you see something more than 0, then I'd recommend taking some thread
dumps to figure out what the running threads are doing.
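A dump can be taken with the bundled script, for example (run from the
NiFi install directory, output path is up to you):

./bin/nifi.sh dump /tmp/nifi-thread-dump.txt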

Other than that, I've encountered unexpected behavior with a NiFi
cluster when a node hit an OutOfMemory error.
The cluster started to behave incorrectly as it could not replicate
REST requests among nodes. I'd search for any ERROR logs in
nifi-app.log.

Thanks,
Koji

On Tue, Jan 30, 2018 at 1:10 PM, dan young  wrote:
> Hello,
>
> We're running a secure 3 node 1.4 cluster.  Has anyone seen any behaviour
> where the cluster just stops scheduling the running of flowfiles/tasks?
> i.e. cron/timer, just don't run when they're suppose to.  I've tried to stop
> and restart a processor that is say set to run ever 900sec, but nothing
> happens.  Then only thing I can do is to cycle through restarting each node
> in the cluster and then we're good for a few daysthis is something that
> just started happening and has occurred twice in the last week or so.
> Anything I should keep an eye out for or look for in the logs?
>
> Regards,
>
> Dan


Re: Updating schedule information using REST API without disturbing other processor configuration

2018-01-28 Thread Koji Kawamura
Hi Ravi,

How does your request JSON sent to the PUT /processors/{id} endpoint look like?
If you don't need to update any processor properties, then you don't
have to send the /component/config/properties element in the request
JSON.
You can debug how NiFi UI sends REST requests using web browser
Developer tools [1].

[1] https://developers.google.com/web/tools/chrome-devtools/
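For illustration, a minimal request body that updates only the
scheduling fields could look like the following (the id, revision
version and cron expression are placeholders; take the current revision
from a GET on the same processor first):

{
  "revision": { "clientId": "my-client", "version": 5 },
  "component": {
    "id": "processor-uuid",
    "config": {
      "schedulingStrategy": "CRON_DRIVEN",
      "schedulingPeriod": "0 0 2 * * ?"
    }
  }
}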

Thanks,
Koji

On Sat, Jan 27, 2018 at 7:41 AM, Ravi Papisetti (rpapiset)
 wrote:
> Hi,
>
> We are trying to update "Run Schedule" and "Scheduling Strategy" of a NiFi
> Processor using REST API (PUT /processors/{id}). This works for most of the
> processors, but when it is updated for GetSFTP, it overwrites password field
> that results authfail exception. If I update scheduling information using
> NiFi UI, it works fine.
>
> Any thought how we can update scheduling information without bothering about
> other processor configuration.
>
> Appreciate any help.
>
> Thanks,
>
> Ravi Pipisetti
>
>


Re: Maximum-value Columns on QueryDatabaseTable

2018-01-22 Thread Koji Kawamura
Hi Alberto,

Thanks for reporting the issue, I was able to reproduce the behavior
you described.
Although it's for Microsoft SQL Server, there has been an existing
JIRA for the same issue, NIFI-4393.
https://issues.apache.org/jira/browse/NIFI-4393

I've created a Pull Request to handle MS SQL Server square brackets and
MySQL back-ticks, as well as generic escaping with double quotes.

I hope the fix to be merged and available soon.

Thanks,
Koji

On Fri, Jan 5, 2018 at 5:58 AM, Alberto Bengoa  wrote:
> Hello Folks,
>
> Not sure if I'm running on a bug, but I'm facing a situation when I try to
> use a "not compliance" column name as my Maximum-value Column.
>
> First, I've tried to use a column named _Time-Stamp (underscore at the
> beginning + hyphen on the middle). This column creates a state like this:
>
> "man_fabrica-cdc"@!@_time-stamp 2018-01-04 15:58:07.877 Cluster
>
> I was wondering if wouldn't QueryDatabaseTable works with Timestamp fields
> as Maximum-value Column. So, I've changed to another column to make a try
> (column name _Change-Sequence), and got this state:
>
> "man_fabrica-cdc"@!@_change-sequence 252254 Cluster
>
> Enabling Nifi debug I see that no "WHERE" clause was passed when
> Maximum-value Column is filled with quotes ("_my-strange-column-name"). On
> the other hand, if I do not wrap the odd column name with quotes I got an
> error message like this from JDBC Driver:
>
> nifi-app_2017-12-12_11.0.log:Caused by: java.sql.SQLException:
> [DataDirect][OpenEdge JDBC Driver][OpenEdge] Syntax error in SQL statement
> at or about "_Time-Stamp FROM "man_fabrica-cdc" WHERE" (10713)
>
> I'm using "Normalize Table/Column Names" as suggested here [1].
>
> [1] -
> http://apache-nifi-users-list.2361937.n4.nabble.com/Hyphenated-Tables-and-Columns-names-td3650.html#a3655
>
> Thanks!
>
> Alberto


Re: Get the failure reason from ValidateResult processor

2018-01-09 Thread Koji Kawamura
Hi Martin,

I assume you wanted to ask about ValidateRecord.

As you know, ValidateRecord processor emits ROUTE provenance events
with 'Details' that explains validation error.
E.g. "Records in this FlowFile were invalid for the following reasons:
; The following 1 fields were present in the Record but not in the
schema: [/extra]"
Those provenance events can be integrated into your flow if you use
SiteToSiteProvenanceReportingTask.
The reporting task sends provenance events in JSON representation into
your flow through an Input Port via the Site-to-Site protocol.

However, that approach may be challenging, as the JSON provenance
events are not tied to the original FlowFiles that caused the
validation errors.

A possible improvement would be adding an option to ValidateRecord to
embed validation errors into the Records routed to the 'invalid'
relationship.
That would require the user to specify a RecordWriter with a schema
having one extra field to hold the validation detail.
Then you could process the 'invalid' records further, since each
invalid record would carry its validation error detail.

Does it sound useful in your use case? If so, please file a JIRA for
improvement.
If it suits your needs and you can contribute the improvement, that's
even better.
https://issues.apache.org/jira/projects/NIFI

Hope this helps.

Thanks,
Koji

On Fri, Jan 5, 2018 at 12:12 AM, Martin Mucha  wrote:
> Hi,
>
> I'm having difficulties to find a way how to get the failure reason from
> ValidateResult processor. I know, it emits the route provenance event, but
> you cannot integrate that in your flow as far as I know. I also saw
> suggestion to write custom reporting task, but a) that one does not have
> access to original FlowFile and generally it does not seem to be sound
> solution.
>
> On the otherhand ConvertToAvro validator creates new `Details` attribute to
> describe failure. (well at least it used to do so? I cannot find that
> Details property in documentation any more).
>
> am I overlooking something? What I'd like to do, is to work with failure
> reason further in flow, where I get along `invalid` relationship.
>
> Thanks for advice.
>
> Martin.


Re: ListS3 and FetchS3

2018-01-09 Thread Koji Kawamura
Hi Aruna,

The two resulting FlowFiles have the same content, I guess: the PDF
file you specified as the FetchS3Object Object Key.
The flow actually worked as follows:
1. ListS3 listed two FlowFiles, Ntl_15.csv and 11500509.pdf
2. FetchS3Object is executed once for each incoming FlowFile
2-1. FetchS3Object fetched 11500509.pdf from S3 for the incoming FlowFile
whose 'filename' attribute is Ntl_15.csv
2-2. FetchS3Object fetched 11500509.pdf from S3 for the incoming FlowFile
whose 'filename' attribute is 11500509.pdf
3. Listing the two FlowFiles in the 'success' relationship showed them
as if both the csv and the pdf were fetched, but both are actually the
same pdf.

I recommend to use RouteOnAttribute between ListS3 and FetchS3Object to
filter FlowFiles by 'filename' attribute.
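For example, a dynamic property on RouteOnAttribute such as (the
property name 'pdf' is arbitrary):

pdf = ${filename:endsWith('.pdf')}

Then connect the 'pdf' relationship to FetchS3Object and handle
'unmatched' separately.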

Hope this helps,
Koji

On Thu, Jan 4, 2018 at 5:27 AM, Aruna Sankaralingam <
aruna.sankaralin...@cormac-corp.com> wrote:

> I have 2 files in S3 – one is CSV and other is pdf. I want to fetch the
> pdf and load into Elastic Search. Even though I give the object key as the
> pdf file name, it is still taking all the files from S3. How do I make sure
> it takes only the file that is needed?
>
>
>
>
>
>
>
>


Re: Replay Event UUID

2018-01-09 Thread Koji Kawamura
Hi Rotem,

You are correct, the WriteAheadProvenanceRepository returns provenance
event FlowFile UUID value differently than the default
PersistentProvenanceRepository.
Here is the lines of code that set FlowFile UUID for the provenance events.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java#L276-L280

WriteAheadProvenanceRepository does not seem to persist the 'FlowFile
UUID' value that is set by FlowController when replay events are
registered. Instead, WriteAheadProvenanceRepository fills 'FlowFile
UUID' from the updated or previous 'UUID' attribute.
I don't know much background on why it is implemented this way, but it
seems it drops 'FlowFile UUID' to reduce IO, based on the assumption
that it can be derived from the attributes.

Thanks again for reporting this. I filed NIFI-4752.
https://issues.apache.org/jira/browse/NIFI-4752

Koji

On Thu, Dec 28, 2017 at 5:33 PM, Rotem Mayo <rotem.m...@gmail.com> wrote:
> Hi,
>
> We had another server that was not experiencing this bug, but when we
> changed  to the WriteAheadProvenanceRepository (like on the server where we
> first noticed this) we did see it. Have there been changes in that
> implementation? Could this cause what we are seeing?
>
> Thanks!
> Rotem
>
>
> On 28 Dec 2017 3:35 am, "Koji Kawamura" <ijokaruma...@gmail.com> wrote:
>
> Hi Rotem,
>
> When I tested it with NiFi 1.5.0-SNAPSHOT, the REPLAY event has its
> FlowFile UUID as the parent (original) FlowFile UUID as expected.
>
> Type
> REPLAY
> FlowFile Uuid
> 8c61fdd7-c084-4756-946c-f5669dc4442d
> File Size
> 4 bytes
> Component Id
> 9abc21e3-0160-1000-6d6f-a1c408f75b7a
> Component Name
> UpdateAttribute
> Component Type
> UpdateAttribute
>
> Parent FlowFiles (1)
> 8c61fdd7-c084-4756-946c-f5669dc4442d
> Child FlowFiles (1)
> 0428b60c-44cb-46df-85ab-3787a38693e9
>
> The implementation has not been modified. It's been using parentUUID.
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java#L4241
>
> Is this different than what you are seeing?
>
> Thanks,
> Koji
>
> On Thu, Dec 28, 2017 at 2:46 AM, Rotem Mayo <rotem.m...@gmail.com> wrote:
>> Hi!
>>
>> I have been using nifi for a while, and have recently upgraded from 1.2.0
>> to
>> 1.4.0.
>>
>> I am having problems with replay events. In version 1.2.0, the UUID of a
>> replay event is the id of the original flowfile that is being replayed.
>> This
>> is the behavior that is documented in the developer guid (
>>
>> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#provenance_events).
>>
>> But, in nifi 1.4.0, the replay event's UUID is that of the child flowfile
>> that is created. This change is not documented in the changes log, and I
>> could not find any ticket related to the subject. In addition, other
>> events
>> that create child flowfiles, like clone and fork, still behave according
>> to
>> the documented way.
>>
>> Was this change done on purpose? Will other similar events change in the
>> future?
>>
>> Thanks!
>> Rotem
>
>
>


Re: Bug/Unexpected behavior in ConvertJSONToSQL for boolean attributes

2018-01-02 Thread Koji Kawamura
Submitted a PR to fix the issue.
https://issues.apache.org/jira/browse/NIFI-4729
https://github.com/apache/nifi/pull/2366

I confirmed that JSON Boolean values can be converted
(ConvertJSONToSQL, true -> 1, false -> 0), inserted (PutSQL) and then
retrieved back into a JSON object correctly (ExecuteSQL and
ConvertRecord, 1 -> true, 0 -> false) using the following NiFi flow.
https://gist.github.com/ijokarumawak/5b8d7dd5d799764dfd13dc6195025785

I hope this to get merged soon and available in the next release.

Thanks,
Koji

On Wed, Jan 3, 2018 at 8:13 AM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
> Hi Jennifer,
>
> Thank you very much for reporting this. It seems the line converts a
> Boolean to "0" or "1" at ConvertJSONToSQL is implemented wrongly.
> Looks like a careless mistake. Sorry for the inconvenience.
> https://github.com/apache/nifi/commit/8acee02393f9557b9679038b933ba49705984cf8#diff-d99a1a0b78bf07b01f45c26059fc0d67R533
>
> I will submit a JIRA and a PR to fix this.
>
> Thanks,
> Koji
>
> On Wed, Jan 3, 2018 at 6:31 AM, Jennifer Kissinger
> <jennifer.kissin...@semanticbits.com> wrote:
>> Good morning,
>>
>> We've uncovered a bug between 1.3.0 and 1.4.0 in the way ConvertJSONToSQL
>> handles booleans, which is flipping true to false and vice versa in our
>> data.
>>
>> Our input to ConvertJSONToSQL is a JSON array that contains true/false
>> values, like this:
>>
>> [
>>{
>>   "foo": true
>>},
>>{
>>   "foo": false
>>}
>> ...
>> ]
>>
>> After splitting the array and converting to sql, the output of
>> ConvertJSONToSQL feeds into PutSQL, which saves the data to a Postgres RDS
>> database.
>>
>> In 1.3.0, the processor outputs attributes from booleans like this:
>> ```
>> sql.args.N.type
>> -7
>> sql.args.N.value
>> t
>> ```
>>
>> But in 1.4.0, the boolean attributes become:
>>
>> ```
>> sql.args.N.type
>> -7
>> sql.args.N.value
>> 0
>> ```
>>
>> In both cases, the value above was converted from 'true'. When PutSQL
>> executes the sql command, it interprets '0' as false--effectively flipping
>> our boolean values when saved to the database.
>>
>> It looks like this is a collision between
>> https://issues.apache.org/jira/browse/NIFI-1613 and
>> https://issues.apache.org/jira/browse/NIFI-3372--the latter associates "1"
>> and "t" with true in the PutSQL processor, while the former associates "0"
>> with true in the ConvertJSON processor (see PR:
>> https://github.com/apache/nifi/commit/8acee02393f9557b9679038b933ba49705984cf8#diff-d99a1a0b78bf07b01f45c26059fc0d67R533).
>> Personally I would think "1" would be true and "0" would be false, but it
>> doesn't matter, as long as it's consistent.
>>
>> We're rolling back to 1.3.0, but there is functionality in 1.4.0 I would
>> love to have--any chance of a patch?
>>
>> Thanks,
>>
>> ~Jenni
>>
>> --
>> Jennifer Kissinger
>> Senior Data Engineer
>> SemanticBits, LLC
>> jennifer.kissin...@semanticbits.com
>> 603-290-1711


Re: Bug/Unexpected behavior in ConvertJSONToSQL for boolean attributes

2018-01-02 Thread Koji Kawamura
Hi Jennifer,

Thank you very much for reporting this. It seems the line that converts
a Boolean to "0" or "1" in ConvertJSONToSQL is implemented wrongly.
It looks like a careless mistake. Sorry for the inconvenience.
https://github.com/apache/nifi/commit/8acee02393f9557b9679038b933ba49705984cf8#diff-d99a1a0b78bf07b01f45c26059fc0d67R533

I will submit a JIRA and a PR to fix this.

Thanks,
Koji

On Wed, Jan 3, 2018 at 6:31 AM, Jennifer Kissinger
 wrote:
> Good morning,
>
> We've uncovered a bug between 1.3.0 and 1.4.0 in the way ConvertJSONToSQL
> handles booleans, which is flipping true to false and vice versa in our
> data.
>
> Our input to ConvertJSONToSQL is a JSON array that contains true/false
> values, like this:
>
> [
>{
>   "foo": true
>},
>{
>   "foo": false
>}
> ...
> ]
>
> After splitting the array and converting to sql, the output of
> ConvertJSONToSQL feeds into PutSQL, which saves the data to a Postgres RDS
> database.
>
> In 1.3.0, the processor outputs attributes from booleans like this:
> ```
> sql.args.N.type
> -7
> sql.args.N.value
> t
> ```
>
> But in 1.4.0, the boolean attributes become:
>
> ```
> sql.args.N.type
> -7
> sql.args.N.value
> 0
> ```
>
> In both cases, the value above was converted from 'true'. When PutSQL
> executes the sql command, it interprets '0' as false--effectively flipping
> our boolean values when saved to the database.
>
> It looks like this is a collision between
> https://issues.apache.org/jira/browse/NIFI-1613 and
> https://issues.apache.org/jira/browse/NIFI-3372--the latter associates "1"
> and "t" with true in the PutSQL processor, while the former associates "0"
> with true in the ConvertJSON processor (see PR:
> https://github.com/apache/nifi/commit/8acee02393f9557b9679038b933ba49705984cf8#diff-d99a1a0b78bf07b01f45c26059fc0d67R533).
> Personally I would think "1" would be true and "0" would be false, but it
> doesn't matter, as long as it's consistent.
>
> We're rolling back to 1.3.0, but there is functionality in 1.4.0 I would
> love to have--any chance of a patch?
>
> Thanks,
>
> ~Jenni
>
> --
> Jennifer Kissinger
> Senior Data Engineer
> SemanticBits, LLC
> jennifer.kissin...@semanticbits.com
> 603-290-1711


Re: Replay Event UUID

2017-12-27 Thread Koji Kawamura
Hi Rotem,

When I tested it with NiFi 1.5.0-SNAPSHOT, the REPLAY event has its
FlowFile UUID as the parent (original) FlowFile UUID as expected.

Type
REPLAY
FlowFile Uuid
8c61fdd7-c084-4756-946c-f5669dc4442d
File Size
4 bytes
Component Id
9abc21e3-0160-1000-6d6f-a1c408f75b7a
Component Name
UpdateAttribute
Component Type
UpdateAttribute

Parent FlowFiles (1)
8c61fdd7-c084-4756-946c-f5669dc4442d
Child FlowFiles (1)
0428b60c-44cb-46df-85ab-3787a38693e9

The implementation has not been modified. It's been using parentUUID.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java#L4241

Is this different than what you are seeing?

Thanks,
Koji

On Thu, Dec 28, 2017 at 2:46 AM, Rotem Mayo  wrote:
> Hi!
>
> I have been using nifi for a while, and have recently upgraded from 1.2.0 to
> 1.4.0.
>
> I am having problems with replay events. In version 1.2.0, the UUID of a
> replay event is the id of the original flowfile that is being replayed. This
> is the behavior that is documented in the developer guid (
> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#provenance_events).
>
> But, in nifi 1.4.0, the replay event's UUID is that of the child flowfile
> that is created. This change is not documented in the changes log, and I
> could not find any ticket related to the subject. In addition, other events
> that create child flowfiles, like clone and fork, still behave according to
> the documented way.
>
> Was this change done on purpose? Will other similar events change in the
> future?
>
> Thanks!
> Rotem


Re: Clarification on load distribution on NiFi cluster

2017-12-21 Thread Koji Kawamura
Hi Ravi,

To distribute QueryDatabaseTable workload, I'd suggest using
GenerateTableFetch instead, because it generates SQL statements to
query updated records, and those SQL FlowFiles can be distributed among
NiFi nodes via an RPG.
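A sketch of that flow (the Input Port name is an assumption):

GenerateTableFetch (Primary Node only)
  -> Remote Process Group (pointing at the same cluster's Input Port 'distribute-fetch')
Input Port 'distribute-fetch'
  -> ExecuteSQL (All Nodes)
  -> downstream processing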

-

Following lines are just to share my thoughts on the topic for
possible improvements.
Ravi, Joe, how do you think?

For QueryDatabaseTable, if there are multiple input tables, table
names could be distributed among NiFi nodes (since the processor
supports EL for the table name) so that each node queries a different
table, similar to the List -> Fetch pattern.
E.g. GenerateFlowFile (use 'Custom Text' to list target table names,
only on primary node)
-> SplitText (split per line)
-> RPG
-> QueryDatabaseTable
GenerateFlowFile and SplitText part can be anything that can generate
table names.

QueryDatabaseTable also supports EL in 'Additional WHERE clause'. So
if rows can be partitioned by particular column(s) that are efficient
to query on (e.g. properly indexed), it would be possible to distribute
workload at the record level using a similar approach.

However, the above example will not actually work.
Because NiFi manages cluster-wide processor state per processor id,
the above example flow might update the same state object from
multiple NiFi nodes concurrently.
That will result in an optimistic lock error if it happens.

To make it work, I'd suggest a NiFi framework level improvement to
support a 'Custom State Key' for storing cluster-wide processor state.
For example, with the above flow, QueryDatabaseTable would specify
"table name + hash of additional where clause", and the NiFi framework
would store the state object under the provided custom state key plus
the processor uuid.
This way, those processors could distribute workload while keeping
state cluster-wide, without competing to update the same state object.

For GetHBase, it currently supports EL for neither the table name nor
the column family name.
An HBase scan can be distributed by executing partial scans per region
(startRow and stopRow).
Although Get processors do not usually take input FlowFiles, input
FlowFile and EL support could be added to GetHBase.
We could then add a processor that generates region info, e.g. a
GetHBaseRegions (start and end row ids), split and distribute that
using an RPG, and pass it to GetHBase.
In this design, a 'Custom State Key' for cluster-wide processor state
would be needed, too.

If it looks reasonable, I'd like to create JIRAs for possible future work.

Thanks,
Koji


On Fri, Dec 22, 2017 at 3:58 AM, Ravi Papisetti (rpapiset)
 wrote:
> Thanks Joe for confirmation.
>
> I think, problem gets interesting when we start querying data from data bases 
> such as Oracle(QueryDatabaseTable) and Hbase (GetHbase). Are these also 
> expected to configure with “PrimaryNode”? RPG doesn’t seem to be recommended 
> to distribute huge content across cluster for data transfer. Any 
> recommendation how we should optimize and scale NiFi to transfer a huge hbase 
> (to HDFS or HBASE) or oracle table (to HDFS or HBASE) by taking advantage of 
> cluster.
>
> Thanks,
> Ravi Papisetti
>
> On 21/12/17, 11:51 AM, "Joe Witt"  wrote:
>
> Ravi
>
> The pattern of List ->  -> Fetch -> Merge -> Put is common
> and is the correct way to use it.
>
> ListFile can often be used across the cluster all at once and work out
> fine since each node is often accessing a unique resource like a local
> file system.  However, in this case you're pointing them all at the
> same source via a network share so they'll compete to pull data.
> We're not sharing state between them as this coordination is extremely
> costly.  At least that is what it sounds like you're doing.  So
> running ListFile on primary only and then distributing the results is
> correct.  Before sending to HDFS you'll often want/need to merge
> content first unless your objects are large enough to begin with.
>
> Thanks
> Joe
>
> On Thu, Dec 21, 2017 at 12:17 PM, Ravi Papisetti (rpapiset)
>  wrote:
> > Hi,
> >
> >
> >
> > We are creating processgroups to transfer data from File system to
> > HDFS/HBASE (separate process groups for each destination).
> >
> >
> >
> > Simple Example: FileSystem to HDFS process group: We have ListFile,
> > FetchFile and PutHDFS process to transfer data from File system to HDFS.
> > When “ListFile” processor’s execution mode is configured with “All 
> Nodes”,
> > there is an error saying lease expired due to race condition.
> >
> >
> >
> > What I understood from exploration is, all nodes are trying to run
> > “listFile” on target directory and all nodes in the cluster are trying 
> to
> > transfer same files, when processing is complete by one of the node, 
> other
> > node is throwing this exception message.
> >
> >
> >
> > In resolving that, we set “ListFIle” execution mode to run 

Re: GetSFTP error

2017-11-15 Thread Koji Kawamura
Hello,

I haven't tried it myself, but from the stacktrace and the Jsch source
code, I think you should specify a file in PKCS#8 format instead of
PKCS#12.
Jsch will leave the keypair null if it fails to parse the file, which
may be the cause of the NullPointerException.

For converting a pem file to a pkcs8, there is a Stackoverflow thread
which might be helpful in your case, too:

To convert the private key from PKCS#1 to PKCS#8 with openssl:
# openssl pkcs8 -topk8 -inform PEM -outform PEM -nocrypt -in pkcs1.key
-out pkcs8.key
https://stackoverflow.com/questions/8290435/convert-pem-traditional-private-key-to-pkcs8-private-key

Thanks,
Koji

On Tue, Nov 14, 2017 at 2:29 AM, Yatsevitch, Mcgregor J
 wrote:
> Hi,
> I'm trying to get a GetSFTP processor to work.
> I've provided the private key path.
> The error in nifi-app.log says:
> Null.PointerException: null
> at com.jcraft.jsch.KeyPairPKCS8.getPublicKeyBlob(keyPairPKCS8.java:199)
> at com.jcraft.jsch.IdentityFile.getPublicKeyBlob(IdentityFile.java:199)
>
> The file in the private key path is a pem file, created with an openssl 
> pkcs12 command.
>
> Has anyone come across this? What do I have wrong?
> Thanks

