[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r173080583 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +static class LedgerStorageFlushTask implements Callable { +LedgerStorage ledgerStorage; + +LedgerStorageFlushTask(LedgerStorage ledgerStorage) { +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.flush(); +} catch (IOException e) { +LOG.error("Got Exception for flush call", e); +throw new IOException("Got Exception for Flush call", e); +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); +ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId); +if (!expectedByteBuf.equals(actualByteBuf)) { +LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), +actualByteBuf.toString(Charset.defaultCharset())); +throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) ++ " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); +Random rand = new Random(); + +int numOfLedgers = 70; +int numEntries = 2000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(10); +List> writeAndFlushTasks = new ArrayList>(); +for (int j = 0; j < numEntries; j++) { +for (int i = 0; i < numOfLedgers; i++) { +writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage)); +} +} + +/* + * add some flush tasks to the list of writetasks list. + */ +for (int i = 0; i < (numOfLedgers * numEntri
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172483645 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +static class LedgerStorageFlushTask implements Callable { +LedgerStorage ledgerStorage; + +LedgerStorageFlushTask(LedgerStorage ledgerStorage) { +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.flush(); +} catch (IOException e) { +LOG.error("Got Exception for flush call", e); +throw new IOException("Got Exception for Flush call", e); +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); +ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId); +if (!expectedByteBuf.equals(actualByteBuf)) { +LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), +actualByteBuf.toString(Charset.defaultCharset())); +throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) ++ " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); +Random rand = new Random(); + +int numOfLedgers = 70; +int numEntries = 2000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(10); +List> writeAndFlushTasks = new ArrayList>(); +for (int j = 0; j < numEntries; j++) { +for (int i = 0; i < numOfLedgers; i++) { +writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage)); +} +} + +/* + * add some flush tasks to the list of writetasks list. + */ +for (int i = 0; i < (numOfLedgers * numEntri
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172483093 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +static class LedgerStorageFlushTask implements Callable { +LedgerStorage ledgerStorage; + +LedgerStorageFlushTask(LedgerStorage ledgerStorage) { +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.flush(); +} catch (IOException e) { +LOG.error("Got Exception for flush call", e); +throw new IOException("Got Exception for Flush call", e); +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); +ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId); +if (!expectedByteBuf.equals(actualByteBuf)) { +LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), +actualByteBuf.toString(Charset.defaultCharset())); +throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) ++ " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); +Random rand = new Random(); + +int numOfLedgers = 70; +int numEntries = 2000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(10); +List> writeAndFlushTasks = new ArrayList>(); +for (int j = 0; j < numEntries; j++) { +for (int i = 0; i < numOfLedgers; i++) { +writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage)); +} +} + +/* + * add some flush tasks to the list of writetasks list. + */ +for (int i = 0; i < (numOfLedgers * numEntri
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172481464 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +static class LedgerStorageFlushTask implements Callable { +LedgerStorage ledgerStorage; + +LedgerStorageFlushTask(LedgerStorage ledgerStorage) { +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ledgerStorage.flush(); +} catch (IOException e) { +LOG.error("Got Exception for flush call", e); +throw new IOException("Got Exception for Flush call", e); +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() throws IOException { +try { +ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId); +ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId); +if (!expectedByteBuf.equals(actualByteBuf)) { +LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()), +actualByteBuf.toString(Charset.defaultCharset())); +throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset()) ++ " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset())); +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, +e); +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); +Random rand = new Random(); Review comment: for better coverage it is better to have indeterminism, especially here we are trying to make sure there are no race conditions and atomic/transaction issues. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172354228 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { Review comment: first thing 1) calling .get() on future is inevitable, since thats the only way you would get the result of the future. > Instead of returning false on errors, you could throw Exception, and have that carry the context of the error (ledger id, entry id etc). This would simplify the loops in the test itself. 2) even if I change the logic to throw exception instead of boolean in the case of failure, we are not getting rid of loops. Because incase of CancellationException we would get generic cancellation exception instead of any of our exception with customized message. So atleast for this case we need to do what we are doing. Again as I said this is matter of preference how we deal with return status of tasks/futures in junit test env. If you see it as a dealbreaker for you, I'll go ahead and change it to throw exception, as I don't see value in dragging this further. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172351072 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { Review comment: > @reddycharan it would be good to have a thread running in a tight loop calling flush() on the ledgerstorage, while the adds and reads are happening. fair enough I can add flush tasks to the write and read tasks list, so that flush would be executed along with write/read operations. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172283513 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { Review comment: I don't see why using .get would be messy. Just FYI about Junit, "The JUnit framework captures only assertion errors in the main thread running the test. It is not aware of exceptions from within new spawn threads. In order to do it right, you should communicate the thread's termination state to the main thread." So to get the status/result of task executed in non-main thread we should call for .get of future. Agreed, instead of returning boolean we may throw exception with proper message, but for diagnosing / logging purpose I feel it is better to fail with Assert statements and enough logging in the tasks. I see it is just a matter of preference and there is no much difference in choosing one over the other. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172101884 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,135 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +static class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +static class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + +int numOfLedgers = 70; +int numEntries = 2000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(10); Review comment: fixing that This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172101284 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); Review comment: Bookie doesn't implements AutoCloseable, Closeable to use with try-with-resources. But again, I'm just following Bookie instances is used in other testcases. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962703 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ## @@ -74,7 +76,7 @@ GarbageCollectorThread gcThread; // this indicates that a write has happened since the last flush -private volatile boolean somethingWritten = false; +private AtomicBoolean somethingWritten = new AtomicBoolean(false); Review comment: changing This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962627 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -107,14 +112,18 @@ public void testCorruptEntryLog() throws Exception { } private ByteBuf generateEntry(long ledger, long entry) { -byte[] data = ("ledger-" + ledger + "-" + entry).getBytes(); +byte[] data = generateDataString(ledger, entry).getBytes(); ByteBuf bb = Unpooled.buffer(8 + 8 + data.length); bb.writeLong(ledger); bb.writeLong(entry); bb.writeBytes(data); return bb; } +private String generateDataString(long ledger, long entry) { +return new String("ledger-" + ledger + "-" + entry); Review comment: changing This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962644 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -107,14 +112,18 @@ public void testCorruptEntryLog() throws Exception { } private ByteBuf generateEntry(long ledger, long entry) { -byte[] data = ("ledger-" + ledger + "-" + entry).getBytes(); +byte[] data = generateDataString(ledger, entry).getBytes(); ByteBuf bb = Unpooled.buffer(8 + 8 + data.length); bb.writeLong(ledger); bb.writeLong(entry); bb.writeBytes(data); return bb; } +private String generateDataString(long ledger, long entry) { Review comment: changing This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962565 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962530 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171960386 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ## @@ -360,10 +362,10 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { @Override public synchronized void flush() throws IOException { -if (!somethingWritten) { +if (!somethingWritten.get()) { Review comment: changing it to compareAndSet. but yeah I considered removing synchronized for flush method, but it is not clear regarding the transactional guarantees of the underlying methods (methods which get called by ILS.flush). So I didn't change this, moreover it is called by requestFlush, which is called by SyncThread.shutdown and SyncThread.start, which are one time events and not related to this work item. So left unchanged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171939731 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { Review comment: other than removing redundant .isDone check, I would keep the rest of the test logic as it is now. For diagnosing in the case of failure, it is helpful to have assert failures and proper error log lines (by catching the error and log it) rather than failing with generic cancellation exception or executionexception when we call future.get(). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171937543 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + +int numOfLedgers = 70; +int numEntries = 3000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(40); Review comment: i just ran it on my macboop pro. I was just trying some big number. But yeah 40 threads is bit much I'll bring it down to 10-15. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171933046 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); Review comment: probably not, but it is easier to get handle of fully constructed ledgerstorage instead of we creating instance of ledgerstorage and calling initialize explicitly with right set of arguments. Just followed how things are handled in other tests in this testsuite. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171930561 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +String expectedValue = generateDataString(ledgerId, entryId); +ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId); +long actualLedgerId = byteBuf.readLong(); +long actualEntryId = byteBuf.readLong(); +byte[] data = new byte[byteBuf.readableBytes()]; +byteBuf.readBytes(data); +if (ledgerId != actualLedgerId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId, +actualLedgerId); +return false; +} +if (entryId != actualEntryId) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId, +actualEntryId); +return false; +} +if (!expectedValue.equals(new String(data))) { +LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId, +new String(data)); +return false; +} +} catch (IOException e) { +LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +/** + * test concurrent write operations and then concurrent read + * operations using InterleavedLedgerStorage. + */ +@Test +public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception { +File ledgerDir = createTempDir("bkTest", ".dir"); +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); +conf.setJournalDirName(ledgerDir.toString()); +conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()}); +conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); +Bookie bookie = new Bookie(conf); +InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + +int numOfLedgers = 70; +int numEntries = 3000; +// Create ledgers +for (int i = 0; i < numOfLedgers; i++) { +ledgerStorage.setMasterKey(i, "key".getBytes()); +} + +ExecutorService executor = Executors.newFixedThreadPool(40); +List writeTasks = new ArrayList(); +for (int j = 0; j < numEntries; j++) { +for (int i = 0; i < numOfLedgers; i++) { +writeTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage)); +} +} + +// invoke all those write tasks all at once concurrently and set timeout +// 6 seconds for them to complete +List> writeTasksFutures = executor.invokeAll(writeTasks, 6, TimeUnit.SECONDS); +for (int i = 0; i < writeTasks.size(); i++) { +Future future = writeTasksFutures.get(i); +LedgerStorageWriteTask writeTask = writeTasks.get(i); +long ledgerId = writeTask.ledgerId; +int entryId = writeTask.entryId; +Assert.assertTrue( +"WriteTask should have b
[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171929602 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ## @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception { assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet()); } + +class LedgerStorageWriteTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { +try { +ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); +} catch (IOException e) { +LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e); +return false; +} +return true; +} +} + +class LedgerStorageReadTask implements Callable { +long ledgerId; +int entryId; +LedgerStorage ledgerStorage; + +LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.ledgerStorage = ledgerStorage; +} + +@Override +public Boolean call() { Review comment: invokeAll returns list of Futures and join is method of CompletableFuture but not Future. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services