Joe,
How we managed to write a FlowFile repository that we later couldn’t load back
into heap is confusing, but somehow we did (even after increasing heap from
300 GB, which is what it was set to when the repo was written, to 500 GB). A user had
some large JSON blocks stored in an attribute. They then did some versioning on
this attribute and duplicated the value into other attribute names. They loaded
~60k FlowFiles into the node over about 90 seconds, all with large JSON
attributes. When the 2-minute checkpoint tried to run, it ran out of heap.
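To give a feel for how quickly duplicated attributes add up, here is a back-of-the-envelope sketch. Only the ~60k FlowFile count comes from the incident; the number of attribute copies and the JSON size are hypothetical placeholders, and the result ignores per-object overhead, so treat it as a rough lower bound rather than a measurement.

```java
final class AttributeHeapEstimate {
    /** Rough lower bound, in bytes, for the attribute values alone (no object overhead). */
    static long estimateBytes(long flowFiles, int copiesPerFlowFile,
                              long charsPerValue, int bytesPerChar) {
        return flowFiles * copiesPerFlowFile * charsPerValue * bytesPerChar;
    }

    public static void main(String[] args) {
        long flowFiles = 60_000L;  // from the incident: ~60k FlowFiles
        int copies = 3;            // hypothetical: attributes holding the same JSON
        long chars = 500_000L;     // hypothetical: characters per JSON value
        int bytesPerChar = 2;      // worst case for a Java String (UTF-16)
        long bytes = estimateBytes(flowFiles, copies, chars, bytesPerChar);
        System.out.println("Estimated attribute bytes: " + bytes);
    }
}
```

Plugging in real attribute sizes from a sample FlowFile would show whether a given burst can fit in heap at all.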
Another thing we learned is that you need 2x the disk capacity for the
FlowFile Repository, because when NiFi checkpoints your data it creates
a fully merged duplicate, and both the original and the new checkpoint exist on
disk at the same time for a little while.
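A quick way to check for that headroom is to compare the repository's current size with the free space on its volume. This is only a sketch: the class and method names are made up for illustration, the default path "flowfile_repository" is an assumption about the install layout, and "free space >= current size" is a simplification of the 2x rule above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

final class CheckpointHeadroom {
    /** Sums the sizes of all regular files under dir, recursively. */
    static long directorySize(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                    .mapToLong(p -> {
                        try {
                            return Files.size(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .sum();
        }
    }

    /** True if the free space could hold a second full copy of the repository. */
    static boolean hasCheckpointHeadroom(long repoBytes, long usableBytes) {
        return usableBytes >= repoBytes;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass the real repository path as the first argument.
        Path repo = Paths.get(args.length > 0 ? args[0] : "flowfile_repository");
        long used = directorySize(repo);
        long usable = Files.getFileStore(repo).getUsableSpace();
        System.out.println("repo bytes=" + used + ", usable bytes=" + usable
                + ", headroom=" + hasCheckpointHeadroom(used, usable));
    }
}
```

Running this periodically against a live repository only gives a snapshot, since files can appear and disappear while the walk is in progress.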
It was a little frustrating that missing overflow files stopped the journal
from loading. To me, it felt like this should work more like a FlowFile that
is missing its queue, or content in the content repository that is missing its
FlowFile: instead of failing to load the journal, just keep loading it
even if the overflow is missing. I’ll look at making a Jira for this.
One cool thing that came out of this was writing scripts to read/modify the raw
overflow files, which I’m sure, with some more work, could be extended into a full
offline FlowFile repository cleanup utility.
Example of loading a Journal:

    StandardResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
    DataInputStream dataIn = new DataInputStream(
            new FileInputStream(new File("/home/nifi/569892338.journal")));

    // Read the header/serialization details
    final String waliImplementationClass = dataIn.readUTF();
    final int waliImplementationVersion = dataIn.readInt();
    final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
    final int serdeVersion = dataIn.readInt();
    final int serdeHeaderLength = dataIn.readInt();

    // The serde only needs queue identifiers here, so hand it a stub queue
    SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager) {
        @Override
        protected FlowFileQueue getFlowFileQueue(String queueId) {
            return new FlowFileQueue() {
                @Override
                public String getIdentifier() {
                    return queueId;
                }
                // … all other overridden methods are not used …
            };
        }
    };

    // Let the serde read its own header, bounded to the declared header length
    final InputStream serdeHeaderIn = new LimitingInputStream(dataIn, serdeHeaderLength);
    final DataInputStream dis = new DataInputStream(serdeHeaderIn);
    serde.readHeader(dis);

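To make the field order of that header easier to see without NiFi on the classpath, here is a self-contained sketch that writes a synthetic header and reads it back in the same order as the snippet above. The class names, version numbers and payload bytes are invented for the demo, and it copies the serde header into a byte array with readFully instead of using NiFi's LimitingInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class JournalHeaderSketch {
    /** Plain holder for the header fields, in the order they are read. */
    static final class Header {
        final String waliClass;
        final int waliVersion;
        final String serdeClass;
        final int serdeVersion;
        final byte[] serdeHeader;

        Header(String waliClass, int waliVersion, String serdeClass,
               int serdeVersion, byte[] serdeHeader) {
            this.waliClass = waliClass;
            this.waliVersion = waliVersion;
            this.serdeClass = serdeClass;
            this.serdeVersion = serdeVersion;
            this.serdeHeader = serdeHeader;
        }
    }

    /** Reads the fields in order and leaves the stream positioned right after the serde header. */
    static Header readHeader(DataInputStream in) throws IOException {
        String waliClass = in.readUTF();
        int waliVersion = in.readInt();
        String serdeClass = in.readUTF();
        int serdeVersion = in.readInt();
        int serdeHeaderLength = in.readInt();
        if (serdeHeaderLength < 0) {
            throw new IOException("Negative serde header length: " + serdeHeaderLength);
        }
        byte[] serdeHeader = new byte[serdeHeaderLength];
        in.readFully(serdeHeader); // bounded read: never consumes past the declared length
        return new Header(waliClass, waliVersion, serdeClass, serdeVersion, serdeHeader);
    }

    /** Builds a synthetic header (all values invented) followed by one trailing marker string. */
    static byte[] syntheticHeader() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF("example.Wali");
            out.writeInt(1);
            out.writeUTF("example.Serde");
            out.writeInt(2);
            byte[] serdeHeader = {10, 20, 30};
            out.writeInt(serdeHeader.length);
            out.write(serdeHeader);
            out.writeUTF("records start here");
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(syntheticHeader()));
        Header header = readHeader(in);
        System.out.println(header.waliClass + " v" + header.waliVersion + ", "
                + header.serdeClass + " v" + header.serdeVersion + ", serde header bytes="
                + header.serdeHeader.length + ", next=" + in.readUTF());
    }
}
```

On a real journal the declared length could be large or corrupt, so a streaming bound like the LimitingInputStream used above is the safer choice there.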
At this point you can either read the journal (might need a few more
dataIn reads), or, as I did, go directly to reading overflow files.
Overflow files don’t have a header/schema like the journals do, so you have to
load the header from the journal first, and then use the same serde to read the
overflow files.
Here is one that tells you which queues are referenced, and by how many
FlowFiles:

    private Map<String, Integer> countQueues(String file, SchemaRepositoryRecordSerde serde,
                                             int serdeVersion) throws IOException {
        Map<String, Integer> counts = new HashMap<>();
        // try-with-resources closes the stream even if deserialization throws;
        // buffering avoids a system call for every small read on these very large files
        try (DataInputStream dataInputStream = new DataInputStream(
                new BufferedInputStream(new FileInputStream(new File(file))))) {
            RepositoryRecord rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            while (rr != null) {
                final String queue = rr.getOriginalQueue().getIdentifier();
                counts.merge(queue, 1, Integer::sum); // add one to this queue's count
                rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            }
        }
        return counts;
    }

Here is one where I selectively remove FlowFiles from existing overflow files
based on their attributes/values and write a new file out:

    private void removeQueuedFiles(String file, String newFile, List<String> queues,
                                   SchemaRepositoryRecordSerde serde, int serdeVersion)
            throws IOException {
        // Note: 'queues' is not used by this attribute-based filter
        int total = 0;
        int saved = 0;
        try (DataInputStream dataInputStream = new DataInputStream(
                     new FileInputStream(new File(file)));
             DataOutputStream outStream = new DataOutputStream(
                     new FileOutputStream(new File(newFile)))) {
            RepositoryRecord rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            while (rr != null) {
                total++;
                Map<String, String> attrs = rr.getCurrent().getAttributes();
                // Keep every record except those whose attribute contains the search term
                if (!(attrs.containsKey("attributename")
                        && attrs.get("attributename").contains("searchterm"))) {
                    serde.serializeRecord(rr, outStream);
                    saved++;
                }
                rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            }
        }
        System.out.println(file + " - " + saved + " / " + total);
    }

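The keep/drop decision in that loop can be pulled out into a small null-safe helper so it can be tried out without touching a real overflow file. This is a sketch only: AttributeFilter and shouldDrop are names made up here, and the attribute name and search term are the same placeholders used above.

```java
import java.util.HashMap;
import java.util.Map;

final class AttributeFilter {
    /** True if the named attribute is present and its value contains the search term. */
    static boolean shouldDrop(Map<String, String> attrs, String name, String term) {
        if (attrs == null) {
            return false; // nothing to match on, so keep the record
        }
        String value = attrs.get(name);
        return value != null && value.contains(term);
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("attributename", "some searchterm inside a large JSON value");
        System.out.println("drop matching record: "
                + shouldDrop(attrs, "attributename", "searchterm"));
        System.out.println("drop record without the attribute: "
                + shouldDrop(new HashMap<>(), "attributename", "searchterm"));
    }
}
```

In the loop above, a record would be written to the new file only when shouldDrop returns false for its attributes.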
From: Joe Witt <[email protected]>
Sent: Thursday, August 15, 2019 10:58 AM
To: [email protected]
Subject: Re: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.
Peter
All the details you can share on this would be good. First, we should be
resilient to any sort of repo corruption in the event of heap issues. While
the flow obviously isn't in a good state at that point, the saved state should
be reliable/recoverable. Second, how the repo/journals got that large in the
first place should be evaluated. A full JIRA with a description of the
situation, logs, and known state would be worth pursuing to resolution.
Thanks
On Thu, Aug 15, 2019 at 12:50 PM Peter Wicks (pwicks)
<[email protected]> wrote:
We were able to recover this morning. In the end we deleted the queues that
were causing trouble from the Flow, and when the problem node came online it
deleted the FlowFiles all on its own, since the queues no longer existed. Since
this is done while the FlowFile Repository loads into memory, it didn’t run out
of heap.
But before we got to that point we maxed out heap at 500 GB, all our server had
to offer. I also tried scripting a cleanup of the journal’s overflow files,
which failed because the journal keeps track of those files and won’t restore
if some are missing. I’m thinking of building some nifi-utility functions for
doing emergency cleanup of the FlowFile repository, where you can specify a
Queue ID and it removes those files, or maybe doing an offline compaction.
Thanks,
Peter
From: Brandon DeVries <[email protected]>
Sent: Thursday, August 15, 2019 9:53 AM
To: [email protected]
Subject: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.
Peter,
Unfortunately, I don't have a perfect solution for your current problem. I
would try starting with autoResume=false, just to try to limit what's going on
in the system. If possible, you can also try temporarily giving the JVM more
heap.
This is, however, the use case that led to the idea of "recovery mode" in the
new RocksDBFlowFileRepository[1] that should be in NiFi 1.10.0 (the
documentation[2] is attached to the ticket):
"[Recovery mode] limits the number of FlowFiles loaded into the graph at a
time, while not actually removing any FlowFiles (or content) from the system.
This allows for the recovery of a system that is encountering OutOfMemory
errors or similar on startup..."
[1] https://issues.apache.org/jira/browse/NIFI-4775
[2] https://issues.apache.org/jira/secure/attachment/12976954/RocksDBFlowFileRepo.html
On Wed, Aug 14, 2019 at 12:12 PM Peter Wicks (pwicks)
<[email protected]> wrote:
I have a node in a cluster whose FlowFile repository grew so fast that it
exceeded the amount of available heap space and now can't checkpoint. At least,
that is my interpretation of the error:
"Cannot update journal file flowfile_repository/journals/####.journal because
this journal has already encountered a failure when attempting to write to the
file."
Additionally, NiFi fails to restart because it runs out of
heap space while doing a SchemaRecordReader.readFieldValue. I'm feeling a bit
stuck on where to go from here.
Based on metrics we collect, we see a large increase in FlowFiles on that node
right before it crashed, and in Linux we see the following:
94G ./journals/overflow-569618072
356G ./journals/overflow-569892338
Oh, and a 280 GB checkpoint file
There are a few queues/known FlowFiles that are probably the problem, and I’m
OK with dropping them, but there is plenty of other data in there too that I
don’t want to lose…
Thanks,
Peter