[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-07 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r173080583
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for AddEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+static class LedgerStorageFlushTask implements Callable {
+LedgerStorage ledgerStorage;
+
+LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.flush();
+} catch (IOException e) {
+LOG.error("Got Exception for flush call", e);
+throw new IOException("Got Exception for Flush call", e);
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, 
entryId);
+if (!expectedByteBuf.equals(actualByteBuf)) {
+LOG.error("Expected Entry: {} Actual Entry: {}", 
expectedByteBuf.toString(Charset.defaultCharset()),
+actualByteBuf.toString(Charset.defaultCharset()));
+throw new IOException("Expected Entry: " + 
expectedByteBuf.toString(Charset.defaultCharset())
++ " Actual Entry: " + 
actualByteBuf.toString(Charset.defaultCharset()));
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for GetEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+Random rand = new Random();
+
+int numOfLedgers = 70;
+int numEntries = 2000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(10);
+List> writeAndFlushTasks = new 
ArrayList>();
+for (int j = 0; j < numEntries; j++) {
+for (int i = 0; i < numOfLedgers; i++) {
+writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, 
ledgerStorage));
+}
+}
+
+/*
+ * add some flush tasks to the list of writetasks list.
+ */
+for (int i = 0; i < (numOfLedgers * numEntri

[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172483645
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for AddEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+static class LedgerStorageFlushTask implements Callable {
+LedgerStorage ledgerStorage;
+
+LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.flush();
+} catch (IOException e) {
+LOG.error("Got Exception for flush call", e);
+throw new IOException("Got Exception for Flush call", e);
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, 
entryId);
+if (!expectedByteBuf.equals(actualByteBuf)) {
+LOG.error("Expected Entry: {} Actual Entry: {}", 
expectedByteBuf.toString(Charset.defaultCharset()),
+actualByteBuf.toString(Charset.defaultCharset()));
+throw new IOException("Expected Entry: " + 
expectedByteBuf.toString(Charset.defaultCharset())
++ " Actual Entry: " + 
actualByteBuf.toString(Charset.defaultCharset()));
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for GetEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+Random rand = new Random();
+
+int numOfLedgers = 70;
+int numEntries = 2000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(10);
+List> writeAndFlushTasks = new 
ArrayList>();
+for (int j = 0; j < numEntries; j++) {
+for (int i = 0; i < numOfLedgers; i++) {
+writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, 
ledgerStorage));
+}
+}
+
+/*
+ * add some flush tasks to the list of writetasks list.
+ */
+for (int i = 0; i < (numOfLedgers * numEntri

[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172483093
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for AddEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+static class LedgerStorageFlushTask implements Callable {
+LedgerStorage ledgerStorage;
+
+LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.flush();
+} catch (IOException e) {
+LOG.error("Got Exception for flush call", e);
+throw new IOException("Got Exception for Flush call", e);
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, 
entryId);
+if (!expectedByteBuf.equals(actualByteBuf)) {
+LOG.error("Expected Entry: {} Actual Entry: {}", 
expectedByteBuf.toString(Charset.defaultCharset()),
+actualByteBuf.toString(Charset.defaultCharset()));
+throw new IOException("Expected Entry: " + 
expectedByteBuf.toString(Charset.defaultCharset())
++ " Actual Entry: " + 
actualByteBuf.toString(Charset.defaultCharset()));
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for GetEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+Random rand = new Random();
+
+int numOfLedgers = 70;
+int numEntries = 2000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(10);
+List> writeAndFlushTasks = new 
ArrayList>();
+for (int j = 0; j < numEntries; j++) {
+for (int i = 0; i < numOfLedgers; i++) {
+writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, 
ledgerStorage));
+}
+}
+
+/*
+ * add some flush tasks to the list of writetasks list.
+ */
+for (int i = 0; i < (numOfLedgers * numEntri

[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172481464
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for AddEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+static class LedgerStorageFlushTask implements Callable {
+LedgerStorage ledgerStorage;
+
+LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ledgerStorage.flush();
+} catch (IOException e) {
+LOG.error("Got Exception for flush call", e);
+throw new IOException("Got Exception for Flush call", e);
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() throws IOException {
+try {
+ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, 
entryId);
+if (!expectedByteBuf.equals(actualByteBuf)) {
+LOG.error("Expected Entry: {} Actual Entry: {}", 
expectedByteBuf.toString(Charset.defaultCharset()),
+actualByteBuf.toString(Charset.defaultCharset()));
+throw new IOException("Expected Entry: " + 
expectedByteBuf.toString(Charset.defaultCharset())
++ " Actual Entry: " + 
actualByteBuf.toString(Charset.defaultCharset()));
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+throw new IOException("Got Exception for GetEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+e);
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+Random rand = new Random();
 
 Review comment:
   for better coverage it is better to have indeterminism, especially here we 
are trying to make sure there are no race conditions and atomic/transaction 
issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172354228
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   first thing 
   
   1) calling .get() on future is inevitable, since thats the only way you 
would get the result of the future.
   
   > Instead of returning false on errors, you could throw Exception, and have 
that carry the context of the error (ledger id, entry id etc). This would 
simplify the loops in the test itself.
   
   2) even if I change the logic to throw exception instead of boolean in the 
case of failure, we are not getting rid of loops. Because incase of 
CancellationException we would get generic cancellation exception instead of 
any of our exception with customized message. So atleast for this case we need 
to do what we are doing.
   
   Again as I said this is matter of preference how we deal with return status 
of tasks/futures in junit test env. If you see it as a dealbreaker for you, 
I'll go ahead and change it to throw exception, as I don't see value in 
dragging this further.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172351072
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
 
 Review comment:
   > @reddycharan it would be good to have a thread running in a tight loop 
calling flush() on the ledgerstorage, while the adds and reads are happening.
   
   fair enough I can add flush tasks  to the write and read tasks list, so that 
flush would be executed along with write/read operations.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172283513
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   I don't see why using .get would be messy. Just FYI about Junit, "The JUnit 
framework captures only assertion errors in the main thread running the test. 
It is not aware of exceptions from within new spawn threads. In order to do it 
right, you should communicate the thread's termination state to the main 
thread." So to get the status/result of task executed in non-main thread we 
should call for .get of future. Agreed, instead of returning boolean we may 
throw exception with proper message, but for diagnosing / logging purpose I 
feel it is better to fail with Assert statements  and enough logging in the 
tasks. I see it is just a matter of preference and there is no much difference 
in choosing one over the other.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-04 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172101884
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,135 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+
+int numOfLedgers = 70;
+int numEntries = 2000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(10);
 
 Review comment:
   fixing that


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-04 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172101284
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
 
 Review comment:
   Bookie doesn't implements AutoCloseable, Closeable to use with 
try-with-resources. But again, I'm just following Bookie instances is used in 
other testcases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962703
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 ##
 @@ -74,7 +76,7 @@
 GarbageCollectorThread gcThread;
 
 // this indicates that a write has happened since the last flush
-private volatile boolean somethingWritten = false;
+private AtomicBoolean somethingWritten = new AtomicBoolean(false);
 
 Review comment:
   changing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962627
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -107,14 +112,18 @@ public void testCorruptEntryLog() throws Exception {
 }
 
 private ByteBuf generateEntry(long ledger, long entry) {
-byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+byte[] data = generateDataString(ledger, entry).getBytes();
 ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
 bb.writeLong(ledger);
 bb.writeLong(entry);
 bb.writeBytes(data);
 return bb;
 }
 
+private String generateDataString(long ledger, long entry) {
+return new String("ledger-" + ledger + "-" + entry);
 
 Review comment:
   changing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962644
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -107,14 +112,18 @@ public void testCorruptEntryLog() throws Exception {
 }
 
 private ByteBuf generateEntry(long ledger, long entry) {
-byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+byte[] data = generateDataString(ledger, entry).getBytes();
 ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
 bb.writeLong(ledger);
 bb.writeLong(entry);
 bb.writeBytes(data);
 return bb;
 }
 
+private String generateDataString(long ledger, long entry) {
 
 Review comment:
   changing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962565
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171962530
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171960386
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 ##
 @@ -360,10 +362,10 @@ public void checkpoint(Checkpoint checkpoint) throws 
IOException {
 
 @Override
 public synchronized void flush() throws IOException {
-if (!somethingWritten) {
+if (!somethingWritten.get()) {
 
 Review comment:
   changing it to compareAndSet.
   
   but yeah I considered removing synchronized for flush method, but it is not 
clear regarding the transactional guarantees of the underlying methods (methods 
which get called by ILS.flush). So I didn't change this, moreover it is called 
by requestFlush, which is called by SyncThread.shutdown and SyncThread.start, 
which are one time events and not related to this work item. So left unchanged. 
 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171939731
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   other than removing redundant .isDone check, I would keep the rest of the 
test logic as it is now. For diagnosing in the case of failure, it is helpful 
to have assert failures and proper error log lines (by catching the error and 
log it) rather than failing with generic cancellation exception or 
executionexception when we call future.get(). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171937543
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+
+int numOfLedgers = 70;
+int numEntries = 3000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(40);
 
 Review comment:
   i just ran it on my macboop pro. I was just trying some big number. But yeah 
40 threads is bit much I'll bring it down to 10-15. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171933046
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
 
 Review comment:
   probably not, but it is easier to get handle of fully constructed 
ledgerstorage instead of we creating instance of ledgerstorage and calling 
initialize explicitly with right set of arguments. Just followed how things are 
handled in other tests in this testsuite.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171930561
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
+InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+
+int numOfLedgers = 70;
+int numEntries = 3000;
+// Create ledgers
+for (int i = 0; i < numOfLedgers; i++) {
+ledgerStorage.setMasterKey(i, "key".getBytes());
+}
+
+ExecutorService executor = Executors.newFixedThreadPool(40);
+List writeTasks = new 
ArrayList();
+for (int j = 0; j < numEntries; j++) {
+for (int i = 0; i < numOfLedgers; i++) {
+writeTasks.add(new LedgerStorageWriteTask(i, j, 
ledgerStorage));
+}
+}
+
+// invoke all those write tasks all at once concurrently and set 
timeout
+// 6 seconds for them to complete
+List> writeTasksFutures = 
executor.invokeAll(writeTasks, 6, TimeUnit.SECONDS);
+for (int i = 0; i < writeTasks.size(); i++) {
+Future future = writeTasksFutures.get(i);
+LedgerStorageWriteTask writeTask = writeTasks.get(i);
+long ledgerId = writeTask.ledgerId;
+int entryId = writeTask.entryId;
+Assert.assertTrue(
+"WriteTask should have b

[GitHub] reddycharan commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-02 Thread GitBox
reddycharan commented on a change in pull request #1225: Issue #570: getting 
rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r171929602
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   invokeAll returns list of Futures and join is method of CompletableFuture 
but not Future. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services