Repository: nifi Updated Branches: refs/heads/master 14fef2de1 -> 0bcb241db
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.wali;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.junit.Test;

/**
 * Unit tests for {@link BlockingQueuePool}: verifies object reuse, on-demand
 * creation when the pool is exhausted, and that the pool never buffers more
 * returned objects than its configured capacity.
 */
public class TestBlockingQueuePool {
    // No-op "clean" action used by all tests; the pool requires a non-null consumer.
    private static final Consumer<AtomicBoolean> DO_NOTHING = ab -> {};

    private static BlockingQueuePool<AtomicBoolean> newPool() {
        // Capacity 10; AtomicBoolean::get acts as the validity check on return.
        return new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
    }

    /** Returns {@code true} if {@code candidate} is the same instance as any array element. */
    private static boolean referencesAny(final AtomicBoolean[] pool, final AtomicBoolean candidate) {
        for (final AtomicBoolean element : pool) {
            if (element == candidate) {
                return true;
            }
        }
        return false;
    }

    @Test
    public void testReuse() {
        final BlockingQueuePool<AtomicBoolean> pool = newPool();

        // Borrow once, mark it valid, and hand it back so it becomes reusable.
        final AtomicBoolean original = pool.borrowObject();
        original.set(true);
        pool.returnObject(original);

        // Every subsequent borrow/return cycle must hand out the very same instance.
        int iteration = 0;
        while (iteration < 100) {
            final AtomicBoolean borrowed = pool.borrowObject();
            assertSame(original, borrowed);
            pool.returnObject(borrowed);
            iteration++;
        }
    }

    @Test
    public void testCreateOnExhaustion() {
        final BlockingQueuePool<AtomicBoolean> pool = newPool();

        // With nothing returned yet, a second borrow must create a fresh object.
        final AtomicBoolean first = pool.borrowObject();
        final AtomicBoolean second = pool.borrowObject();

        assertNotSame(first, second);
    }

    @Test
    public void testCreateMoreThanMaxCapacity() {
        final BlockingQueuePool<AtomicBoolean> pool = newPool();

        // Borrowing beyond capacity (50 > 10) must still produce objects rather than block.
        for (int i = 0; i < 50; i++) {
            assertNotNull(pool.borrowObject());
        }
    }

