I was imagining that inference using only the "first ten records" would not require inferring the entire schema; I get it now.
I was hoping to use InferAvroSchema to split the records into more manageable chunks, so for a generalized solution it's a chicken-and-egg problem. However, for a less generalized solution I can certainly skip the inference step altogether for now.

Thanks Jeremy!
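To make the sampling idea concrete, here is a minimal sketch of what I mean by the "less generalized" version, done outside NiFi with Jackson's streaming API. The JsonArraySampler class name is made up, and it assumes the input is a single top-level JSON array of objects; it copies only the first N elements into a small file, which InferAvroSchema (or a hand-written schema) can then work from without ever pulling the full 328MB array onto the heap.

    import com.fasterxml.jackson.core.JsonEncoding;
    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;

    import java.io.File;
    import java.io.IOException;

    /**
     * Copies the first N elements of a huge top-level JSON array into a small
     * sample file, streaming so the full array is never held in memory.
     * (Illustrative sketch; assumes the input is one top-level array of objects.)
     */
    public class JsonArraySampler {

        public static void sample(File input, File output, int maxRecords) throws IOException {
            JsonFactory factory = new JsonFactory();
            try (JsonParser parser = factory.createParser(input);
                 JsonGenerator generator = factory.createGenerator(output, JsonEncoding.UTF8)) {

                if (parser.nextToken() != JsonToken.START_ARRAY) {
                    throw new IOException("Expected the input to be a single top-level JSON array");
                }

                generator.writeStartArray();
                int copied = 0;
                // Stream element by element; copyCurrentStructure only buffers one object at a time.
                while (copied < maxRecords && parser.nextToken() == JsonToken.START_OBJECT) {
                    generator.copyCurrentStructure(parser);
                    copied++;
                }
                generator.writeEndArray();
            }
        }

        public static void main(String[] args) throws IOException {
            // e.g. java JsonArraySampler big.json sample.json
            sample(new File(args[0]), new File(args[1]), 10);
        }
    }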

On Thu, Sep 14, 2017 at 10:00 PM, Jeremy Dyer <[email protected]> wrote:
> Neil - The number of records analyzed property in conjunction with JSON content is a little hairy. It's pretty clear with something like CSV, where each line is considered a record. With JSON it is less clear what actually counts as a "record": is it the whole object, or each element in the array? Ultimately NiFi invokes the Kite SDK to handle this task. Kite attempts to parse the JSON and iterate through X number of assumed Avro schemas/records. I would assume that Kite is probably attempting to read in this entire array of objects, which is causing your OOM error.
>
> I would feed a smaller set of data to the InferAvroSchema processor and then use the resulting output schema from that run on the subsequent processors, removing InferAvroSchema from the flow altogether. I am making the assumption that this data is always the same. If that is not the case this will not work and we can look at other options.
>
> Don't worry, we will get it going at some point 👌
>
> - Jeremy Dyer
>
> Sent from my iPhone
>
> On Sep 14, 2017, at 5:13 PM, Neil Derraugh <[email protected]> wrote:
> >
> > I have a 328MB JSON file sitting in a queue that feeds into an InferAvroSchema (1.3.0) processor that is currently stopped. The JSON file consists of a single array containing just over 1.5M small objects. The heap is set to 4GB. Before starting the processor, the heap usage on the node that has the file is about 20%. If I start the InferAvroSchema processor, eventually I run out of heap and start seeing the following in the logs.
> >
> > 2017-09-14 13:01:25,597 WARN [Clustering Tasks Thread-3] o.apache.nifi.controller.FlowController Failed to send heartbeat due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed marshalling 'HEARTBEAT' protocol message due to: javax.net.ssl.SSLException: Received fatal alert: unexpected_message
> > 2017-09-14 13:01:25,655 INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@27ae11db Connection State changed to SUSPENDED
> > 2017-09-14 13:01:25,690 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
> > org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
> > org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,696 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > The node drops out of the cluster and doesn't seem to rejoin unless I restart NiFi on that node.
> >
> > The InferAvroSchema processor is only looking at the first 10 records. It's an array of objects, so the first ten elements in the array could be pulled out and processed without processing the whole file, as long as we accept that the rest of the file won't otherwise be validated at this step. Is that a possibility? Is there a better way to do what I'm trying to do?
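P.S. Since the record layout here is fixed, the schema doesn't have to come from InferAvroSchema at all; it can be written once by hand (or generated once offline) and set directly on the downstream processor. A rough sketch of building such a schema with Avro's SchemaBuilder, using made-up field names just to show the shape:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class HandWrittenSchema {
        public static void main(String[] args) {
            // Hypothetical field names -- substitute the actual fields of the JSON objects.
            Schema record = SchemaBuilder.record("Event").fields()
                    .requiredString("id")
                    .requiredLong("timestamp")
                    .optionalString("value")
                    .endRecord();
            // Pretty-printed Avro schema JSON, ready to paste wherever the
            // downstream processor expects a schema.
            System.out.println(record.toString(true));
        }
    }

The printed output is an ordinary Avro schema JSON document, so it only needs to be generated once as long as the data really does keep the same shape.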
