Neil - The number of records analyzed property is a little hairy when combined with JSON content. It's pretty clear with something like CSV, where each line is considered a record. With JSON it is less clear what actually counts as a "record": is the record the whole object, or each element in the array? Ultimately NiFi invokes the Kite SDK to handle this task. Kite attempts to parse the JSON and iterate through X number of assumed Avro schemas/records. I would assume that Kite is attempting to read in this entire array of objects, and that this is what causes your OOM error.
I would feed a smaller set of data into the InferAvroSchema processor and then use the schema produced by that run in subsequent processors, removing InferAvroSchema from the flow altogether. I am assuming that this data always has the same structure. If that is not the case, this will not work and we can look at other options. Don't worry, we will get it going at some point 👌

- Jeremy Dyer

Sent from my iPhone

> On Sep 14, 2017, at 5:13 PM, Neil Derraugh <[email protected]> wrote:
>
> I have a 328MB json file sitting in a queue that feeds into an
> InferAvroSchema (1.3.0) processor that is currently stopped. The json file
> consists of a single array containing just over 1.5M small objects. The heap
> is set to 4GB. Before starting the processor the heap usage on the node that
> has the file is about 20%. If I start the InferAvroSchema eventually I run
> out of heap and start seeing the following in the logs.
>
> 2017-09-14 13:01:25,597 WARN [Clustering Tasks Thread-3] o.apache.nifi.controller.FlowController Failed to send heartbeat due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed marshalling 'HEARTBEAT' protocol message due to: javax.net.ssl.SSLException: Received fatal alert: unexpected_message
> 2017-09-14 13:01:25,655 INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@27ae11db Connection State changed to SUSPENDED
> 2017-09-14 13:01:25,690 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,696 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
>
> The node drops out of the cluster and doesn't seem to rejoin unless I restart
> nifi on that node.
>
> The InferAvroSchema processor is only looking at the first 10 records. It's
> an array of objects so the first ten elements in the array could be pulled
> out and processed without processing the whole file as long as we accept that
> it won't otherwise be validated at this step. Is that a possibility? Is
> there a better way to do what I'm trying to do?
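
On the idea of pulling out just the first ten elements: here is a rough sketch of how a small sample could be cut from a large top-level array without loading the whole file. It is plain Python using only the standard library, meant to run outside NiFi. It is not how InferAvroSchema or Kite work internally, and the function name `first_array_elements` and its parameters are made up for illustration. It reads the input in chunks and decodes one element at a time, stopping as soon as it has enough. The rest of the file is never validated, and stray commas are tolerated, so treat it as a sampling aid only.

```python
import io
import json

WHITESPACE = " \t\r\n"


def first_array_elements(stream, n, chunk_size=64 * 1024):
    """Return up to n leading elements of a top-level JSON array.

    `stream` is a text-mode file-like object. Only as many chunks as are
    needed to decode n elements are read; the rest is left untouched.
    """
    decoder = json.JSONDecoder()
    buf, pos, eof = "", 0, False
    elements = []

    def fill():
        # Drop the consumed prefix and append the next chunk.
        nonlocal buf, pos, eof
        chunk = stream.read(chunk_size)
        if not chunk:
            eof = True
            return False
        buf = buf[pos:] + chunk
        pos = 0
        return True

    def skip(chars):
        # Advance past any of `chars`, pulling in more input when needed.
        nonlocal pos
        while True:
            while pos < len(buf) and buf[pos] in chars:
                pos += 1
            if pos < len(buf) or not fill():
                return

    skip(WHITESPACE)
    if pos >= len(buf) or buf[pos] != "[":
        raise ValueError("expected a top-level JSON array")
    pos += 1

    while len(elements) < n:
        skip(WHITESPACE + ",")
        if pos >= len(buf):
            raise ValueError("unexpected end of input inside array")
        if buf[pos] == "]":
            break  # array ended before n elements were found
        try:
            value, end = decoder.raw_decode(buf, pos)
        except json.JSONDecodeError:
            # Most likely the element is cut off at the chunk boundary.
            if not fill():
                raise
            continue
        if end == len(buf) and not eof and fill():
            continue  # value touched the buffer edge; re-parse with more data
        elements.append(value)
        pos = end
    return elements


if __name__ == "__main__":
    demo = io.StringIO('[{"a": 1}, {"a": 2}, {"a": 3}]')
    print(json.dumps(first_array_elements(demo, 2)))
```

The list it returns can be written back out with `json.dumps` as a small array and fed to InferAvroSchema in place of the 328MB file. One caveat: if the input is malformed near the start, this sketch keeps reading to the end of the file before it raises.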
