Re: Unable to access flowfile content

2019-12-19 Thread wangl...@geekplus.com.cn
Hi Purushotham, 

Since you are using cluster mode, just delete the flow.xml.gz file and restart 
the node; the flow will be synced from the other two nodes.
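
For illustration, a minimal sketch of that step (the paths are assumptions; 
adjust them to your install, and moving the file aside is safer than deleting 
it outright):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class ResetLocalFlow {
    public static void main(String[] args) throws Exception {
        // Assumed install location; check nifi.properties for the real path.
        Path flow   = Paths.get("/opt/nifi/conf/flow.xml.gz");
        Path backup = Paths.get("/opt/nifi/conf/flow.xml.gz.bak");
        Files.move(flow, backup, StandardCopyOption.REPLACE_EXISTING);
        // Then restart the node (e.g. bin/nifi.sh restart) and it will
        // inherit the flow from the rest of the cluster.
    }
}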

Regards,
Lei



wangl...@geekplus.com.cn
 
From: Purushotham Pushpavanthar
Date: 2019-12-19 17:05
To: dev
Subject: Unable to access flowfile content
Hi,
 
We've had a 3-node production cluster running seamlessly for almost 8 months
with manageable ups and downs. However, yesterday we ran into an issue in
one of the processors due to which the CPU shot up and the node went down. On
restart, the contents of a few enqueued flowfiles went missing all of a sudden
(I was unable to view the content from the content viewer in the UI). This also
resulted in the exception below, which was blocking the downstream processor
from processing any flowfiles.
We are using version 1.9.2. It would be very helpful if you can help me
debug this issue.
2019-12-19 07:05:03,653 ERROR [Timer-Driven Process Thread-4]
o.apache.nifi.processors.hive.PutHiveQL
PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a]
PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a] failed to process
session due to java.lang.RuntimeException: Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83];
Processor Administratively Yielded for 1 sec: java.lang.RuntimeException:
Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
java.lang.RuntimeException: Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
  at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:145)
  at
org.apache.nifi.processors.hive.PutHiveQL.lambda$onTrigger$6(PutHiveQL.java:295)
  at
org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
  at
org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
  at org.apache.nifi.processors.hive.PutHiveQL.onTrigger(PutHiveQL.java:295)
  at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
  at
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
  at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
  at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException:
Could not read from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
  at
org.apache.nifi.controller.repository.io.FlowFileAccessInputStream.read(FlowFileAccessInputStream.java:93)
  at
org.apache.nifi.controller.repository.io.TaskTerminationInputStream.read(TaskTerminationInputStream.java:68)
  at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:89)
  at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:72)
  at
org.apache.nifi.processors.hive.AbstractHiveQLProcessor$1.process(AbstractHiveQLProcessor.java:92)
  at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2212)
  at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2180)
  at
org.apache.nifi.processors.hive.AbstractHiveQLProcessor.getHiveQL(AbstractHiveQLProcessor.java:89)
  at
org.apache.nifi.processors.hive.PutHiveQL.lambda$new$4(PutHiveQL.java:220

Re: Re: NiFi backpressure not working

2019-10-16 Thread wangl...@geekplus.com.cn
CaptureChangeMySQLSelf is derived from CaptureChangeMySQL; I fixed a small bug 
in binlog parsing. But I am sure there's no change that would affect the 
backpressure mechanism.

Actually, I don't quite understand it. 
Is it related to the implementation of the processor? In order to honor the 
backpressure mechanism, must the processor do something? 
In my case, what will happen if the queue continues to grow (for example, to 
100 GB)? Will something bad happen?

Thanks,
Lei




wangl...@geekplus.com.cn
 
From: josef.zahn...@swisscom.com
Date: 2019-10-16 14:17
To: us...@nifi.apache.org; dev@nifi.apache.org
Subject: Re: NiFi backpressure not working
Hi Lei
 
I assume it’s the same as for ListSFTP. As I don’t know the 
CaptureChangeMySQLSelf processor, I’ll try to explain it for ListSFTP. There, 
with one single execution of the processor, you get as many flowfiles as you 
have files on your disk. If it finds 1,000 it sends out 1,000 flowfiles. If it 
finds 1,000,000 it sends out 1,000,000 flowfiles. The processor can’t hold back 
some flowfiles and knows nothing about your queue, so it sends out everything 
in one “batch” per execution, even if that is more than the queue size. Hope 
this explains the behavior a bit.
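
To make that concrete, a simplified sketch (assumed, not the actual ListSFTP
source; the fetchListingSomehow() helper is hypothetical). Backpressure is only
evaluated between onTrigger() invocations, so one invocation can overshoot the
queue threshold by an arbitrary amount:

import java.util.List;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

public class ListLikeProcessorSketch extends AbstractProcessor {

    static final Relationship REL_SUCCESS =
            new Relationship.Builder().name("success").build();

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) {
        // Hypothetical helper: returns everything found on this run,
        // e.g. 1,000,000 remote file names.
        List<String> listing = fetchListingSomehow();

        // All of these FlowFiles land on the outgoing queue within a single
        // onTrigger() call; the framework only re-checks backpressure before
        // the *next* call, so the threshold can be exceeded by a whole batch.
        for (String entry : listing) {
            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, "filename", entry);
            session.transfer(flowFile, REL_SUCCESS);
        }
    }

    private List<String> fetchListingSomehow() {
        return List.of(); // placeholder for the remote listing
    }
}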
 
Cheers
 
 
From: "wangl...@geekplus.com.cn" 
Reply to: "us...@nifi.apache.org" 
Date: Wednesday, 16 October 2019 at 08:10
To: users , dev 
Subject: NiFi backpressure not working
 
As the picture shows, I have set the backpressure object threshold to 1, yet 
more than 1,000,000 flowfiles are queued. Why does this happen?
 
 
 
Thanks, Lei


wangl...@geekplus.com.cn


NiFi backpressure not working

2019-10-16 Thread wangl...@geekplus.com.cn
As the picture shows, I have set the backpressure object threshold to 1, yet 
more than 1,000,000 flowfiles are queued. Why does this happen?



Thanks, Lei


wangl...@geekplus.com.cn


Re: Re: MergeRecord can not guarantee the ordering of the input sequence?

2019-10-15 Thread wangl...@geekplus.com.cn
Hi Koji, 
Actually, I have set the FIFO prioritizer on all connections and concurrent 
tasks to 1 for all processors.
Before and after the MergeRecord, I added a LogAttribute to debug.

Before MergeRecord, the order in the logfile is A, B, C in three flowfiles. 
After MergeRecord, the order becomes {A, C, B} in one flowfile.
This is nondeterministic.

I think I should look into the MergeRecord code and debug further.

Thanks, 
Lei




wangl...@geekplus.com.cn
 
From: Koji Kawamura
Date: 2019-10-16 09:46
To: users
CC: dev
Subject: Re: MergeRecord can not guarantee the ordering of the input sequence?
Hi Lei,
 
How about setting the FIFO prioritizer on all the preceding connections
before the MergeRecord?
Without setting any prioritizer, FlowFile ordering is nondeterministic.
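
For reference, a rough sketch of what a FIFO prioritizer amounts to (an
assumed simplification, not the source of NiFi's FirstInFirstOutPrioritizer):
the oldest queue entry sorts first, and without such a comparator the
connection makes no ordering promise at all.

import java.util.Comparator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;

public class FifoPrioritizerSketch implements FlowFilePrioritizer {
    @Override
    public int compare(FlowFile a, FlowFile b) {
        // Oldest queue timestamp first; fall back to id for a stable order.
        Comparator<FlowFile> byQueueDate = Comparator.comparing(
                FlowFile::getLastQueueDate,
                Comparator.nullsLast(Comparator.<Long>naturalOrder()));
        int result = byQueueDate.compare(a, b);
        return result != 0 ? result : Long.compare(a.getId(), b.getId());
    }
}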
 
Thanks,
Koji
 
On Tue, Oct 15, 2019 at 8:56 PM wangl...@geekplus.com.cn
 wrote:
>
>
> If FlowFile A, B, C enter the MergeRecord sequentially, the output should be 
> one FlowFile {A, B, C}.
> However, when testing with a large data volume, sometimes the output order 
> is not the same as the order they entered, and this result is nondeterministic.
>
> This really confuses me a lot.
> Anybody has any insight on this?
>
> Thanks,
> Lei
>
> ________
> wangl...@geekplus.com.cn


PutDatabaseRecord performance issues.

2019-10-15 Thread wangl...@geekplus.com.cn
I am using CaptureChangeMySQL to extract the binlog, do some transformations, 
and then write to another database using PutDatabaseRecord. Now the 
PutDatabaseRecord processor is a performance bottleneck.
If I set the PutDatabaseRecord processor concurrency larger than 1, there will 
be ordering issues: the order of the binlog entries in the destination database 
will not be the same as the order they come in. But with one concurrent task, 
the TPS is only about 80/s.
Even if I add a MergeRecord before PutDatabaseRecord, the TPS is no more than 300.
Anybody have any idea about this?
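
For what it's worth, a generic JDBC sketch of why batching raises TPS (plain
JDBC, not NiFi internals; the URL, credentials and table are made up): one
statement per row pays a network round trip per row, while addBatch() and
executeBatch() amortize the round trips over the whole batch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchInsertSketch {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://target-db:3306/test"; // assumed URL
        try (Connection c = DriverManager.getConnection(url, "user", "pass");
             PreparedStatement ps = c.prepareStatement(
                     "INSERT INTO orders (order_no, status) VALUES (?, ?)")) {
            c.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                ps.setString(1, "order-" + i);
                ps.setString(2, "NEW");
                ps.addBatch();      // queue the row client-side
            }
            ps.executeBatch();      // one round trip for the whole batch
            c.commit();
        }
    }
}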

Thanks,
Lei



wangl...@geekplus.com.cn


Data inconsistency happens when using CDC to replicate my database

2019-10-14 Thread wangl...@geekplus.com.cn
Using CaptureChangeMySQL to extract the binlog, do some translation, and then 
put to another database with the PutDatabaseRecord processor.
But there's always data inconsistency between the destination database and the 
source database. To debug this, I have done the following setup. 
CaptureChangeMySQL only outputs one table. There's a field called order_no that 
is unique in the table.
All the processors are scheduled with only one concurrent task.
No data balancing between nodes; everything runs on the primary node.
After CaptureChangeMySQL, I added a LogAttribute processor called log1. Before 
PutDatabaseRecord, I also added a LogAttribute, called log2. 
For the inconsistent data, I can grep the order_no in log1 and log2. 
For one specific order_no, there are 5 binlog messages in total. But in log1, 
there's only one message. In log2, there are 5, but the order is changed. 

position   type
201721167  insert  (appeared in log1 and log2)
201926490  update  (appeared only in log2)
202728760  update  (appeared only in log2)
203162806  update  (appeared only in log2)
203135127  update  (appeared only in log2; the position number is smaller than 
the previous msg)
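
The reordering can be confirmed mechanically; a small sketch using the
positions grepped from log2 above (binlog positions should be non-decreasing
if events arrive in commit order):

import java.util.List;

public class BinlogOrderCheck {
    public static void main(String[] args) {
        // Positions from log2 for the order_no above.
        List<Long> positions = List.of(
                201721167L, 201926490L, 202728760L, 203162806L, 203135127L);
        for (int i = 1; i < positions.size(); i++) {
            if (positions.get(i) < positions.get(i - 1)) {
                System.out.printf("out of order at index %d: %d < %d%n",
                        i, positions.get(i), positions.get(i - 1));
            }
        }
    }
}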

This really confuses me a lot.
Any insight on this? Thanks very much.

Lei



wangl...@geekplus.com.cn


Can CaptureChangeMySQL be scheduled to all nodes instead of primary node?

2019-10-09 Thread wangl...@geekplus.com.cn
I am using CaptureChangeMySQL to replicate the database.
There are many data sources, and so there are many CaptureChangeMySQL processors.
CaptureChangeMySQL throws a duplicate slave id error if scheduled on all nodes, 
so it can only be scheduled on the primary node. This causes very heavy load on 
the primary node.
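
For context, a hedged sketch of why the conflict happens, using the binlog
client library CaptureChangeMySQL is built on (the per-node id scheme and the
NODE_INDEX variable below are assumptions for illustration, not a NiFi
feature): every client registers with MySQL as a replica under a server id,
and two replicas with the same id conflict.

import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class UniqueServerIdSketch {
    public static void main(String[] args) throws Exception {
        // Host and credentials are made up for the sketch.
        BinaryLogClient client =
                new BinaryLogClient("mysql-host", 3306, "repl", "secret");
        // Each running copy needs its own id; deriving it from a per-node
        // value avoids the duplicate slave id error.
        client.setServerId(10091 + nodeIndex());
        client.connect();
    }

    // Hypothetical helper: derive a node index from the environment.
    static int nodeIndex() {
        return Integer.parseInt(System.getenv().getOrDefault("NODE_INDEX", "0"));
    }
}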

Is there any way I can distribute the CaptureChangeMySQL processors to 
all nodes instead of only to the primary node?

Thanks,
Lei 



wangl...@geekplus.com.cn


Memory issues when using NiFi to replicate mysql database with CaptureChangeMySQL

2019-09-30 Thread wangl...@geekplus.com.cn

Using CaptureChangeMySQL to parse the binlog, then do some transformation and 
finally put the result to another database, while at the same time putting the 
binlog messages on a message queue.
Some controller services are used: HortonworksSchemaRegistry, 
DistributedMapCacheServer, JsonTreeReader, JsonRecordSetWriter, etc.
 
Two nodes as NiFi servers, each with 16 cores and 32 GB of memory.

When we do the stress test, the memory usage is very high, and the primary node 
is often switched.
Max Timer Driven Thread Count is set to 2000 for high throughput.
The input QPS from CaptureChangeMySQL is about 2000/s.
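
As a side note on that setting, a rule-of-thumb calculation (a commonly cited
guideline, not an official NiFi formula): the timer driven pool is usually
sized at a small multiple of the core count, so 2000 threads on a 16-core node
mostly buys contention.

public class ThreadPoolSizing {
    public static void main(String[] args) {
        int coresPerNode = 16;
        int multiplier = 2;  // 2x-4x cores is the usual guidance
        System.out.println(coresPerNode * multiplier); // 32, vs. the configured 2000
    }
}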

Any suggestions on how to optimize the memory usage of NiFi?

Thanks,
Lei



wangl...@geekplus.com.cn


CaptureChangeMySQL table name map error if one update statement changed two tables

2019-09-26 Thread wangl...@geekplus.com.cn
If one UPDATE statement updates two tables:

 UPDATE tableA a, tableB b SET a.cell_code = 'A5', b.cell_code = 'A5' WHERE 
 a.id = 3226855 AND b.id = 3226855

then in the binlog there will be two consecutive TABLE_MAP events, followed by 
the UPDATE_ROWS events corresponding to the two tables:

TABLE_MAP   (tableA)
TABLE_MAP   (tableB)
UPDATE_ROWS (tableA)
UPDATE_ROWS (tableB)

If a DistributedMapCacheClient is used, the two UPDATE_ROWS events will be 
mapped to the same table (the latter mapping, tableB's).

This is expected behavior according to the source code, but I think it is a 
bug and we can fix it.
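
To illustrate the suspected issue, an assumed sketch (not the actual NiFi
source): if only the most recent TABLE_MAP is remembered, both row events of a
multi-table UPDATE resolve to the same table, whereas keying the cache by the
binlog table id keeps the two mappings apart.

import java.util.HashMap;
import java.util.Map;

public class TableMapCacheSketch {
    record TableMapEvent(long tableId, String tableName) {}
    record UpdateRowsEvent(long tableId) {}

    static String currentTable;                                 // buggy: last map wins
    static final Map<Long, String> tableById = new HashMap<>(); // fixed: keyed cache

    public static void main(String[] args) {
        // Binlog order for "UPDATE tableA a, tableB b SET ...":
        // TABLE_MAP(A), TABLE_MAP(B), UPDATE_ROWS(A), UPDATE_ROWS(B)
        for (TableMapEvent m : new TableMapEvent[]{
                new TableMapEvent(100, "tableA"),
                new TableMapEvent(101, "tableB")}) {
            currentTable = m.tableName();
            tableById.put(m.tableId(), m.tableName());
        }
        UpdateRowsEvent rowsForA = new UpdateRowsEvent(100);
        System.out.println(currentTable);                      // tableB (wrong for A's rows)
        System.out.println(tableById.get(rowsForA.tableId())); // tableA (correct)
    }
}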

Thanks, 
Lei
 


wangl...@geekplus.com.cn


Re: RE: CaptureChangeMySQL throws communication failure error every 30 seconds

2019-09-16 Thread wangl...@geekplus.com.cn

Actually the server id is unique, and it is scheduled only on the primary node.
By using the iftop tool on the NiFi side, I see more than one TCP connection 
between the MySQL server and the NiFi server.
Even if I stop the CaptureChangeMySQL processor, the TCP connections still exist. 
Looking at nifi-app.log:

2019-09-12 14:45:50,725 INFO [blc-keepalive-192.168.114.118:3306] 
c.g.shyiko.mysql.binlog.BinaryLogClient Trying to restore lost connection to 
192.168.114.118:3306
2019-09-12 14:45:50,904 INFO [blc-192.168.114.118:3306] 
c.g.shyiko.mysql.binlog.BinaryLogClient Connected to 192.168.114.118:3306 at 
mysql-bin.000675/707993476 (sid:10091, cid:5560080)

I have stopped the processor, so why does it still retry connecting to the 
MySQL server?
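
One hedged explanation, judging by the "blc-keepalive" thread name in the log:
the underlying client runs a keepalive thread that keeps restoring the
connection until disconnect() is called. A sketch of that lifecycle (the host
is from the log; credentials and intervals are assumptions):

import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class KeepaliveSketch {
    public static void main(String[] args) throws Exception {
        BinaryLogClient client =
                new BinaryLogClient("192.168.114.118", 3306, "repl", "secret");
        client.setKeepAlive(true);            // keepalive thread is on by default
        client.setKeepAliveInterval(30_000L); // assumed 30 s check interval
        client.connect(5_000L);               // connect with a timeout

        // A clean shutdown must eventually reach this call; otherwise the
        // keepalive thread keeps reconnecting after the processor is stopped.
        client.disconnect();
    }
}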

Thanks,
Lei 




wangl...@geekplus.com.cn
 
From: Williams, Jim
Date: 2019-09-11 21:42
To: us...@nifi.apache.org
Subject: RE: CaptureChangeMySQL throws communication failure error every 30 
seconds
Lei,
 
Is this flow being run on a cluster?
 
I’ve not worked with this processor.  It seems possible that, if the processor 
is scheduled to run every thirty seconds and is not restricted to run on the 
primary node of the cluster, this sort of message might occur.
 
 
Warm regards,
 
Jim Williams | Principal Database Developer
O: +1 713.341.7812 | C: +1 919.523.8767 | jwilli...@alertlogic.com | 
alertlogic.com 
 
 
From: Matt Burgess  
Sent: Wednesday, September 11, 2019 8:34 AM
To: us...@nifi.apache.org
Subject: Re: CaptureChangeMySQL throws communication failure error every 30 
seconds
 
Lei,
 
This is most likely due to a duplicate Server ID, each instance of the client 
and server needs a unique ID or you will see errors like this. Set the "Server 
ID" property in CaptureChangeMySQL to some random integer (like 42 :) and that 
should remove the errors.
 
Regards,
Matt
 
On Mon, Sep 9, 2019 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Seems this error occurs every 30 seconds.
 
 
Any insight on this?
 
Thanks, 
Lei 
 


wangl...@geekplus.com.cn


NiFi cluster heavy cpu usage

2019-09-16 Thread wangl...@geekplus.com.cn

I deployed a NiFi cluster with two virtual machines, each with 4 cores and 
16 GB of memory.
The CPU is at more than 100% even when there's no processor running.
Often the CPU is about 300% after I start some processors. 

[root@iZuf6agrzcolqeja3if7kbZ ~]# ps -mp 3429 -o THREAD,tid,time | sort -k2 -n -r | less
root    171     -  -  -        -  -     -  07:08:43
root   16.1    19  -  futex_   -  -  3598  00:40:18
root   16.1    19  -  futex_   -  -  3569  00:40:29
root   16.0    19  -  futex_   -  -  3597  00:40:02
root   15.9    19  -  futex_   -  -  3603  00:39:51
root   15.9    19  -  futex_   -  -  3601  00:39:57
root   15.9    19  -  -        -  -  3604  00:39:57
root   15.8    19  -  -        -  -  3713  00:39:40
root   15.7    19  -  -        -  -  3600  00:39:27
root   15.6    19  -  futex_   -  -  3712  00:39:00
root   15.6    19  -  futex_   -  -  3593  00:39:05


Some threads consume a lot of CPU. I picked one and ran jstack on it; it is 
often in a waiting state.
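
One small gap in that workflow: ps prints thread ids in decimal, while jstack
labels threads with hex nid values, so the id needs converting before
searching the dump. A trivial sketch:

public class TidToNid {
    public static void main(String[] args) {
        long tid = 3598; // a hot thread from the ps output above
        System.out.println("nid=0x" + Long.toHexString(tid)); // nid=0xe0e
    }
}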
Any insight on this?

Thanks, 
Lei



wangl...@geekplus.com.cn


Re: CaptureChangeMySQL bit(1) type field is parsed to '{}'

2019-09-03 Thread wangl...@geekplus.com.cn

Does anybody have any insight on this? 


wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2019-08-22 19:31
To: users
Subject: CaptureChangeMySQL bit(1) type field is parsed to '{}'

CaptureChangeMySQL -> ConvertJSONToSQL -> ExecuteSQL to replicate my database.
But when the field type is bit, there will be an error, because the value is 
parsed to '{}':

`is_cancel` bit(1) DEFAULT b'0' COMMENT 'canceled',

[ {
  "id" : 6173148,
  "task_id" : 6173148,
  "charger_id" : 1,
  "is_cancel" : "{}",
  "priority" : 0,
  "business_sequence" : 0
} ]

Is this a bug? 
Or is there some method by which I can avoid this?
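
One possible explanation, offered as a hedged sketch: binlog client libraries
commonly surface BIT columns as java.util.BitSet, and an empty BitSet prints
exactly as "{}", which matches the JSON above. If so, the value needs an
explicit mapping before it reaches SQL:

import java.util.BitSet;

public class BitFieldSketch {
    public static void main(String[] args) {
        BitSet isCancel = new BitSet(1);  // bit(1) value b'0'
        System.out.println(isCancel);     // prints {}
        isCancel.set(0);                  // bit(1) value b'1'
        System.out.println(isCancel);     // prints {0}
        System.out.println(isCancel.isEmpty() ? 0 : 1); // map to an integer: 1
    }
}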

Thanks,
Lei


wangl...@geekplus.com.cn


Still have network traffic after stopping the CaptureChangeMySQL processor.

2019-08-30 Thread wangl...@geekplus.com.cn


Using CaptureChangeMySQL to parse the binlog from the MySQL server, there will 
surely be network traffic from the MySQL server to the NiFi server.
But after I stop the CaptureChangeMySQL processor, why does the network traffic 
still exist?

Thanks,
Lei 



wangl...@geekplus.com.cn