This is an automated email from the ASF dual-hosted git repository. bbende pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new fd068fe NIFI-7557: uses a canonical representation of strings when recovering data from FlowFile Repository in order to avoid using huge amounts of heap when not necessary - Fixed some problems with unit/integration tests fd068fe is described below commit fd068fe978da5a4a3e5df14b9a8e43f51e88c606 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Thu Jun 18 13:25:08 2020 -0400 NIFI-7557: uses a canonical representation of strings when recovering data from FlowFile Repository in order to avoid using huge amounts of heap when not necessary - Fixed some problems with unit/integration tests This closes #4507. Signed-off-by: Bryan Bende <bbe...@apache.org> --- .../apache/nifi/repository/schema/FieldCache.java | 55 ++++++++++++++ .../nifi/repository/schema/NoOpFieldCache.java | 29 +++++++ .../nifi/repository/schema/SchemaRecordReader.java | 14 ++-- .../repository/schema/TestSchemaRecordReader.java | 4 +- .../schema/TestSchemaRecordReaderWriter.java | 4 +- .../wali/.SequentialAccessWriteAheadLog.java.swp | Bin 16384 -> 0 bytes .../nifi-flowfile-repo-serialization/pom.xml | 32 ++++---- .../controller/repository/CaffeineFieldCache.java | 43 +++++++++++ .../EncryptedRepositoryRecordSerdeFactory.java | 7 +- .../repository/SchemaRepositoryRecordSerde.java | 7 +- .../StandardRepositoryRecordSerdeFactory.java | 10 ++- .../nifi/controller/FileSystemSwapManager.java | 6 +- .../org/apache/nifi/controller/FlowController.java | 83 +++++++++------------ .../repository/WriteAheadFlowFileRepository.java | 39 ++++++++-- .../controller/swap/SchemaSwapDeserializer.java | 15 +++- .../apache/nifi/encrypt/StringEncryptorIT.groovy | 12 --- ...cryptedSequentialAccessWriteAheadLogTest.groovy | 5 +- .../SchemaRepositoryRecordSerdeTest.java | 3 +- .../OOMEWriteAheadFlowFileRepository.java | 5 +- .../provenance/ByteArraySchemaRecordReader.java | 3 +- .../provenance/EventIdFirstSchemaRecordReader.java | 5 +- 21 files changed, 
275 insertions(+), 106 deletions(-) diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java new file mode 100644 index 0000000..9186a51 --- /dev/null +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.repository.schema; + +/** + * <p> + * For many write-ahead logs, the keys and values that are stored for fields are very repetitive. As a result, it is common when reading data from the repository + * to read the same value over and over, but each time the value is read, the data is a separate object in memory. + * </p> + * + * <p> + * Take, for example, the case when the value "Hello World" is stored as a field in nearly every record that is written to the WAL. When the record is created, + * it may be created in such a way that a single String is referenced over and over again. 
However, when the Write-Ahead Log is restored, each time that value is encountered, + * a new String must be created because it is being deserialized from an InputStream. So instead of a single String occupying approximately 25 bytes of heap, if this is encountered + * 1 million times, the result is that 1 million 25-byte Strings remain on the heap, totaling about 25 MB of heap space. + * </p> + * + * <p> + * In order to avoid this, a FieldCache can be provided to the SerDe. As a result, whenever a value is read, that value is added to a cache as the "canonical representation" of that + * value. The next time that value is encountered, if the first instance is still available in the cache, the canonical representation will be returned. As a result, we end up creating the + * first String with the value of "Hello World" and then the second instance. The second instance is then used to look up the canonical representation (the first instance) and the canonical + * representation is then included in the record. The second instance is then garbage collected. As a result, even with millions of records having the value "Hello World", only a single + * instance needs to be kept on the heap. + * </p> + */ +public interface FieldCache { + + /** + * Checks whether the given value already exists in the cache and, if so, returns it.
If the value does not + * already exist in the cache, adds the given value to the cache, evicting an existing entry(ies) if necessary + * @param value the value to cache + * @return the canonical representation of the value that should be used + */ + String cache(String value); + + /** + * Clears the cache + */ + void clear(); +} diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java new file mode 100644 index 0000000..662f8ca --- /dev/null +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.repository.schema; + +public class NoOpFieldCache implements FieldCache { + @Override + public String cache(final String value) { + return value; + } + + @Override + public void clear() { + } +} diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java index daedf37..68f116b 100644 --- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java @@ -35,13 +35,15 @@ import java.util.Optional; public class SchemaRecordReader { private final RecordSchema schema; + private final FieldCache fieldCache; - public SchemaRecordReader(final RecordSchema schema) { + private SchemaRecordReader(final RecordSchema schema, final FieldCache fieldCache) { this.schema = schema; + this.fieldCache = fieldCache; } - public static SchemaRecordReader fromSchema(final RecordSchema schema) { - return new SchemaRecordReader(schema); + public static SchemaRecordReader fromSchema(final RecordSchema schema, final FieldCache fieldCache) { + return new SchemaRecordReader(schema, fieldCache); } private static void fillBuffer(final InputStream in, final byte[] destination) throws IOException { @@ -194,13 +196,15 @@ public class SchemaRecordReader { } case STRING: { final DataInputStream dis = new DataInputStream(in); - return dis.readUTF(); + final String value = dis.readUTF(); + return fieldCache.cache(value); } case LONG_STRING: { final int length = readInt(in); final byte[] buffer = new byte[length]; fillBuffer(in, buffer); - return new String(buffer, StandardCharsets.UTF_8); + final String value = new String(buffer, StandardCharsets.UTF_8); + return fieldCache.cache(value); } case BYTE_ARRAY: { final int length = readInt(in); diff --git 
a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java index 6099814..d5a4cfd 100644 --- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java +++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java @@ -63,7 +63,7 @@ public class TestSchemaRecordReader { }))); final RecordSchema schema = new RecordSchema(fields); - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache()); final byte[] buffer; try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -167,7 +167,7 @@ public class TestSchemaRecordReader { final RecordSchema schema = new RecordSchema(fields); - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache()); // for each field, make the first one missing and the second one present. 
final byte[] buffer; diff --git a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java index 5dfd40e..5dc5cd8 100644 --- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java +++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java @@ -142,7 +142,7 @@ public class TestSchemaRecordReaderWriter { try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) { // Read the Schema from the stream and create a Record Reader for reading records, based on this schema final RecordSchema readSchema = RecordSchema.readFrom(in); - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema, new NoOpFieldCache()); // Read two records and verify the values. for (int i=0; i < 2; i++) { @@ -216,7 +216,7 @@ public class TestSchemaRecordReaderWriter { try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) { // Read the Schema from the stream and create a Record Reader for reading records, based on this schema final RecordSchema readSchema = RecordSchema.readFrom(in); - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema, new NoOpFieldCache()); // Read the records and verify the values. 
for (int i=0; i < 2; i++) { diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp deleted file mode 100644 index 1d5f641..0000000 Binary files a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp and /dev/null differ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml index bafa8dd..6a25488 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml @@ -1,13 +1,13 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. 
The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -47,10 +47,16 @@ <artifactId>nifi-security-utils</artifactId> </dependency> <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.8.1</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties-loader</artifactId> + </dependency> + + <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk15on</artifactId> <scope>test</scope> @@ -61,9 +67,5 @@ <version>${nifi.groovy.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties-loader</artifactId> - </dependency> </dependencies> </project> diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java new file mode 100644 index 0000000..22874da --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.repository.schema.FieldCache; + +public class CaffeineFieldCache implements FieldCache { + private final Cache<String, String> cache; + + public CaffeineFieldCache(final long maxCharacters) { + cache = Caffeine.newBuilder() + .maximumWeight(maxCharacters) + .weigher((k, v) -> ((String) k).length()) + .build(); + } + + @Override + public String cache(final String value) { + return cache.get(value, k -> value); + } + + @Override + public void clear() { + cache.invalidateAll(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java index 5b28177..714fc7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository; import java.io.IOException; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.repository.schema.FieldCache; import org.apache.nifi.security.kms.CryptoUtils; import org.apache.nifi.security.kms.EncryptionException; import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration; @@ -32,8 +33,8 @@ public class EncryptedRepositoryRecordSerdeFactory extends StandardRepositoryRec private FlowFileRepositoryEncryptionConfiguration ffrec; - public EncryptedRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, NiFiProperties niFiProperties) throws EncryptionException { - super(claimManager); + public EncryptedRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, final NiFiProperties niFiProperties, final FieldCache fieldCache) throws EncryptionException { + super(claimManager, fieldCache); // Retrieve encryption configuration FlowFileRepositoryEncryptionConfiguration ffrec = new FlowFileRepositoryEncryptionConfiguration(niFiProperties); @@ -48,7 +49,7 @@ public class EncryptedRepositoryRecordSerdeFactory extends StandardRepositoryRec } @Override - public SerDe<SerializedRepositoryRecord> createSerDe(String encodingName) { + public SerDe<SerializedRepositoryRecord> createSerDe(final String encodingName) { // If no encoding is 
provided, use the encrypted as the default if (encodingName == null || EncryptedSchemaRepositoryRecordSerde.class.getName().equals(encodingName)) { // Delegate the creation of the wrapped serde to the standard factory diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java index ee87f1b..12933f9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.repository.schema.FlowFileSchema; import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap; import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate; +import org.apache.nifi.repository.schema.FieldCache; import org.apache.nifi.repository.schema.FieldType; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordIterator; @@ -49,11 +50,13 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; private final ResourceClaimManager resourceClaimManager; + private final FieldCache fieldCache; private volatile SchemaRecordReader reader; private RecordIterator recordIterator = null; - public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) { + public 
SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager, final FieldCache fieldCache) { this.resourceClaimManager = resourceClaimManager; + this.fieldCache = fieldCache; } @Override @@ -101,7 +104,7 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement @Override public void readHeader(final DataInputStream in) throws IOException { final RecordSchema recoverySchema = RecordSchema.readFrom(in); - reader = SchemaRecordReader.fromSchema(recoverySchema); + reader = SchemaRecordReader.fromSchema(recoverySchema, fieldCache); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java index ac1989c..884105a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java @@ -18,21 +18,29 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.repository.schema.FieldCache; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.wali.SerDe; import org.wali.UpdateType; public class StandardRepositoryRecordSerdeFactory implements RepositoryRecordSerdeFactory { private static final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde"; private final ResourceClaimManager resourceClaimManager; + private final FieldCache 
fieldCache; public StandardRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) { + this(claimManager, new NoOpFieldCache()); + } + + public StandardRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, final FieldCache fieldCache) { this.resourceClaimManager = claimManager; + this.fieldCache = fieldCache; } @Override public SerDe<SerializedRepositoryRecord> createSerDe(final String encodingName) { if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) { - final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager); + final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager, fieldCache); return serde; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 01cd272..8f0caa8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.repository.CaffeineFieldCache; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -35,6 +36,7 @@ import org.apache.nifi.controller.swap.SwapDeserializer; import org.apache.nifi.controller.swap.SwapSerializer; import 
org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; +import org.apache.nifi.repository.schema.FieldCache; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; @@ -78,11 +80,11 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap"); private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part"); - public static final int SWAP_ENCODING_VERSION = 10; public static final String EVENT_CATEGORY = "Swap FlowFiles"; private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); private final File storageDirectory; + private final FieldCache fieldCache = new CaffeineFieldCache(10_000_000); // effectively final private FlowFileRepository flowFileRepository; @@ -372,7 +374,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { if (Arrays.equals(magicHeader, MAGIC_HEADER)) { final String serializationName = dis.readUTF(); if (serializationName.equals(SchemaSwapDeserializer.getSerializationName())) { - return new SchemaSwapDeserializer(); + return new SchemaSwapDeserializer(fieldCache); } throw new IOException("Cannot find a suitable Deserializer for swap file, written with Serialization Name '" + serializationName + "'"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 500cb15..130a476 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,38 +16,6 @@ */ package org.apache.nifi.controller; -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; -import javax.management.NotificationEmitter; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -114,7 +82,6 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.CounterRepository; -import org.apache.nifi.controller.repository.EncryptedRepositoryRecordSerdeFactory; import org.apache.nifi.controller.repository.FlowFileEventRepository; import 
org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; @@ -127,7 +94,6 @@ import org.apache.nifi.controller.repository.StandardQueueProvider; import org.apache.nifi.controller.repository.StandardRepositoryRecord; import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.SwapSummary; -import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -218,13 +184,45 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.concurrency.TimedLock; -import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.NotificationEmitter; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider { // default repository implementations @@ -787,18 +785,11 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } try { - final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, - FlowFileRepository.class, properties); - if (EncryptedSequentialAccessWriteAheadLog.class.getName().equals(properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION)) - && created instanceof WriteAheadFlowFileRepository) { - synchronized (created) { - ((WriteAheadFlowFileRepository) created).initialize(contentClaimManager, new EncryptedRepositoryRecordSerdeFactory(contentClaimManager, properties)); - } - } else { - synchronized (created) { - created.initialize(contentClaimManager); - } + final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileRepository.class, properties); + synchronized (created) { + created.initialize(contentClaimManager); } + return created; } catch (final Exception e) { throw new RuntimeException(e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 432e4e4..f069de6 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -22,8 +22,11 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.repository.schema.FieldCache; +import org.apache.nifi.security.kms.EncryptionException; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog; import org.apache.nifi.wali.SequentialAccessWriteAheadLog; import org.apache.nifi.wali.SnapshotCapture; import org.slf4j.Logger; @@ -86,26 +89,29 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory"; private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation"; private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles"; + private static final String FLOWFILE_REPO_CACHE_SIZE = "nifi.flowfile.repository.wal.cache.characters"; static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog"; static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog"; private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog"; private static final String DEFAULT_WAL_IMPLEMENTATION = SEQUENTIAL_ACCESS_WAL; + private static final int DEFAULT_CACHE_SIZE = 10_000_000; - final String walImplementation; + 
private final String walImplementation; protected final NiFiProperties nifiProperties; - final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L); + private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L); private final boolean alwaysSync; private final boolean retainOrphanedFlowFiles; private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class); volatile ScheduledFuture<?> checkpointFuture; - final long checkpointDelayMillis; + private final long checkpointDelayMillis; private final List<File> flowFileRepositoryPaths = new ArrayList<>(); - final List<File> recoveryFiles = new ArrayList<>(); - final ScheduledExecutorService checkpointExecutor; + private final List<File> recoveryFiles = new ArrayList<>(); + private final ScheduledExecutorService checkpointExecutor; + private final int maxCharactersToCache; private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null; private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>()); @@ -116,6 +122,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private WriteAheadRepository<SerializedRepositoryRecord> wal; private RepositoryRecordSerdeFactory serdeFactory; private ResourceClaimManager claimManager; + private FieldCache fieldCache; // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk. 
// We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been @@ -150,6 +157,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis walImplementation = null; nifiProperties = null; retainOrphanedFlowFiles = true; + maxCharactersToCache = 0; } public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { @@ -165,6 +173,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis writeAheadLogImpl = DEFAULT_WAL_IMPLEMENTATION; } this.walImplementation = writeAheadLogImpl; + this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE); // We used to use one implementation (minimal locking) of the write-ahead log, but we now want to use the other // (sequential access), we must address this. Since the MinimalLockingWriteAheadLog supports multiple partitions, @@ -202,11 +211,25 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public void initialize(final ResourceClaimManager claimManager) throws IOException { - initialize(claimManager, new StandardRepositoryRecordSerdeFactory(claimManager)); + final FieldCache fieldCache = new CaffeineFieldCache(maxCharactersToCache); + initialize(claimManager, createSerdeFactory(claimManager, fieldCache), fieldCache); } - public void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory) throws IOException { + protected RepositoryRecordSerdeFactory createSerdeFactory(final ResourceClaimManager claimManager, final FieldCache fieldCache) { + if (EncryptedSequentialAccessWriteAheadLog.class.getName().equals(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION))) { + try { + return new EncryptedRepositoryRecordSerdeFactory(claimManager, nifiProperties, fieldCache); + } catch (final EncryptionException e) { + throw new RuntimeException(e); + 
} + } else { + return new StandardRepositoryRecordSerdeFactory(claimManager, fieldCache); + } + } + + public void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory, final FieldCache fieldCache) throws IOException { this.claimManager = claimManager; + this.fieldCache = fieldCache; for (final File file : flowFileRepositoryPaths) { Files.createDirectories(file.toPath()); @@ -868,6 +891,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } } + fieldCache.clear(); + final Map<String, FlowFileQueue> queueMap = new HashMap<>(); for (final FlowFileQueue queue : queueProvider.getAllQueues()) { queueMap.put(queue.getIdentifier(), queue); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java index 88e1415..f184a66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java @@ -30,6 +30,8 @@ import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap; import org.apache.nifi.repository.schema.ComplexRecordField; +import org.apache.nifi.repository.schema.FieldCache; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordField; import org.apache.nifi.repository.schema.RecordSchema; @@ -37,12 +39,21 @@ import org.apache.nifi.repository.schema.Repetition; 
import org.apache.nifi.repository.schema.SchemaRecordReader; public class SchemaSwapDeserializer implements SwapDeserializer { + private final FieldCache fieldCache; + + public SchemaSwapDeserializer() { + this(new NoOpFieldCache()); + } + + public SchemaSwapDeserializer(final FieldCache fieldCache) { + this.fieldCache = fieldCache; + } @Override @SuppressWarnings("unchecked") public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { final RecordSchema schema = RecordSchema.readFrom(in); - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, fieldCache); final Record parentRecord = reader.readRecord(in); final List<Record> flowFileRecords = (List<Record>) parentRecord.getFieldValue(SwapSchema.FLOWFILE_CONTENTS); @@ -65,7 +76,7 @@ public class SchemaSwapDeserializer implements SwapDeserializer { final RecordField summaryRecordField = new ComplexRecordField(SwapSchema.SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields); final RecordSchema summarySchema = new RecordSchema(Collections.singletonList(summaryRecordField)); - final Record summaryRecordParent = SchemaRecordReader.fromSchema(summarySchema).readRecord(in); + final Record summaryRecordParent = SchemaRecordReader.fromSchema(summarySchema, fieldCache).readRecord(in); final Record summaryRecord = (Record) summaryRecordParent.getFieldValue(SwapSchema.SWAP_SUMMARY); final SwapSummary swapSummary = SwapSummaryFieldMap.getSwapSummary(summaryRecord, claimManager); return swapSummary; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy index 8e4428c..b5477da 100644 
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy @@ -71,9 +71,6 @@ class StringEncryptorIT { final String PASSWORD = "nifiPassword123" final String plaintext = "some sensitive flow value" - final long SLOW_DURATION_NANOS = 500 * 1000 * 1000 // 500 ms - final long FAST_DURATION_NANOS = 1 * 1000 * 1000 // 1 ms - int testIterations = 100 //_000 def results = [] @@ -111,14 +108,5 @@ class StringEncryptorIT { def milliDurations = [resultDurations.min(), resultDurations.max(), resultDurations.sum() / resultDurations.size()].collect { it / 1_000_000 } logger.info("Min/Max/Avg durations in ms: ${milliDurations}") - - // Assert - - // The initial creation (including key derivation) should be slow - assert createNanos > SLOW_DURATION_NANOS - - // The encryption/decryption process (repeated) should be fast - assert resultDurations.max() <= FAST_DURATION_NANOS * 3 - assert resultDurations.sum() / testIterations < FAST_DURATION_NANOS } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy index 8bc91db..03faec6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy @@ -31,6 +31,7 @@ import org.apache.nifi.controller.repository.StandardRepositoryRecord import 
org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory import org.apache.nifi.controller.repository.claim.ResourceClaimManager import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager +import org.apache.nifi.repository.schema.NoOpFieldCache import org.apache.nifi.security.kms.CryptoUtils import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration import org.bouncycastle.jce.provider.BouncyCastleProvider @@ -105,7 +106,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER) byteArrayOutputStream = new ByteArrayOutputStream() dataOutputStream = new DataOutputStream(byteArrayOutputStream) - wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager) + wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager, new NoOpFieldCache()) flowFileREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, KEYS, REPO_IMPL) @@ -192,7 +193,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { testLogger.setLevel(Level.INFO) final List<SerializedRepositoryRecord> records = new ArrayList<>() - 100_000.times { int i -> + 10_000.times { int i -> def attributes = [name: "User ${i}" as String, age: "${i}" as String] final SerializedRepositoryRecord record = buildCreateRecord(flowFileQueue, attributes) records.add(record) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java index c306db1..543d94b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,7 +50,7 @@ public class SchemaRepositoryRecordSerdeTest { @Before public void setup() { resourceClaimManager = new StandardResourceClaimManager(); - schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager); + schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager, new NoOpFieldCache()); flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER); byteArrayOutputStream = new ByteArrayOutputStream(); dataOutputStream = new DataOutputStream(byteArrayOutputStream); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java index 320db25..2b55700 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java @@ -21,6 +21,8 @@ import org.apache.nifi.controller.repository.SerializedRepositoryRecord; import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory; 
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.repository.schema.FieldCache; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.apache.nifi.util.NiFiProperties; import org.wali.SerDe; import org.wali.UpdateType; @@ -41,7 +43,8 @@ public class OOMEWriteAheadFlowFileRepository extends WriteAheadFlowFileReposito @Override public void initialize(final ResourceClaimManager claimManager) throws IOException { - super.initialize(claimManager, new ThrowOOMERepositoryRecordSerdeFactory(new StandardRepositoryRecordSerdeFactory(claimManager))); + final FieldCache fieldCache = new NoOpFieldCache(); + super.initialize(claimManager, new ThrowOOMERepositoryRecordSerdeFactory(new StandardRepositoryRecordSerdeFactory(claimManager, fieldCache)), fieldCache); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java index bb43ba8..d1e5b3c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java @@ -25,6 +25,7 @@ import java.io.InputStream; import org.apache.nifi.provenance.schema.EventRecord; import org.apache.nifi.provenance.serialization.CompressableRecordReader; import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; import 
org.apache.nifi.repository.schema.SchemaRecordReader; @@ -62,7 +63,7 @@ public class ByteArraySchemaRecordReader extends CompressableRecordReader { schema = RecordSchema.readFrom(bais); } - recordReader = SchemaRecordReader.fromSchema(schema); + recordReader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache()); } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java index bd85846..797402e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java @@ -27,6 +27,7 @@ import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; import org.apache.nifi.provenance.serialization.CompressableRecordReader; import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.repository.schema.NoOpFieldCache; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; import org.apache.nifi.repository.schema.SchemaRecordReader; @@ -100,7 +101,7 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader { schema = RecordSchema.readFrom(bais); } - recordReader = SchemaRecordReader.fromSchema(schema); + recordReader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache()); final int headerSchemaLength = in.readInt(); final byte[] headerSchemaBuffer = new byte[headerSchemaLength]; @@ -111,7 +112,7 @@ public class EventIdFirstSchemaRecordReader 
extends CompressableRecordReader { headerSchema = RecordSchema.readFrom(bais); } - final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema); + final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema, new NoOpFieldCache()); final Record headerRecord = headerReader.readRecord(in); componentIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_IDS); componentTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_TYPES);