    @Test
    public void testDoesNotBufferMoreThanCapacity() {
        final BlockingQueuePool<AtomicBoolean> pool = newPool();

        // Borrow 50 objects (40 beyond capacity), remember each, mark valid, then return all.
        final AtomicBoolean[] seen = new AtomicBoolean[50];
        for (int i = 0; i < seen.length; i++) {
            final AtomicBoolean borrowed = pool.borrowObject();
            assertNotNull(borrowed);
            borrowed.set(true);
            seen[i] = borrowed;
        }

        for (final AtomicBoolean returned : seen) {
            pool.returnObject(returned);
        }

        // The first 10 borrows must come from the buffered set ...
        for (int i = 0; i < 10; i++) {
            final AtomicBoolean borrowed = pool.borrowObject();
            assertTrue(referencesAny(seen, borrowed));
        }

        // ... while the remaining 40 must be newly created (the pool only kept 10).
        for (int i = 0; i < 40; i++) {
            final AtomicBoolean borrowed = pool.borrowObject();
            assertFalse(referencesAny(seen, borrowed));
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.wali;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;

/**
 * Unit tests for {@link HashMapSnapshot}: verifies a full update/write/recover
 * round trip and that a failure (OOME or IOException) while writing a new
 * snapshot leaves the previously written snapshot recoverable.
 */
public class TestHashMapSnapshot {

    private final File storageDirectory = new File("target/test-hashmap-snapshot");
    private DummyRecordSerde serde;
    private SerDeFactory<DummyRecord> serdeFactory;

    /**
     * Creates the storage directory if needed and removes any files left over
     * from a previous run so each test starts from a clean slate.
     */
    @Before
    public void setup() throws IOException {
        if (!storageDirectory.exists()) {
            Files.createDirectories(storageDirectory.toPath());
        }

        // File.listFiles() returns null on I/O error or if the path is not a
        // directory; guard against that instead of risking an NPE here.
        final File[] childFiles = storageDirectory.listFiles();
        if (childFiles != null) {
            for (final File childFile : childFiles) {
                if (childFile.isFile()) {
                    Files.delete(childFile.toPath());
                }
            }
        }

        serde = new DummyRecordSerde();
        serdeFactory = new SingletonSerDeFactory<>(serde);
    }

    /**
     * Applies CREATE, DELETE, SWAP_OUT, and SWAP_IN updates, writes the
     * snapshot, then recovers it and verifies that the surviving records and
     * swap locations match what was captured.
     */
    @Test
    public void testSuccessfulRoundTrip() throws IOException {
        final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        final Map<String, String> props = new HashMap<>();

        // Create records "0" through "9".
        for (int i = 0; i < 10; i++) {
            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
            props.put("key", String.valueOf(i));
            record.setProperties(props);
            snapshot.update(Collections.singleton(record));
        }

        // Delete the even records 2, 4, 6, 8.
        for (int i = 2; i < 10; i += 2) {
            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.DELETE);
            snapshot.update(Collections.singleton(record));
        }

        // Swap out the odd records 1, 3, 5, 7, 9.
        for (int i = 1; i < 10; i += 2) {
            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.SWAP_OUT);
            record.setSwapLocation("swapFile-" + i);
            snapshot.update(Collections.singleton(record));
        }

        // Swap record 7 back in, so it should appear as a live record again.
        final DummyRecord swapIn7 = new DummyRecord("7", UpdateType.SWAP_IN);
        swapIn7.setSwapLocation("swapFile-7");
        snapshot.update(Collections.singleton(swapIn7));

        final Set<String> swappedOutLocations = new HashSet<>();
        swappedOutLocations.add("swapFile-1");
        swappedOutLocations.add("swapFile-3");
        swappedOutLocations.add("swapFile-5");
        swappedOutLocations.add("swapFile-9");

        final SnapshotCapture<DummyRecord> capture = snapshot.prepareSnapshot(180L);
        assertEquals(180L, capture.getMaxTransactionId());
        assertEquals(swappedOutLocations, capture.getSwapLocations());

        // Only "0" (never deleted/swapped) and "7" (swapped back in) remain live.
        final Map<Object, DummyRecord> records = capture.getRecords();
        assertEquals(2, records.size());
        assertTrue(records.containsKey("0"));
        assertTrue(records.containsKey("7"));

        snapshot.writeSnapshot(capture);

        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
        assertEquals(180L, recovery.getMaxTransactionId());
        assertEquals(swappedOutLocations, recovery.getRecoveredSwapLocations());

        final Map<Object, DummyRecord> recoveredRecords = recovery.getRecords();
        assertEquals(records, recoveredRecords);
    }

    /**
     * Writes a good snapshot at transaction 25, then forces an OutOfMemoryError
     * partway through writing a later snapshot and verifies that recovery still
     * yields the transaction-25 state.
     */
    @Test
    public void testOOMEWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException {
        final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        final Map<String, String> props = new HashMap<>();

        for (int i = 0; i < 11; i++) {
            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
            props.put("key", String.valueOf(i));
            record.setProperties(props);
            snapshot.update(Collections.singleton(record));
        }

        final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT);
        swapOutRecord.setSwapLocation("SwapLocation-1");
        snapshot.update(Collections.singleton(swapOutRecord));

        snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));

        // Poison the serde so the third serialized edit of the next write throws OOME.
        serde.setThrowOOMEAfterNSerializeEdits(3);

        try {
            snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
            Assert.fail("Expected OOME");
        } catch (final OutOfMemoryError oome) {
            // expected
        }

        // The failed write must not have clobbered the transaction-25 snapshot.
        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
        assertEquals(25L, recovery.getMaxTransactionId());

        final Map<Object, DummyRecord> recordMap = recovery.getRecords();
        assertEquals(10, recordMap.size());
        for (int i = 0; i < 10; i++) {
            assertTrue(recordMap.containsKey(String.valueOf(i)));
        }
        for (final Map.Entry<Object, DummyRecord> entry : recordMap.entrySet()) {
            final DummyRecord record = entry.getValue();
            final Map<String, String> properties = record.getProperties();
            assertNotNull(properties);
            assertEquals(1, properties.size());
            assertEquals(entry.getKey(), properties.get("key"));
        }

        final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
        assertEquals(1, swapLocations.size());
        assertTrue(swapLocations.contains("SwapLocation-1"));
    }

    /**
     * Same as the OOME test but with repeated IOExceptions during the write:
     * after several failed attempts, the transaction-25 snapshot must remain
     * recoverable.
     */
    @Test
    public void testIOExceptionWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException {
        final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        final Map<String, String> props = new HashMap<>();

        for (int i = 0; i < 11; i++) {
            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
            props.put("key", String.valueOf(i));
            record.setProperties(props);
            snapshot.update(Collections.singleton(record));
        }

        final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT);
        swapOutRecord.setSwapLocation("SwapLocation-1");
        snapshot.update(Collections.singleton(swapOutRecord));

        snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));

        // Poison the serde so the third serialized edit of each write attempt throws IOE.
        serde.setThrowIOEAfterNSerializeEdits(3);

        for (int i = 0; i < 5; i++) {
            try {
                snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
                Assert.fail("Expected IOE");
            } catch (final IOException ioe) {
                // expected
            }
        }

        final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
        assertEquals(25L, recovery.getMaxTransactionId());

        final Map<Object, DummyRecord> recordMap = recovery.getRecords();
        assertEquals(10, recordMap.size());
        for (int i = 0; i < 10; i++) {
            assertTrue(recordMap.containsKey(String.valueOf(i)));
        }
        for (final Map.Entry<Object, DummyRecord> entry : recordMap.entrySet()) {
            final DummyRecord record = entry.getValue();
            final Map<String, String> properties = record.getProperties();
            assertNotNull(properties);
            assertEquals(1, properties.size());
            assertEquals(entry.getKey(), properties.get("key"));
        }

        final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
        assertEquals(1, swapLocations.size());
        assertTrue(swapLocations.contains("SwapLocation-1"));
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.wali;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;

