I was imagining that inference using only the "first ten records" would not
require inferring the entire schema, I get it now.

I was hoping to use the InferAvroSchema to split the records into more
manageable chunks.  So for a generalized solution it's a chicken and egg
thing.  However for a less generalized solution I can certainly skip the
inference step altogether for now.

Thanks Jeremy !

On Thu, Sep 14, 2017 at 10:00 PM, Jeremy Dyer <[email protected]> wrote:

> Neil - The number of records analyzed property in conjunction with JSON
> content is a little hairy. It's pretty clear with something like CSV where
> each line is considered a record. With JSON that distinction becomes less
> clear on what is actually a "record". AKA is the record the object or each
> element in the array. Ultimately NiFi invokes the Kite SDK to handle this
> task. Kite attempts to parse the JSON and iterate through X number of
> assumed Avro schemas/records. I would assume that kite is probably
> attempting to read in this entire array of objects and causing your OOM
> error.
>
> I would feed in a smaller set of data to the InferAvroScema processor and
> then use the resulting output schema from that run on subsequent processors
> removing the InferAvroScema from the flow all together. I am making the
> assumption that this data is always the same. If that is not the case this
> will not work and we can look at other options.
>
> Don't worry we will get it going at some point 👌
>
> - Jeremy Dyer
>
>
>
> Sent from my iPhone
> > On Sep 14, 2017, at 5:13 PM, Neil Derraugh <neil.derraugh@
> intellifylearning.com> wrote:
> >
> > I have a 328MB json file sitting in a queue that feeds into an
> InferAvroSchema (1.3.0) processor that is currently stopped.  The json file
> consists of a single array containing just over 1.5M small objects.  The
> heap is set to 4GB.  Before starting the processor the heap usage on the
> node that has the file is about 20%. If I start the InferAvroSchema
> eventually I run out of heap and start seeing the following in the logs.
> >
> > 2017-09-14 13:01:25,597 WARN [Clustering Tasks Thread-3]
> o.apache.nifi.controller.FlowController Failed to send heartbeat due to:
> org.apache.nifi.cluster.protocol.ProtocolException: Failed marshalling
> 'HEARTBEAT' protocol message due to: javax.net.ssl.SSLException: Received
> fatal alert: unexpected_message
> > 2017-09-14 13:01:25,655 INFO [Curator-ConnectionStateManager-0]
> o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.
> leader.election.CuratorLeaderElectionManager$ElectionListener@27ae11db
> Connection State changed to SUSPENDED
> > 2017-09-14 13:01:25,690 ERROR [Curator-Framework-0] 
> > o.a.c.f.imps.CuratorFrameworkImpl
> Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> >       at org.apache.zookeeper.KeeperException.create(
> KeeperException.java:99)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> access$300(CuratorFrameworkImpl.java:64)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.
> call(CuratorFrameworkImpl.java:267)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] 
> > o.a.c.f.imps.CuratorFrameworkImpl
> Background retry gave up
> > org.apache.curator.CuratorConnectionLossException: KeeperErrorCode =
> ConnectionLoss
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> performBackgroundOperation(CuratorFrameworkImpl.java:838)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> access$300(CuratorFrameworkImpl.java:64)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.
> call(CuratorFrameworkImpl.java:267)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] 
> > o.a.c.f.imps.CuratorFrameworkImpl
> Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> >       at org.apache.zookeeper.KeeperException.create(
> KeeperException.java:99)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> access$300(CuratorFrameworkImpl.java:64)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.
> call(CuratorFrameworkImpl.java:267)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] 
> > o.a.c.f.imps.CuratorFrameworkImpl
> Background retry gave up
> > org.apache.curator.CuratorConnectionLossException: KeeperErrorCode =
> ConnectionLoss
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> performBackgroundOperation(CuratorFrameworkImpl.java:838)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> access$300(CuratorFrameworkImpl.java:64)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.
> call(CuratorFrameworkImpl.java:267)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,696 ERROR [Curator-Framework-0] 
> > o.a.c.f.imps.CuratorFrameworkImpl
> Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> >       at org.apache.zookeeper.KeeperException.create(
> KeeperException.java:99)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl.
> access$300(CuratorFrameworkImpl.java:64)
> >       at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.
> call(CuratorFrameworkImpl.java:267)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:748)
> >
> > The node drops out of the cluster and doesn't seem to rejoin unless I
> restart nifi on that node.
> >
> > The InferAvroSchema processor is only looking at the first 10 records.
> It's an array of objects so the first ten elements in the array could be
> pulled out and processed without processing the whole file as long as we
> accept that it won't otherwise be validated at this step.  Is that a
> possibility?  Is there a better way to do what I'm trying to do?
>

Reply via email to