Re: Unable to access flowfile content
Hi Purushotham,

Since you are using cluster mode, just delete the flow.xml.gz file and restart the node; the flow will be synced from the other two nodes.

Regards,
Lei
wangl...@geekplus.com.cn

From: Purushotham Pushpavanthar
Date: 2019-12-19 17:05
To: dev
Subject: Unable to access flowfile content

Hi,

We have a 3-node production cluster that has been running seamlessly for almost 8 months with manageable ups and downs. However, yesterday we ran into an issue in one of the processors which made the CPU shoot up, and the node went down. On restart, the contents of a few enqueued flowfiles suddenly went missing (I was unable to view the content from the content viewer in the UI). This also resulted in the exception below, which was blocking the downstream processor from processing any flowfile. We are using version 1.9.2. It would be very helpful if you could help me debug this issue.

2019-12-19 07:05:03,653 ERROR [Timer-Driven Process Thread-4] o.apache.nifi.processors.hive.PutHiveQL PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a] PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a] failed to process session due to java.lang.RuntimeException: Failed to execute due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1576648697457-40, container=default, section=40], offset=10977, length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]; Processor Administratively Yielded for 1 sec: java.lang.RuntimeException: Failed to execute due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1576648697457-40, container=default, section=40], offset=10977, length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
java.lang.RuntimeException: Failed to execute due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1576648697457-40, container=default, section=40], offset=10977, length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
    at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:145)
    at org.apache.nifi.processors.hive.PutHiveQL.lambda$onTrigger$6(PutHiveQL.java:295)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.hive.PutHiveQL.onTrigger(PutHiveQL.java:295)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1576648697457-40, container=default, section=40], offset=10977, length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
    at org.apache.nifi.controller.repository.io.FlowFileAccessInputStream.read(FlowFileAccessInputStream.java:93)
    at org.apache.nifi.controller.repository.io.TaskTerminationInputStream.read(TaskTerminationInputStream.java:68)
    at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:89)
    at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:72)
    at org.apache.nifi.processors.hive.AbstractHiveQLProcessor$1.process(AbstractHiveQLProcessor.java:92)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2212)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2180)
    at org.apache.nifi.processors.hive.AbstractHiveQLProcessor.getHiveQL(AbstractHiveQLProcessor.java:89)
    at org.apache.nifi.processors.hive.PutHiveQL.lambda$new$4(PutHiveQL.java:220
Re: Re: NiFi backpressure not working
CaptureChangeMySQLSelf is derived from CaptureChangeMySQL; I fixed a small bug in the binlog parsing. But I am sure there is no change that would affect the back pressure mechanism.

Actually I don't quite understand it. Is it related to the implementation of the processor? In order for the back pressure mechanism to work, must the processor do something? In my case, what will happen if the queue continues to grow (for example to 100 GB)? Will something bad happen?

Thanks,
Lei
wangl...@geekplus.com.cn

From: josef.zahn...@swisscom.com
Date: 2019-10-16 14:17
To: us...@nifi.apache.org; dev@nifi.apache.org
Subject: Re: NiFi backpressure not working

Hi Lei

I assume it's the same as for ListSFTP. As I don't know the CaptureChangeMySQLSelf processor, I'll explain it for ListSFTP. There, with one single execution of the processor you get as many flowfiles as you have files on your disk. If it finds 1000, it sends out 1000 flowfiles. If it finds 1'000'000, it sends out 1'000'000 flowfiles. The processor can't hold back some flowfiles and knows nothing about your queue, so it sends out everything in one "batch" per execution, even if that is more than the queue size.

Hope this explains the behavior a bit.

Cheers

From: "wangl...@geekplus.com.cn"
Reply to: "us...@nifi.apache.org"
Date: Wednesday, 16 October 2019 at 08:10
To: users, dev
Subject: NiFi backpressure not working

As the picture shows, I have set the back pressure object threshold to 1, but the picture shows there are more than 1,000,000 objects in the queue. Why does this happen?

Thanks,
Lei
wangl...@geekplus.com.cn
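To illustrate the behavior Josef describes above: a minimal sketch of a hypothetical listing-style processor (an illustrative example, not the actual ListSFTP or CaptureChangeMySQL source). Everything found in one onTrigger call is transferred in a single session, and back pressure is only consulted when the framework decides whether to schedule the processor again, so a single execution can push a queue far past its object threshold.

    import java.util.Collections;
    import java.util.List;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    // Hypothetical listing processor used only to illustrate the scheduling model.
    public class ListEverythingProcessor extends AbstractProcessor {

        static final Relationship REL_SUCCESS =
                new Relationship.Builder().name("success").build();

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // One execution lists the whole source. If the listing has 1,000,000
            // entries, 1,000,000 FlowFiles are created and transferred here,
            // regardless of any back pressure threshold on the outgoing queue.
            List<String> listing = fetchEverythingFromSource();
            for (String entry : listing) {
                FlowFile flowFile = session.create();
                flowFile = session.putAttribute(flowFile, "source.entry", entry);
                session.transfer(flowFile, REL_SUCCESS);
            }
            // Back pressure only prevents the framework from scheduling onTrigger
            // again once the queue is over its threshold; it cannot split or hold
            // back the batch produced inside a single call.
        }

        private List<String> fetchEverythingFromSource() {
            return Collections.emptyList(); // placeholder for the real listing call
        }
    }

The session is transactional, so the whole batch becomes visible in the queue at once; the cost of overshooting is simply that the queue temporarily holds far more objects than its configured threshold.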
NiFi backpressure not working
As the picture shows, I have set the back pressure object threshold to 1, but the picture shows there are more than 1,000,000 objects in the queue. Why does this happen?

Thanks,
Lei
wangl...@geekplus.com.cn
Re: Re: MergeRecord can not guarantee the ordering of the input sequence?
Hi Koji,

Actually I have set all connections to FIFO and the concurrent tasks to 1 for all processors.

Before and after the MergeRecord, I added a LogAttribute to debug.
Before MergeRecord, the order in the log file is A, B, C in three flowfiles.
After MergeRecord, the order becomes {A, C, B} in one flowfile.
This is nondeterministic. I think I should look at the MergeRecord code and debug further.

Thanks,
Lei
wangl...@geekplus.com.cn

From: Koji Kawamura
Date: 2019-10-16 09:46
To: users
CC: dev
Subject: Re: MergeRecord can not guarantee the ordering of the input sequence?

Hi Lei,

How about setting the FIFO prioritizer on all the preceding connections before the MergeRecord? Without setting any prioritizer, FlowFile ordering is nondeterministic.

Thanks,
Koji

On Tue, Oct 15, 2019 at 8:56 PM wangl...@geekplus.com.cn wrote:
>
> If FlowFile A, B, C enter the MergeRecord sequentially, the output should be one FlowFile {A, B, C}.
> However, when testing with a large data volume, sometimes the output order is not the same as the order in which they entered. And this result is nondeterministic.
>
> This really confuses me a lot.
> Does anybody have any insight on this?
>
> Thanks,
> Lei
>
> ________
> wangl...@geekplus.com.cn
PutDatabaseRecord performance issues
I am using CaptureChangeMySQL to extract the binlog, do some transformations, and then write to another database using PutDatabaseRecord. Now the PutDatabaseRecord processor is a performance bottleneck.

If I set the PutDatabaseRecord concurrency to larger than 1, there are ordering issues: the order in which the binlog entries reach the destination database is not the same as the order in which they arrive. But with one concurrent task the TPS is only about 80/s.
Even if I add a MergeRecord before PutDatabaseRecord, the TPS is no more than 300.

Does anybody have any ideas about this?

Thanks,
Lei
wangl...@geekplus.com.cn
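For what it's worth, the jump from roughly 80 TPS to roughly 300 TPS after adding MergeRecord is consistent with plain JDBC behaviour: writing one row per statement and commit is dominated by network round trips, while batching amortizes them. A small stand-alone JDBC sketch of that difference (hypothetical table and connection details, not NiFi internals):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class BatchInsertSketch {
        public static void main(String[] args) throws SQLException {
            // Hypothetical connection details, for illustration only.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/target_db", "user", "password")) {
                conn.setAutoCommit(false);
                String sql = "INSERT INTO orders (order_no, status) VALUES (?, ?)";
                try (PreparedStatement ps = conn.prepareStatement(sql)) {
                    for (int i = 0; i < 1000; i++) {
                        ps.setString(1, "order-" + i);
                        ps.setString(2, "created");
                        ps.addBatch();      // queue the row on the client side
                    }
                    ps.executeBatch();      // one round trip for all 1000 rows
                }
                conn.commit();              // one commit instead of 1000
            }
        }
    }

Batching also preserves the order of rows within a batch, which is why keeping a single concurrent task and making each FlowFile carry more records (bigger merges) is usually the way to raise throughput without reordering the binlog stream.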
Data inconsistency happens when using CDC to replicate my database
I am using CaptureChangeMySQL to extract the binlog, do some translation, and then put it to another database with the PutDatabaseRecord processor. But there is always data inconsistency between the destination database and the source database.

To debug this, I made the following settings:
CaptureChangeMySQL only outputs one table. There is a field called order_no that is unique in the table.
All the processors are scheduled with only one concurrent task. No data balancing between nodes; everything runs on the primary node.
After CaptureChangeMySQL, I added a LogAttribute processor called log1. Before PutDatabaseRecord, I also added a LogAttribute, called log2.
For the inconsistent data, I can grep the order_no in log1 and log2.

For one specific order_no, there are 5 binlog messages in total. But in log1 there is only one message. In log2 there are 5, but the order is changed.

position     type
201721167    insert (appeared in log1 and log2)
201926490    update (appeared only in log2)
202728760    update (appeared only in log2)
203162806    update (appeared only in log2)
203135127    update (appeared only in log2; this position number is smaller than the previous message's)

This really confuses me a lot. Any insight on this?

Thanks very much.
Lei
wangl...@geekplus.com.cn
Can CaptureChangeMySQL be scheduled to all nodes instead of primary node?
I am using CaptureChangeMySQL to replicate databases. There are many data sources, so there are many CaptureChangeMySQL processors.

CaptureChangeMySQL throws a "same slave id" error if it is scheduled on all nodes, so it can only be scheduled on the primary node. This causes a very heavy load on the primary node.

Is there any way I can distribute the CaptureChangeMySQL processors across all nodes instead of only the primary node?

Thanks,
Lei
wangl...@geekplus.com.cn
Memory issues when using NiFi to replicate a MySQL database with CaptureChangeMySQL
I am using CaptureChangeMySQL to parse the binlog, then doing some transformation and finally putting the result into another database, while at the same time putting the binlog messages onto a message queue.
Some controller services are used: HortonworksSchemaRegistry, DistributedMapCacheServer, JSONTreeReader, JSONTreeWriter, etc.

Two nodes serve as NiFi servers, each with 16 cores and 32 GB of memory.

When we run the load test, memory usage is very high and the primary node is often switched.
The Maximum Timer Driven Thread Count is set to 2000 for high throughput.
The input QPS from CaptureChangeMySQL is about 2000/s.

Any suggestions on how to optimize the memory usage of NiFi?

Thanks,
Lei
wangl...@geekplus.com.cn
CaptureChangeMySQL table name map error if one update statement changes two tables
If one update statement updates two tables:

UPDATE tableA a, tableB b
SET a.cell_code = 'A5', b.cell_code = 'A5'
WHERE a.id = 3226855 AND b.id = 3226855

then in the binlog there will be two consecutive TABLE_MAP events, followed by the UPDATE events corresponding to the two tables:

TableMap A
TableMap B
UPDATE A
UPDATE B

If a DistributedCacheClient is used, the two UPDATE events will be mapped to the same table (the latter one, tableB). This is the expected behavior according to the source code, but I think it is a bug and we can fix it.

Thanks,
Lei
wangl...@geekplus.com.cn
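A simplified sketch of the mapping problem described above (illustrative only, not the NiFi source): each row event in the binlog carries the table id of the TABLE_MAP event that describes it, so resolving the name per table id handles multi-table statements, while remembering only the last TABLE_MAP seen attributes both UPDATE events to tableB.

    import java.util.HashMap;
    import java.util.Map;

    public class TableMapSketch {

        // Buggy pattern: only the most recent TABLE_MAP is kept, so the UPDATE
        // rows for tableA end up attributed to tableB.
        private String lastSeenTableName;

        // Safer pattern: keep a map from table id to table name and resolve
        // each row event through the table id it carries.
        private final Map<Long, String> tableNamesById = new HashMap<>();

        public void onTableMapEvent(long tableId, String tableName) {
            lastSeenTableName = tableName;
            tableNamesById.put(tableId, tableName);
        }

        public String resolveWithLastSeen(long tableIdOfRowEvent) {
            return lastSeenTableName;                      // wrong for multi-table updates
        }

        public String resolveWithTableId(long tableIdOfRowEvent) {
            return tableNamesById.get(tableIdOfRowEvent);  // correct for both tables
        }
    }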
Re: RE: CaptureChangeMySQL throws communication failure error every 30 seconds
Actually the server id is unique and the processor is scheduled only on the primary node.

Using the iftop tool on the NiFi side, I see more than one TCP connection between the MySQL server and the NiFi server. Even after I stop the CaptureChangeMySQL processor, the TCP connection still exists.

Looking at the nifi-app.log:

2019-09-12 14:45:50,725 INFO [blc-keepalive-192.168.114.118:3306] c.g.shyiko.mysql.binlog.BinaryLogClient Trying to restore lost connection to 192.168.114.118:3306
2019-09-12 14:45:50,904 INFO [blc-192.168.114.118:3306] c.g.shyiko.mysql.binlog.BinaryLogClient Connected to 192.168.114.118:3306 at mysql-bin.000675/707993476 (sid:10091, cid:5560080)

I have stopped the processor, so why does it still retry connecting to the MySQL server?

Thanks,
Lei
wangl...@geekplus.com.cn

From: Williams, Jim
Date: 2019-09-11 21:42
To: us...@nifi.apache.org
Subject: RE: CaptureChangeMySQL throws communication failure error every 30 seconds

Lei,

Is this flow being run on a cluster? I've not worked with this processor. It seems possible that if the processor is scheduled to run every thirty seconds and is not restricted to run on the primary node of the cluster, this sort of message might occur.

Warm regards,
Jim Williams | Principal Database Developer
O: +1 713.341.7812 | C: +1 919.523.8767 | jwilli...@alertlogic.com | alertlogic.com

From: Matt Burgess
Sent: Wednesday, September 11, 2019 8:34 AM
To: us...@nifi.apache.org
Subject: Re: CaptureChangeMySQL throws communication failure error every 30 seconds

Lei,

This is most likely due to a duplicate Server ID; each instance of the client and server needs a unique ID or you will see errors like this. Set the "Server ID" property in CaptureChangeMySQL to some random integer (like 42 :) and that should remove the errors.

Regards,
Matt

On Mon, Sep 9, 2019 at 11:35 AM wangl...@geekplus.com.cn wrote:

It seems this error occurs every 30 seconds. Any insight on this?

Thanks,
Lei
wangl...@geekplus.com.cn
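A side note that may help while digging: the connection is owned by com.github.shyiko.mysql.binlog.BinaryLogClient (the blc threads in the log above), which, as far as I can tell, runs a keepalive thread by default that restores a lost connection until disconnect() is explicitly called on the client. A rough stand-alone sketch of how that client is typically driven (made-up connection details, not the CaptureChangeMySQL code):

    import java.io.IOException;
    import com.github.shyiko.mysql.binlog.BinaryLogClient;

    public class BinlogClientSketch {
        public static void main(String[] args) throws IOException {
            // Made-up connection details, for illustration only.
            BinaryLogClient client =
                    new BinaryLogClient("192.168.114.118", 3306, "repl_user", "repl_password");
            client.setServerId(10091);      // must be unique among all replicas and clients
            client.setKeepAlive(true);      // keepalive thread restores lost connections
            client.registerEventListener(event -> System.out.println(event));

            // Connect in a separate thread, waiting up to 5 seconds for the handshake.
            client.connect(5000);

            // Without an explicit disconnect(), the keepalive thread keeps the TCP
            // connection to the MySQL server alive and reconnects if it drops.
            client.disconnect();
        }
    }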
NiFi cluster heavy CPU usage
I deployed a NiFi cluster with two virtual machines, each with 4 cores and 16 GB of memory.

The CPU usage is more than 100% even when there is no processor running. Often the CPU usage is about 300% after I start some processors.

[root@iZuf6agrzcolqeja3if7kbZ ~]# ps -mp 3429 -o THREAD,tid,time | sort -k2 -n -r | less
root   171   -   -  -       -  -     -  07:08:43
root  16.1  19   -  futex_  -  -  3598  00:40:18
root  16.1  19   -  futex_  -  -  3569  00:40:29
root  16.0  19   -  futex_  -  -  3597  00:40:02
root  15.9  19   -  futex_  -  -  3603  00:39:51
root  15.9  19   -  futex_  -  -  3601  00:39:57
root  15.9  19   -  -       -  -  3604  00:39:57
root  15.8  19   -  -       -  -  3713  00:39:40
root  15.7  19   -  -       -  -  3600  00:39:27
root  15.6  19   -  futex_  -  -  3712  00:39:00
root  15.6  19   -  futex_  -  -  3593  00:39:05

Some threads consume a lot of CPU. I picked one and ran jstack on it; it is often in the waiting state.

Any insight on this?

Thanks,
Lei
wangl...@geekplus.com.cn
Re: CaptureChangeMySQL bit(1) type field is parsed to '{}'
Does anybody have any insight on this?

wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2019-08-22 19:31
To: users
Subject: CaptureChangeMySQL bit(1) type field is parsed to '{}'

I use CaptureChangeMySQL -> ConvertJSONToSQL -> ExecuteSQL to replicate my database. But when a field's type is bit, there is an error, because the value is parsed to '{}'.

`is_cancel` bit(1) DEFAULT b'0' COMMENT 'canceled',

[ {
  "id" : 6173148,
  "task_id" : 6173148,
  "charger_id" : 1,
  "is_cancel" : "{}",
  "priority" : 0,
  "business_sequence" : 0
} ]

Is this a bug? Or is there some way I can avoid this?

Thanks,
Lei
wangl...@geekplus.com.cn
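My guess (worth verifying): the binlog client deserializes MySQL BIT columns as java.util.BitSet, and calling toString() on a BitSet holding b'0' produces the empty-set text "{}", which is exactly what shows up in the JSON above. A small stand-alone sketch of the effect, and one way to normalize such a value to 0/1 before it reaches ConvertJSONToSQL:

    import java.util.BitSet;

    public class BitFieldSketch {
        public static void main(String[] args) {
            // MySQL bit(1) value b'0': no bits set.
            BitSet isCancelFalse = new BitSet(1);
            System.out.println(isCancelFalse);            // prints {}  -- matches the "{}" in the JSON

            // MySQL bit(1) value b'1': bit 0 set.
            BitSet isCancelTrue = new BitSet(1);
            isCancelTrue.set(0);
            System.out.println(isCancelTrue);             // prints {0}

            // Normalizing the value to 0/1 (for example in a scripted step)
            // avoids handing the "{}" string to ConvertJSONToSQL.
            System.out.println(toTinyInt(isCancelFalse)); // 0
            System.out.println(toTinyInt(isCancelTrue));  // 1
        }

        private static int toTinyInt(BitSet bits) {
            return bits.isEmpty() ? 0 : 1;
        }
    }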
Still have network traffic after stopping the CaptureChangeMySQL processor
I am using CaptureChangeMySQL to parse the binlog from a MySQL server. Naturally there is network traffic from the MySQL server to the NiFi server. But after I stop the CaptureChangeMySQL processor, why does the network traffic still exist?

Thanks,
Lei
wangl...@geekplus.com.cn