/**
 * Unit tests for {@link LengthDelimitedJournal}: covers round-trip recovery,
 * partial-write/power-loss scenarios (truncation, trailing NUL bytes),
 * transaction atomicity on recovery, and journal poisoning after IOE/OOME.
 */
public class TestLengthDelimitedJournal {
    private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal");
    private SerDeFactory<DummyRecord> serdeFactory;
    private DummyRecordSerde serde;
    private ObjectPool<ByteArrayDataOutputStream> streamPool;
    private static final int BUFFER_SIZE = 4096;

    /** Removes any journal left by a prior run and rebuilds the serde and stream pool. */
    @Before
    public void setupJournal() throws IOException {
        Files.deleteIfExists(journalFile.toPath());

        if (!journalFile.getParentFile().exists()) {
            Files.createDirectories(journalFile.getParentFile().toPath());
        }

        serde = new DummyRecordSerde();
        serdeFactory = new SingletonSerDeFactory<>(serde);
        streamPool = new BlockingQueuePool<>(1,
            () -> new ByteArrayDataOutputStream(BUFFER_SIZE),
            stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
            stream -> stream.getByteArrayOutputStream().reset());
    }

    /** Chops the last {@code byteCount} bytes off the journal file in place. */
    private void truncateJournal(final int byteCount) throws IOException {
        try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) {
            fos.getChannel().truncate(journalFile.length() - byteCount);
        }
    }

    @Test
    public void testHandlingOfTrailingNulBytes() throws IOException {
        // Write three complete transactions to the journal.
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            final List<DummyRecord> creates = new ArrayList<>();
            creates.add(new DummyRecord("1", UpdateType.CREATE));
            creates.add(new DummyRecord("2", UpdateType.CREATE));
            creates.add(new DummyRecord("3", UpdateType.CREATE));

            final List<DummyRecord> updates = new ArrayList<>();
            updates.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
            updates.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
            updates.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));

            final List<DummyRecord> deletes = new ArrayList<>();
            deletes.add(new DummyRecord("1", UpdateType.DELETE));
            deletes.add(new DummyRecord("2", UpdateType.DELETE));

            journal.update(creates, id -> null);
            journal.update(updates, id -> null);
            journal.update(deletes, id -> null);
        }

        // Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
        // as this is what we often see when we have a sudden power loss.
        final byte[] contents = Files.readAllBytes(journalFile.toPath());
        final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
        final byte[] withNuls = new byte[truncated.length + 28];
        System.arraycopy(truncated, 0, withNuls, 0, truncated.length);

        try (final OutputStream fos = new FileOutputStream(journalFile)) {
            fos.write(withNuls);
        }

        // The damaged third transaction (the deletes) must be discarded on recovery,
        // leaving the state after the second (update) transaction.
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            final Map<Object, DummyRecord> recordMap = new HashMap<>();
            final Set<String> swapLocations = new HashSet<>();

            journal.recoverRecords(recordMap, swapLocations);

            assertFalse(recordMap.isEmpty());
            assertEquals(3, recordMap.size());

            final DummyRecord record1 = recordMap.get("1");
            assertNotNull(record1);
            assertEquals(Collections.singletonMap("abc", "123"), record1.getProperties());

            final DummyRecord record2 = recordMap.get("2");
            assertNotNull(record2);
            assertEquals(Collections.singletonMap("cba", "123"), record2.getProperties());

            final DummyRecord record3 = recordMap.get("3");
            assertNotNull(record3);
            assertEquals(Collections.singletonMap("aaa", "123"), record3.getProperties());
        }
    }

    @Test
    public void testUpdateOnlyAppliedIfEntireTransactionApplied() throws IOException {
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            // Three single-record CREATE transactions for ids 0, 1, 2.
            for (int i = 0; i < 3; i++) {
                final DummyRecord created = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
                journal.update(Collections.singleton(created), key -> null);
            }

            // Swap out records 1 and 2 to location "swap12".
            final DummyRecord swapOut1 = new DummyRecord("1", UpdateType.SWAP_OUT);
            swapOut1.setSwapLocation("swap12");
            journal.update(Collections.singleton(swapOut1), id -> null);

            final DummyRecord swapOut2 = new DummyRecord("2", UpdateType.SWAP_OUT);
            swapOut2.setSwapLocation("swap12");
            journal.update(Collections.singleton(swapOut2), id -> null);

            // Build one large multi-record transaction that will later be truncated.
            final List<DummyRecord> bigTransaction = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                bigTransaction.add(new DummyRecord("1" + i, UpdateType.CREATE));
            }

            final DummyRecord swapIn1 = new DummyRecord("1", UpdateType.SWAP_IN);
            swapIn1.setSwapLocation("swap12");
            bigTransaction.add(swapIn1);

            final DummyRecord swapOut1Again = new DummyRecord("1", UpdateType.SWAP_OUT);
            swapOut1Again.setSwapLocation("swap12");
            bigTransaction.add(swapOut1Again);

            final DummyRecord swapIn2 = new DummyRecord("2", UpdateType.SWAP_IN);
            swapIn2.setSwapLocation("swap12");
            bigTransaction.add(swapIn2);

            final DummyRecord swapOut0 = new DummyRecord("0", UpdateType.SWAP_OUT);
            swapOut0.setSwapLocation("swap0");
            bigTransaction.add(swapOut0);

            journal.update(bigTransaction, id -> null);
        }

        // Truncate the last 8 bytes so that we will get an EOFException when reading the last transaction.
        truncateJournal(8);

        // The truncated final transaction must be ignored entirely: none of its
        // creates/swaps may be applied, even partially.
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            final Map<Object, DummyRecord> recordMap = new HashMap<>();
            final Set<String> swapLocations = new HashSet<>();

            final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
            assertEquals(5L, recovery.getMaxTransactionId());
            assertEquals(5, recovery.getUpdateCount());

            final Set<String> expectedSwap = Collections.singleton("swap12");
            assertEquals(expectedSwap, swapLocations);

            final Map<Object, DummyRecord> expectedRecordMap = new HashMap<>();
            expectedRecordMap.put("0", new DummyRecord("0", UpdateType.CREATE));
            assertEquals(expectedRecordMap, recordMap);
        }
    }

    @Test
    public void testPoisonedJournalNotWritableAfterIOE() throws IOException {
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            // The serde will throw IOException on the third serialized edit.
            serde.setThrowIOEAfterNSerializeEdits(2);

            final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
            journal.update(Collections.singleton(firstRecord), key -> null);

            final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
            journal.update(Collections.singleton(secondRecord), key -> firstRecord);

            final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
            final RecordLookup<DummyRecord> lookup = key -> secondRecord;
            try {
                journal.update(Collections.singleton(thirdRecord), lookup);
                Assert.fail("Expected IOException");
            } catch (final IOException ioe) {
                // expected
            }

            // Even after the serde stops throwing, the journal is poisoned and
            // must refuse all further updates and fsyncs.
            serde.setThrowIOEAfterNSerializeEdits(-1);

            final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    journal.update(records, lookup);
                    Assert.fail("Expected IOException");
                } catch (final IOException expected) {
                }

                try {
                    journal.fsync();
                    Assert.fail("Expected IOException");
                } catch (final IOException expected) {
                }
            }
        }
    }

    @Test
    public void testPoisonedJournalNotWritableAfterOOME() throws IOException {
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            // The serde will throw OutOfMemoryError on the third serialized edit.
            serde.setThrowOOMEAfterNSerializeEdits(2);

            final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
            journal.update(Collections.singleton(firstRecord), key -> null);

            final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
            journal.update(Collections.singleton(secondRecord), key -> firstRecord);

            final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
            final RecordLookup<DummyRecord> lookup = key -> secondRecord;
            try {
                journal.update(Collections.singleton(thirdRecord), lookup);
                Assert.fail("Expected OOME");
            } catch (final OutOfMemoryError oome) {
                // expected
            }

            // After the OOME the journal is poisoned: subsequent updates/fsyncs
            // surface as IOExceptions even though the serde behaves normally again.
            serde.setThrowOOMEAfterNSerializeEdits(-1);

            final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    journal.update(records, lookup);
                    Assert.fail("Expected IOException");
                } catch (final IOException expected) {
                }

                try {
                    journal.fsync();
                    Assert.fail("Expected IOException");
                } catch (final IOException expected) {
                }
            }
        }
    }

    @Test
    public void testSuccessfulRoundTrip() throws IOException {
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
            journal.update(Collections.singleton(firstRecord), key -> null);

            final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
            journal.update(Collections.singleton(secondRecord), key -> firstRecord);

            final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
            journal.update(Collections.singleton(thirdRecord), key -> secondRecord);

            // Recover directly from the same (undamaged) journal: all three
            // updates apply and the last write for id "1" wins.
            final Map<Object, DummyRecord> recordMap = new HashMap<>();
            final Set<String> swapLocations = new HashSet<>();
            final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
            assertFalse(recovery.isEOFExceptionEncountered());

            assertEquals(2L, recovery.getMaxTransactionId());
            assertEquals(3, recovery.getUpdateCount());

            assertTrue(swapLocations.isEmpty());
            assertEquals(1, recordMap.size());

            final DummyRecord retrieved = recordMap.get("1");
            assertNotNull(retrieved);
            assertEquals(thirdRecord, retrieved);
        }
    }

    @Test
    public void testTruncatedJournalFile() throws IOException {
        final DummyRecord firstRecord, secondRecord;
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            journal.writeHeader();

            firstRecord = new DummyRecord("1", UpdateType.CREATE);
            journal.update(Collections.singleton(firstRecord), key -> null);

            secondRecord = new DummyRecord("2", UpdateType.CREATE);
            journal.update(Collections.singleton(secondRecord), key -> firstRecord);

            final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
            journal.update(Collections.singleton(thirdRecord), key -> secondRecord);
        }

        // Truncate the file
        truncateJournal(8);

        // Ensure that we are able to recover the first two records without an issue but the third is lost.
        try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
            final Map<Object, DummyRecord> recordMap = new HashMap<>();
            final Set<String> swapLocations = new HashSet<>();
            final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
            assertTrue(recovery.isEOFExceptionEncountered());

            assertEquals(2L, recovery.getMaxTransactionId()); // transaction ID is still 2 because that's what was written to the journal
            assertEquals(2, recovery.getUpdateCount()); // only 2 updates because the last update will incur an EOFException and be skipped

            assertTrue(swapLocations.isEmpty());
            assertEquals(2, recordMap.size());

            final DummyRecord retrieved1 = recordMap.get("1");
            assertNotNull(retrieved1);
            assertEquals(firstRecord, retrieved1);

            final DummyRecord retrieved2 = recordMap.get("2");
            assertNotNull(retrieved2);
            assertEquals(secondRecord, retrieved2);
        }
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class TestSequentialAccessWriteAheadLog { + @Rule + public TestName testName = new TestName(); + + @Test + public void testRecoverWithNoCheckpoint() throws IOException { + final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(); + + final List<DummyRecord> records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + repo.shutdown(); + + final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo(); + final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords(); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. + assertEquals(new HashSet<>(records), new HashSet<>(recovered)); + } + + @Test + public void testRecoverWithNoJournalUpdates() throws IOException { + final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(); + + final List<DummyRecord> records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + repo.checkpoint(); + repo.shutdown(); + + final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo(); + final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords(); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. 
+ assertEquals(new HashSet<>(records), new HashSet<>(recovered)); + } + + @Test + public void testRecoverWithMultipleCheckpointsBetweenJournalUpdate() throws IOException { + final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(); + + final List<DummyRecord> records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + + for (int i = 0; i < 8; i++) { + repo.checkpoint(); + } + + final DummyRecord updateRecord = new DummyRecord("4", UpdateType.UPDATE); + updateRecord.setProperties(Collections.singletonMap("updated", "true")); + repo.update(Collections.singleton(updateRecord), false); + + repo.shutdown(); + + final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo(); + final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords(); + + // what we expect is the same as what we updated with, except we don't want the DummyRecord for CREATE 4 + // because we will instead recover an UPDATE only for 4. + final Set<DummyRecord> expected = new HashSet<>(records); + expected.remove(new DummyRecord("4", UpdateType.CREATE)); + expected.add(updateRecord); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. 
+ assertEquals(expected, new HashSet<>(recovered)); + } + + private SequentialAccessWriteAheadLog<DummyRecord> createRecoveryRepo() throws IOException { + final File targetDir = new File("target"); + final File storageDir = new File(targetDir, testName.getMethodName()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde); + final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory); + + return repo; + } + + private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() throws IOException { + final File targetDir = new File("target"); + final File storageDir = new File(targetDir, testName.getMethodName()); + deleteRecursively(storageDir); + assertTrue(storageDir.mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde); + final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory); + + final Collection<DummyRecord> recovered = repo.recoverRecords(); + assertNotNull(recovered); + assertTrue(recovered.isEmpty()); + + return repo; + } + + /** + * This test is designed to update the repository in several different wants, testing CREATE, UPDATE, SWAP IN, SWAP OUT, and DELETE + * update types, as well as testing updates with single records and with multiple records in a transaction. It also verifies that we + * are able to checkpoint, then update journals, and then recover updates to both the checkpoint and the journals. 
+ */ + @Test + public void testUpdateThenRecover() throws IOException { + final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(); + + final DummyRecord firstCreate = new DummyRecord("0", UpdateType.CREATE); + repo.update(Collections.singleton(firstCreate), false); + + final List<DummyRecord> creations = new ArrayList<>(); + for (int i = 1; i < 11; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + creations.add(record); + } + repo.update(creations, false); + + final DummyRecord deleteRecord3 = new DummyRecord("3", UpdateType.DELETE); + repo.update(Collections.singleton(deleteRecord3), false); + + final DummyRecord swapOutRecord4 = new DummyRecord("4", UpdateType.SWAP_OUT); + swapOutRecord4.setSwapLocation("swap"); + + final DummyRecord swapOutRecord5 = new DummyRecord("5", UpdateType.SWAP_OUT); + swapOutRecord5.setSwapLocation("swap"); + + final List<DummyRecord> swapOuts = new ArrayList<>(); + swapOuts.add(swapOutRecord4); + swapOuts.add(swapOutRecord5); + repo.update(swapOuts, false); + + final DummyRecord swapInRecord5 = new DummyRecord("5", UpdateType.SWAP_IN); + swapInRecord5.setSwapLocation("swap"); + repo.update(Collections.singleton(swapInRecord5), false); + + final int recordCount = repo.checkpoint(); + assertEquals(9, recordCount); + + final DummyRecord updateRecord6 = new DummyRecord("6", UpdateType.UPDATE); + updateRecord6.setProperties(Collections.singletonMap("greeting", "hello")); + repo.update(Collections.singleton(updateRecord6), false); + + final List<DummyRecord> updateRecords = new ArrayList<>(); + for (int i = 7; i < 11; i++) { + final DummyRecord updateRecord = new DummyRecord(String.valueOf(i), UpdateType.UPDATE); + updateRecord.setProperties(Collections.singletonMap("greeting", "hi")); + updateRecords.add(updateRecord); + } + + final DummyRecord deleteRecord2 = new DummyRecord("2", UpdateType.DELETE); + updateRecords.add(deleteRecord2); + + repo.update(updateRecords, false); + + 
repo.shutdown(); + + final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo(); + final Collection<DummyRecord> recoveredRecords = recoveryRepo.recoverRecords(); + + // We should now have records: + // 0-10 CREATED + // 2 & 3 deleted + // 4 & 5 swapped out + // 5 swapped back in + // 6 updated with greeting = hello + // 7-10 updated with greeting = hi + + assertEquals(8, recoveredRecords.size()); + final Map<String, DummyRecord> recordMap = recoveredRecords.stream() + .collect(Collectors.toMap(record -> record.getId(), Function.identity())); + + assertFalse(recordMap.containsKey("2")); + assertFalse(recordMap.containsKey("3")); + assertFalse(recordMap.containsKey("4")); + + assertTrue(recordMap.get("1").getProperties().isEmpty()); + assertTrue(recordMap.get("5").getProperties().isEmpty()); + + assertEquals("hello", recordMap.get("6").getProperties().get("greeting")); + + for (int i = 7; i < 11; i++) { + assertEquals("hi", recordMap.get(String.valueOf(i)).getProperties().get("greeting")); + } + + recoveryRepo.shutdown(); + } + + + @Test + @Ignore("For manual performance testing") + public void testUpdatePerformance() throws IOException, InterruptedException { + final Path path = Paths.get("target/sequential-access-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde); + + final WriteAheadRepository<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory); + final Collection<DummyRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final long updateCountPerThread = 1_000_000; + final int numThreads = 4; + + final Thread[] threads = new Thread[numThreads]; + final int batchSize = 1; + + long previousBytes = 0L; + + for (int j = 0; j < 2; j++) { + for (int i = 0; i < numThreads; i++) { + final Thread t = new 
Thread(new Runnable() { + @Override + public void run() { + final List<DummyRecord> batch = new ArrayList<>(); + for (int i = 0; i < updateCountPerThread / batchSize; i++) { + batch.clear(); + for (int j = 0; j < batchSize; j++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + batch.add(record); + } + + try { + repo.update(batch, false); + } catch (Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + } + } + }); + + threads[i] = t; + } + + final long start = System.nanoTime(); + for (final Thread t : threads) { + t.start(); + } + for (final Thread t : threads) { + t.join(); + } + + long bytes = 0L; + for (final File journalFile : path.resolve("journals").toFile().listFiles()) { + bytes += journalFile.length(); + } + + bytes -= previousBytes; + previousBytes = bytes; + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis; + final String eps = NumberFormat.getInstance().format(eventsPerSecond); + final long bytesPerSecond = bytes * 1000 / millis; + final String bps = NumberFormat.getInstance().format(bytesPerSecond); + + if (j == 0) { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + + " threads, *as a warmup!* " + eps + " events per second, " + bps + " bytes per second"); + } else { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + + " threads, " + eps + " events per second, " + bps + " bytes per second"); + } + } + } + + private void deleteRecursively(final File file) { + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + + file.delete(); + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java index bf15ba7..1ae7178 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java @@ -19,12 +19,14 @@ package org.wali; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class DummyRecord { private final String id; private final Map<String, String> props; private final UpdateType updateType; + private String swapLocation; public DummyRecord(final String id, final UpdateType updateType) { this.id = id; @@ -59,8 +61,37 @@ public class DummyRecord { return props.get(name); } + public String getSwapLocation() { + return swapLocation; + } + + public void setSwapLocation(String swapLocation) { + this.swapLocation = swapLocation; + } + @Override public String toString() { return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]"; } + + @Override + public int hashCode() { + return Objects.hash(this.id, this.props, this.updateType, this.swapLocation); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (!(obj instanceof DummyRecord)) { + return false; + } + final DummyRecord other = (DummyRecord) obj; + return Objects.equals(id, other.id) && Objects.equals(props, other.props) && Objects.equals(updateType, other.updateType) && Objects.equals(swapLocation, other.swapLocation); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java 
---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index e9f3b01..1f6aede 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -27,6 +27,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { private int throwOOMEAfterNserializeEdits = -1; private int serializeEditCount = 0; + @SuppressWarnings("fallthrough") @Override public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { @@ -39,14 +40,28 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { out.writeUTF(record.getUpdateType().name()); out.writeUTF(record.getId()); - if (record.getUpdateType() != UpdateType.DELETE) { - final Map<String, String> props = record.getProperties(); - out.writeInt(props.size()); - for (final Map.Entry<String, String> entry : props.entrySet()) { - out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue()); + switch (record.getUpdateType()) { + case DELETE: + break; + case SWAP_IN: { + out.writeUTF(record.getSwapLocation()); + // intentionally fall through to CREATE/UPDATE block } + case CREATE: + case UPDATE: { + final Map<String, String> props = record.getProperties(); + out.writeInt(props.size()); + for (final Map.Entry<String, String> entry : props.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + break; + case SWAP_OUT: + out.writeUTF(record.getSwapLocation()); + break; } + } @Override @@ -55,20 +70,36 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { } @Override + @SuppressWarnings("fallthrough") public DummyRecord 
deserializeRecord(final DataInputStream in, final int version) throws IOException { final String updateTypeName = in.readUTF(); final UpdateType updateType = UpdateType.valueOf(updateTypeName); final String id = in.readUTF(); final DummyRecord record = new DummyRecord(id, updateType); - if (record.getUpdateType() != UpdateType.DELETE) { - final int numProps = in.readInt(); - for (int i = 0; i < numProps; i++) { - final String key = in.readUTF(); - final String value = in.readUTF(); - record.setProperty(key, value); + switch (record.getUpdateType()) { + case DELETE: + break; + case SWAP_IN: { + final String swapLocation = in.readUTF(); + record.setSwapLocation(swapLocation); + // intentionally fall through to the CREATE/UPDATE block } + case CREATE: + case UPDATE: + final int numProps = in.readInt(); + for (int i = 0; i < numProps; i++) { + final String key = in.readUTF(); + final String value = in.readUTF(); + record.setProperty(key, value); + } + break; + case SWAP_OUT: + final String swapLocation = in.readUTF(); + record.setSwapLocation(swapLocation); + break; } + return record; } @@ -102,6 +133,6 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { @Override public String getLocation(final DummyRecord record) { - return null; + return record.getSwapLocation(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index ef33f57..20009d1 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -34,6 +34,7 @@ import 
java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -139,9 +140,9 @@ public class TestMinimalLockingWriteAheadLog { } @Test - @Ignore("for local testing only") + @Ignore("For manual performance testing") public void testUpdatePerformance() throws IOException, InterruptedException { - final int numPartitions = 4; + final int numPartitions = 16; final Path path = Paths.get("target/minimal-locking-repo"); deleteRecursively(path.toFile()); @@ -152,23 +153,34 @@ public class TestMinimalLockingWriteAheadLog { final Collection<DummyRecord> initialRecs = repo.recoverRecords(); assertTrue(initialRecs.isEmpty()); - final int updateCountPerThread = 1_000_000; - final int numThreads = 16; + final long updateCountPerThread = 1_000_000; + final int numThreads = 4; final Thread[] threads = new Thread[numThreads]; + final int batchSize = 1; + + long previousBytes = 0; + for (int j = 0; j < 2; j++) { for (int i = 0; i < numThreads; i++) { final Thread t = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < updateCountPerThread; i++) { - final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + final List<DummyRecord> batch = new ArrayList<>(); + + for (int i = 0; i < updateCountPerThread / batchSize; i++) { + batch.clear(); + for (int j = 0; j < batchSize; j++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + batch.add(record); + } + try { - repo.update(Collections.singleton(record), false); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.toString()); + repo.update(batch, false); + } catch (Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); } } } @@ -185,11 +197,30 @@ public class TestMinimalLockingWriteAheadLog { t.join(); } + long bytes = 0L; + for (final File file : 
path.toFile().listFiles()) { + if (file.getName().startsWith("partition-")) { + for (final File journalFile : file.listFiles()) { + bytes += journalFile.length(); + } + } + } + + bytes -= previousBytes; + previousBytes = bytes; + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis; + final String eps = NumberFormat.getInstance().format(eventsPerSecond); + final long bytesPerSecond = bytes * 1000 / millis; + final String bps = NumberFormat.getInstance().format(bytesPerSecond); + if (j == 0) { - System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*"); + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, *as a warmup!* " + + eps + " events per second, " + bps + " bytes per second"); } else { - System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads"); + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, " + + eps + " events per second, " + bps + " bytes per second"); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java 
index 00dde06..3901029 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -16,10 +16,10 @@ */ package org.apache.nifi.controller.repository; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -47,6 +48,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.wali.SequentialAccessWriteAheadLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wali.MinimalLockingWriteAheadLog; @@ -86,7 +88,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private volatile ScheduledFuture<?> checkpointFuture; private final long checkpointDelayMillis; - private final SortedSet<Path> flowFileRepositoryPaths = new TreeSet<>(); + private final File flowFileRepositoryPath; + private final List<File> recoveryFiles = new ArrayList<>(); private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; @@ -126,16 +129,23 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis checkpointDelayMillis = 0l; numPartitions = 0; checkpointExecutor = null; + flowFileRepositoryPath = null; } public WriteAheadFlowFileRepository(final 
NiFiProperties nifiProperties) { alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false")); // determine the database file path and ensure it exists + final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX); + flowFileRepositoryPath = new File(directoryName); + + // We used to use the MinimalLockingWriteAheadLog, but we now use the SequentialAccessWriteAheadLog. Since the + // MinimalLockingWriteAheadLog supports multiple partitions, we need to ensure that we recover records from all + // partitions, so we build up a List of Files for the recovery files. for (final String propertyName : nifiProperties.getPropertyKeys()) { if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) { - final String directoryName = nifiProperties.getProperty(propertyName); - flowFileRepositoryPaths.add(Paths.get(directoryName)); + final String dirName = nifiProperties.getProperty(propertyName); + recoveryFiles.add(new File(dirName)); } } @@ -149,16 +159,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis public void initialize(final ResourceClaimManager claimManager) throws IOException { this.claimManager = claimManager; - for (final Path path : flowFileRepositoryPaths) { - Files.createDirectories(path); - } + Files.createDirectories(flowFileRepositoryPath.toPath()); // TODO: Should ensure that only 1 instance running and pointing at a particular path // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on // backup and then the data deleted from the normal location; then can move backup to normal location and // delete backup. 
On restore, if no files exist in partition's directory, would have to check backup directory serdeFactory = new RepositoryRecordSerdeFactory(claimManager); - wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, numPartitions, serdeFactory, this); + wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPath, serdeFactory, this); logger.info("Initialized FlowFile Repository using {} partitions", numPartitions); } @@ -179,22 +187,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public long getStorageCapacity() throws IOException { - long capacity = 0L; - for (final Path path : flowFileRepositoryPaths) { - capacity += Files.getFileStore(path).getTotalSpace(); - } - - return capacity; + return Files.getFileStore(flowFileRepositoryPath.toPath()).getTotalSpace(); } @Override public long getUsableStorageSpace() throws IOException { - long usableSpace = 0L; - for (final Path path : flowFileRepositoryPaths) { - usableSpace += Files.getFileStore(path).getUsableSpace(); - } - - return usableSpace; + return Files.getFileStore(flowFileRepositoryPath.toPath()).getUsableSpace(); } @Override @@ -371,6 +369,72 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); } + + @SuppressWarnings("deprecation") + private Optional<Collection<RepositoryRecord>> recoverFromOldWriteAheadLog() throws IOException { + final List<File> partitionDirs = new ArrayList<>(); + for (final File recoveryFile : recoveryFiles) { + final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-")); + for (final File partition : partitions) { + partitionDirs.add(partition); + } + } + + if (partitionDirs == null || partitionDirs.isEmpty()) { + return Optional.empty(); + } + + logger.info("Encountered FlowFile Repository that was written using an old version of the 
Write-Ahead Log. " + + "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log."); + + final SortedSet<Path> paths = recoveryFiles.stream() + .map(File::toPath) + .collect(Collectors.toCollection(TreeSet::new)); + + final Collection<RepositoryRecord> recordList; + final MinimalLockingWriteAheadLog<RepositoryRecord> minimalLockingWal = new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, null); + try { + recordList = minimalLockingWal.recoverRecords(); + } finally { + minimalLockingWal.shutdown(); + } + + wal.update(recordList, true); + + // Delete the old repository + logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files."); + for (final File partitionDir : partitionDirs) { + final File[] children = partitionDir.listFiles(); + + if (children != null) { + for (final File child : children) { + final boolean deleted = child.delete(); + if (!deleted) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child); + } + } + } + + if (!partitionDir.delete()) { + logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", partitionDir); + } + } + + for (final File recoveryFile : recoveryFiles) { + final File snapshotFile = new File(recoveryFile, "snapshot"); + if (!snapshotFile.delete() && snapshotFile.exists()) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshotFile); + } + + final File partialFile = new File(recoveryFile, "snapshot.partial"); + if (!partialFile.delete() && partialFile.exists()) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile); + } + } + + return Optional.of(recordList); + } + @Override public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException { final Map<String, FlowFileQueue> 
queueMap = new HashMap<>(); @@ -378,7 +442,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis queueMap.put(queue.getIdentifier(), queue); } serdeFactory.setQueueMap(queueMap); - final Collection<RepositoryRecord> recordList = wal.recoverRecords(); + + // Since we used to use the MinimalLockingWriteAheadRepository, we need to ensure that if the FlowFile + // Repo was written using that impl, that we properly recover from the implementation. + Collection<RepositoryRecord> recordList = wal.recoverRecords(); + + if (recordList == null || recordList.isEmpty()) { + recordList = recoverFromOldWriteAheadLog().orElse(new ArrayList<>()); + } + serdeFactory.setQueueMap(null); for (final RepositoryRecord record : recordList) {